You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/15 22:35:04 UTC

[3/3] S4-24 dynamic app deployment

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
new file mode 100644
index 0000000..279ccaa
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -0,0 +1,381 @@
+package org.apache.s4.deploy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+
+/**
+ * Tests deployment of packaged applications <br>
+ * - loaded from local apps directory <br>
+ * - deployed through zookeeper notification <br>
+ * - ... from the file system <br>
+ * - ... or from a web server
+ * 
+ */
+public class TestAutomaticDeployment {
+
+    private Factory zookeeperServerConnectionFactory;
+    private Process forkedNode;
+    private ZkClient zkClient;
+    private String clusterName;
+    private HttpServer httpServer;
+    private static File tmpAppsDir;
+
+    /**
+     * Builds both test applications through gradle, once before any test of this class runs. Each build is invoked
+     * with the "installS4R" task and an "appsDir" project property pointing at a fresh temporary directory, which
+     * is the directory the tests later read the packaged S4R archives from.
+     */
+    @BeforeClass
+    public static void createS4RFiles() throws Exception {
+        tmpAppsDir = Files.createTempDir();
+
+        File gradlew = CoreTestUtils.findGradlewInRootDir();
+        String[] buildFilePaths = new String[] { "/test-apps/simple-deployable-app-1/build.gradle",
+                "/test-apps/simple-deployable-app-2/build.gradle" };
+
+        for (String buildFilePath : buildFilePaths) {
+            File buildFile = new File(gradlew.getParentFile().getAbsolutePath() + buildFilePath);
+            String[] gradleParams = new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() };
+            CoreTestUtils.callGradleTask(gradlew, buildFile, "installS4R", gradleParams);
+        }
+    }
+
+    /**
+     * Makes sure the local apps directory named by the "appsDir" property exists and is empty before each test.
+     * An existing directory is only emptied when its path starts with /tmp, so that a misconfiguration cannot wipe
+     * an arbitrary directory.
+     */
+    @Before
+    public void cleanLocalAppsDir() throws ConfigurationException {
+        String appsDirPath = loadConfig().getString("appsDir");
+        File appsDir = new File(appsDirPath);
+
+        if (!appsDir.exists()) {
+            Assert.assertTrue(appsDir.mkdirs());
+            return;
+        }
+
+        if (!appsDirPath.startsWith("/tmp")) {
+            Assert.fail("apps dir should a subdir of /tmp for safety");
+        }
+        CommTestUtils.deleteDirectoryContents(appsDir);
+    }
+
+    /**
+     * Loads the test configuration from the "/org.apache.s4.deploy.s4.properties" classpath resource.
+     * 
+     * @return the loaded configuration
+     * @throws ConfigurationException
+     *             if the resource cannot be found or cannot be parsed
+     */
+    private PropertiesConfiguration loadConfig() throws ConfigurationException {
+        InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+        if (is == null) {
+            throw new ConfigurationException("Cannot find [/org.apache.s4.deploy.s4.properties] on the classpath");
+        }
+        try {
+            PropertiesConfiguration config = new PropertiesConfiguration();
+            config.load(is);
+            return config;
+        } finally {
+            try {
+                is.close();
+            } catch (IOException ignored) {
+                // the configuration was already read (or already failed to load); a close failure adds nothing
+            }
+        }
+    }
+
+    /**
+     * Copies the app 1 archive into the local apps directory before the S4 node is started, then checks that the
+     * application is deployed without any Zookeeper notification.
+     */
+    @Test
+    public void testInitialDeploymentFromFileSystem() throws Exception {
+        String localAppsDir = loadConfig().getString("appsDir");
+        File s4rToDeploy = new File(localAppsDir + File.separator + "testapp" + System.currentTimeMillis() + ".s4r");
+        File builtArchive = new File(tmpAppsDir.getAbsolutePath() + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r");
+
+        long copiedBytes = ByteStreams.copy(Files.newInputStreamSupplier(builtArchive),
+                Files.newOutputStreamSupplier(s4rToDeploy));
+        Assert.assertTrue(copiedBytes > 0);
+
+        initializeS4Node();
+
+        assertDeployment(s4rToDeploy.toURI().toString(), true);
+    }
+
+    /**
+     * Starts the S4 node with no application, then publishes a file URI of the app 1 archive to Zookeeper and
+     * checks that the application gets deployed.
+     */
+    @Test
+    public void testZkTriggeredDeploymentFromFileSystem() throws Exception {
+        initializeS4Node();
+        Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
+
+        File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
+        File builtArchive = new File(tmpAppsDir.getAbsolutePath() + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r");
+
+        long copiedBytes = ByteStreams.copy(Files.newInputStreamSupplier(builtArchive),
+                Files.newOutputStreamSupplier(s4rToDeploy));
+        Assert.assertTrue(copiedBytes > 0);
+
+        assertDeployment(s4rToDeploy.toURI().toString(), false);
+    }
+
+    /**
+     * Checks that app 1 is initialized and started, then that it processes an injected event.
+     * 
+     * @param uri
+     *            location of the S4R archive; only published to Zookeeper when <code>initial</code> is false
+     * @param initial
+     *            true when the archive is expected to be picked up without notification, false when the deployment
+     *            must be triggered by creating the "/[cluster name]/apps/testApp" znode
+     */
+    private void assertDeployment(final String uri, boolean initial) throws KeeperException, InterruptedException,
+            IOException {
+        CountDownLatch signalAppInitialized = new CountDownLatch(1);
+        CountDownLatch signalAppStarted = new CountDownLatch(1);
+        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppInitialized,
+                CommTestUtils.createZkClient());
+        // the "started" latch has to watch the "started" znode: watching the "initialized" znode a second time
+        // would let the test pass without ever verifying that the application was started
+        // NOTE(review): STARTED_ZNODE_1 is assumed to exist in AppConstants next to STARTED_ZNODE_2 - confirm
+        CommTestUtils.watchAndSignalCreation(AppConstants.STARTED_ZNODE_1, signalAppStarted,
+                CommTestUtils.createZkClient());
+
+        if (!initial) {
+            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+            zkClient.create("/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
+        }
+
+        Assert.assertTrue(signalAppInitialized.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalAppStarted.await(10, TimeUnit.SECONDS));
+
+        String time1 = String.valueOf(System.currentTimeMillis());
+
+        // the test waits for a znode named after the injected value to be created once the event is handled
+        CountDownLatch signalEvent1Processed = new CountDownLatch(1);
+        CommTestUtils
+                .watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, CommTestUtils.createZkClient());
+
+        CoreTestUtils.injectIntoStringSocketAdapter(time1);
+
+        // check event processed
+        Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Publishes two applications to Zookeeper and checks that each of them is both initialized and started.
+     * 
+     * @param uri1
+     *            location of the S4R archive of app 1
+     * @param uri2
+     *            location of the S4R archive of app 2
+     */
+    private void assertMultipleAppsDeployment(String uri1, String uri2) throws KeeperException, InterruptedException,
+            IOException {
+        CountDownLatch signalApp1Initialized = new CountDownLatch(1);
+        CountDownLatch signalApp1Started = new CountDownLatch(1);
+
+        CountDownLatch signalApp2Initialized = new CountDownLatch(1);
+        CountDownLatch signalApp2Started = new CountDownLatch(1);
+
+        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalApp1Initialized,
+                CommTestUtils.createZkClient());
+        // app 1 "started" must be signalled by app 1's own "started" znode, not by the "initialized" znode of app 2
+        // NOTE(review): STARTED_ZNODE_1 is assumed to exist in AppConstants next to STARTED_ZNODE_2 - confirm
+        CommTestUtils.watchAndSignalCreation(AppConstants.STARTED_ZNODE_1, signalApp1Started,
+                CommTestUtils.createZkClient());
+
+        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_2, signalApp2Initialized,
+                CommTestUtils.createZkClient());
+        CommTestUtils.watchAndSignalCreation(AppConstants.STARTED_ZNODE_2, signalApp2Started,
+                CommTestUtils.createZkClient());
+
+        ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app1");
+        record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uri1);
+        zkClient.create("/" + clusterName + "/apps/testApp1", record1, CreateMode.PERSISTENT);
+
+        ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app2");
+        record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uri2);
+        zkClient.create("/" + clusterName + "/apps/testApp2", record2, CreateMode.PERSISTENT);
+
+        Assert.assertTrue(signalApp1Initialized.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalApp1Started.await(10, TimeUnit.SECONDS));
+
+        Assert.assertTrue(signalApp2Initialized.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalApp2Started.await(10, TimeUnit.SECONDS));
+
+    }
+
+    /**
+     * Starts the S4 node with no application, serves the app 1 archive from an embedded HTTP server and publishes
+     * its http URI to Zookeeper, then checks that the application gets deployed.
+     */
+    @Test
+    public void testZkTriggeredDeploymentFromHttp() throws Exception {
+        initializeS4Node();
+        Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
+
+        File tmpDir = Files.createTempDir();
+        File s4rToDeploy = new File(tmpDir, String.valueOf(System.currentTimeMillis()));
+        File builtArchive = new File(tmpAppsDir.getAbsolutePath() + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r");
+
+        long copiedBytes = ByteStreams.copy(Files.newInputStreamSupplier(builtArchive),
+                Files.newOutputStreamSupplier(s4rToDeploy));
+        Assert.assertTrue(copiedBytes > 0);
+
+        // start an embedded HTTP server exposing the temporary directory under the /s4 context
+        httpServer = HttpServer.create(new InetSocketAddress(8080), 0);
+        httpServer.createContext("/s4", new MyHandler(tmpDir));
+        httpServer.setExecutor(Executors.newCachedThreadPool());
+        httpServer.start();
+
+        assertDeployment("http://localhost:8080/s4/" + s4rToDeploy.getName(), false);
+    }
+
+    /**
+     * 
+     * Tests that classes with same signature are loaded in different class loaders (through the S4RLoader), even when
+     * referenced through reflection, and even when referencing classes present in the classpath of the S4 node
+     * 
+     * Works in the following manner:
+     * 
+     * - we have app1 and app2, very simple apps
+     * 
+     * - app1 and app2 have 3 classes with same name: A, AppConstants and TestApp
+     * 
+     * - app1 in addition has a PE and a socket adapter so that it can react to injected events
+     * 
+     * - upon initialization of the application, TestApp writes a znode in Zookeeper, corresponding to the application
+     * index (1 or 2), using the corresponding constant from the AppConstants class (which is part of the S4 node
+     * classpath, and therefore loaded by the standard classloader, not from an s4 app classloader)
+     * 
+     * - upon startup of the application, TestApp creates A by reflection, and A writes a znode specific to the current
+     * app
+     * 
+     * - app1 and app2 are generated through gradle scripts, called by executing the "gradlew" executable at the root of
+     * the project, and using the build.gradle file available for these applications
+     * 
+     * - app1 and app2 s4r archives are copied to a web server and published to Zookeeper
+     * 
+     * - they automatically get deployed, and we verify that 2 apps are correctly started, therefore that classes
+     * TestApp and A were independently loaded for independent applications
+     * 
+     */
+
+    @Test
+    public void testZkTriggeredDeploymentFromHttpForMultipleApps() throws Exception {
+        initializeS4Node();
+        Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
+        Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_2));
+
+        File tmpDir = Files.createTempDir();
+
+        File s4rToDeployForApp1 = new File(tmpDir, String.valueOf(System.currentTimeMillis()) + "-app1");
+        File s4rToDeployForApp2 = new File(tmpDir, String.valueOf(System.currentTimeMillis()) + "-app2");
+
+        File builtArchiveForApp1 = new File(tmpAppsDir.getAbsolutePath()
+                + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r");
+        long copiedBytesForApp1 = ByteStreams.copy(Files.newInputStreamSupplier(builtArchiveForApp1),
+                Files.newOutputStreamSupplier(s4rToDeployForApp1));
+        Assert.assertTrue(copiedBytesForApp1 > 0);
+
+        File builtArchiveForApp2 = new File(tmpAppsDir.getAbsolutePath()
+                + "/simple-deployable-app-2-0.0.0-SNAPSHOT.s4r");
+        long copiedBytesForApp2 = ByteStreams.copy(Files.newInputStreamSupplier(builtArchiveForApp2),
+                Files.newOutputStreamSupplier(s4rToDeployForApp2));
+        Assert.assertTrue(copiedBytesForApp2 > 0);
+
+        // start an embedded HTTP server exposing both archives under the /s4 context
+        httpServer = HttpServer.create(new InetSocketAddress(8080), 0);
+        httpServer.createContext("/s4", new MyHandler(tmpDir));
+        httpServer.setExecutor(Executors.newCachedThreadPool());
+        httpServer.start();
+
+        String uriForApp1 = "http://localhost:8080/s4/" + s4rToDeployForApp1.getName();
+        String uriForApp2 = "http://localhost:8080/s4/" + s4rToDeployForApp2.getName();
+        assertMultipleAppsDeployment(uriForApp1, uriForApp2);
+    }
+
+    /**
+     * Resets the test cluster definition in Zookeeper, checks that no process is registered yet, then forks an S4
+     * node configured from a temporary copy of the test properties. Sets the <code>clusterName</code>,
+     * <code>zkClient</code> and <code>forkedNode</code> fields.
+     */
+    private void initializeS4Node() throws ConfigurationException, IOException, InterruptedException {
+        // 0. package s4 app
+        // TODO this is currently done offline, and the app contains the TestApp class copied from the one in the
+        // current package .
+
+        // 1. start s4 nodes. Check that no app is deployed.
+        PropertiesConfiguration config = loadConfig();
+
+        clusterName = config.getString("cluster.name");
+        TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+        taskSetup.clean(clusterName);
+        taskSetup.setup(clusterName, 1);
+
+        zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        List<String> processes = zkClient.getChildren("/" + clusterName + "/process");
+        Assert.assertTrue(processes.size() == 0);
+        final CountDownLatch signalProcessesReady = new CountDownLatch(1);
+
+        // NOTE(review): this latch is not awaited yet (see the TODO below), and it waits for 2 processes although
+        // the cluster is set up with a single task - confirm the expected count before enabling the await
+        zkClient.subscribeChildChanges("/" + clusterName + "/process", new IZkChildListener() {
+
+            @Override
+            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+                if (currentChilds.size() == 2) {
+                    signalProcessesReady.countDown();
+                }
+
+            }
+        });
+
+        // the forked node reads its configuration from a file, so the classpath resource is copied to a temp file
+        File tmpConfig = File.createTempFile("tmp", "config");
+        InputStream configStream = getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+        Assert.assertNotNull(configStream);
+        try {
+            Assert.assertTrue(ByteStreams.copy(configStream, Files.newOutputStreamSupplier(tmpConfig)) > 0);
+        } finally {
+            configStream.close();
+        }
+        forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
+
+        // TODO synchro with ready state from zk
+        Thread.sleep(10000);
+        // Assert.assertTrue(signalProcessesReady.await(10, TimeUnit.SECONDS));
+
+    }
+
+    /**
+     * Cleans temporary directories, starts a Zookeeper server for the test and removes the "/simpleAppCreated"
+     * znode if there is one.
+     */
+    @Before
+    public void prepare() throws Exception {
+        CommTestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
+
+        ZooKeeper zkSession = CommTestUtils.createZkClient();
+        try {
+            zkSession.delete("/simpleAppCreated", -1);
+        } catch (Exception ignored) {
+            // best effort: a failure to delete the znode must not prevent the test from running
+        }
+        zkSession.close();
+    }
+
+    /**
+     * Releases everything a test may have started: the Zookeeper server, the forked S4 node and the embedded HTTP
+     * server. Each step runs even when a previous one fails, so that a failing shutdown does not leave a forked
+     * process or a bound HTTP port behind for the next test.
+     */
+    @After
+    public void cleanup() throws Exception {
+        try {
+            CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        } finally {
+            try {
+                CommTestUtils.killS4App(forkedNode);
+            } finally {
+                if (httpServer != null) {
+                    httpServer.stop(0);
+                }
+            }
+        }
+    }
+
+    /**
+     * Only prints a message to standard output.
+     * 
+     * NOTE(review): no server is started here despite the message; this looks like a leftover from manual testing
+     * of the HTTP handler - confirm whether it can be removed.
+     */
+    public static void main(String[] args) throws IOException {
+
+        System.out.println("Server is listening on port 8080");
+    }
+}
+
+/**
+ * HTTP handler that serves the files of a directory under the "/s4/" path, for GET requests only.
+ * 
+ * Every exchange is answered and closed: requests with another method get a 405 status, requests for a path that
+ * does not designate an existing file inside the served directory get a 404 status.
+ */
+class MyHandler implements HttpHandler {
+
+    File tmpDir;
+
+    public MyHandler(File tmpDir) {
+        this.tmpDir = tmpDir;
+    }
+
+    @Override
+    public void handle(HttpExchange exchange) throws IOException {
+        try {
+            if (!exchange.getRequestMethod().equalsIgnoreCase("GET")) {
+                // -1 means that no response body is sent
+                exchange.sendResponseHeaders(405, -1);
+                return;
+            }
+
+            String requestPath = exchange.getRequestURI().getPath();
+            File requestedFile = null;
+            if (requestPath.length() > "/s4/".length()) {
+                requestedFile = new File(tmpDir, requestPath.substring("/s4/".length()));
+            }
+            if (requestedFile == null || !requestedFile.isFile() || !isInside(tmpDir, requestedFile)) {
+                exchange.sendResponseHeaders(404, -1);
+                return;
+            }
+
+            Headers responseHeaders = exchange.getResponseHeaders();
+            responseHeaders.set(HttpHeaders.Names.CONTENT_TYPE, HttpHeaders.Values.BYTES);
+            // the length comes from the file system, the content itself is streamed rather than loaded in memory
+            exchange.sendResponseHeaders(200, requestedFile.length());
+
+            InputStream fileContent = new FileInputStream(requestedFile);
+            try {
+                ByteStreams.copy(fileContent, exchange.getResponseBody());
+            } finally {
+                fileContent.close();
+            }
+        } finally {
+            // closes the response body as well, whatever the outcome
+            exchange.close();
+        }
+    }
+
+    /**
+     * Tells whether the file is located inside the directory once paths are resolved, so that ".." segments in the
+     * request cannot be used to read files outside of the served directory.
+     */
+    private static boolean isInside(File dir, File file) throws IOException {
+        return file.getCanonicalPath().startsWith(dir.getCanonicalPath() + File.separator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
new file mode 100644
index 0000000..f5f3912
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
@@ -0,0 +1,62 @@
+package org.apache.s4.deploy;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromZK;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Guice module used by the deployment tests: it makes the properties of the
+ * "/org.apache.s4.deploy.s4.properties" classpath resource injectable and binds the communication layer
+ * interfaces to their Zookeeper and TCP based implementations.
+ */
+public class TestModule extends AbstractModule {
+
+    private PropertiesConfiguration config;
+
+    /**
+     * Loads the test properties and binds each of them by name. Problems are reported through the binder rather
+     * than thrown.
+     */
+    private void loadProperties(Binder binder) {
+
+        InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+        if (is == null) {
+            binder.addError("Cannot find [/org.apache.s4.deploy.s4.properties] on the classpath");
+            return;
+        }
+        try {
+            config = new PropertiesConfiguration();
+            config.load(is);
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        } finally {
+            try {
+                is.close();
+            } catch (java.io.IOException ignored) {
+                // the properties were already read (or already failed to load); a close failure adds nothing
+            }
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        bind(Cluster.class);
+        bind(Hasher.class).to(DefaultHasher.class);
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        bind(Assignment.class).to(AssignmentFromZK.class);
+        bind(Topology.class).to(TopologyFromZK.class);
+        bind(Emitter.class).to(TCPEmitter.class);
+        bind(Listener.class).to(TCPListener.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 8dc2159..060f82a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -1,10 +1,19 @@
 package org.apache.s4.fixtures;
 
+import java.io.BufferedReader;
+import java.io.File;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
 
 import org.apache.s4.core.App;
 import org.apache.s4.core.Main;
-import org.apache.s4.fixtures.CommTestUtils;
+
+import com.google.common.io.PatternFilenameFilter;
 
 /**
  * Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
@@ -21,4 +30,76 @@ public class CoreTestUtils extends CommTestUtils {
     public static Process forkS4Node() throws IOException, InterruptedException {
         return forkProcess(Main.class.getName(), new String[] {});
     }
+
+    /**
+     * Forks a process running the S4 <code>Main</code> class with the given arguments.
+     * 
+     * @param args
+     *            arguments passed as is to <code>forkProcess</code> along with the name of the Main class
+     * @return the process returned by <code>forkProcess</code>
+     */
+    public static Process forkS4Node(String[] args) throws IOException, InterruptedException {
+        return forkProcess(Main.class.getName(), args);
+    }
+
+    /**
+     * Locates the "gradlew" executable, looking first in the current working directory ("user.dir" system
+     * property) and then two levels above it. The test fails when neither directory contains it.
+     * 
+     * @return the gradlew file
+     */
+    public static File findGradlewInRootDir() {
+        File userDir = new File(System.getProperty("user.dir"));
+        File parentDir = userDir.getAbsoluteFile().getParentFile();
+        File grandParentDir = (parentDir == null) ? null : parentDir.getParentFile();
+
+        for (File candidateDir : new File[] { userDir, grandParentDir }) {
+            if (candidateDir == null) {
+                continue;
+            }
+            // listFiles returns null when the directory cannot be listed
+            File[] matches = candidateDir.listFiles(new PatternFilenameFilter("gradlew"));
+            if (matches != null && matches.length == 1) {
+                return new File(candidateDir.getAbsolutePath() + File.separator + "gradlew");
+            }
+        }
+
+        // report the directories that were actually searched
+        Assert.fail("Cannot find gradlew executable in [" + userDir.getAbsolutePath() + "] or ["
+                + (grandParentDir == null ? "no grandparent directory" : grandParentDir.getAbsolutePath()) + "]");
+        return null;
+    }
+
+    /**
+     * Runs a gradle task through the gradlew wrapper, echoes the output of the build to standard output and fails
+     * the test if the build does not exit with code 0.
+     * 
+     * @param gradlewFile
+     *            the gradlew executable; settings.gradle is taken from the same directory
+     * @param buildFile
+     *            the build file to use; the build is run from the directory that contains it
+     * @param taskName
+     *            name of the gradle task to execute
+     * @param params
+     *            project properties, each passed to gradle with a "-P" prefix; may be empty or null
+     */
+    public static void callGradleTask(File gradlewFile, File buildFile, String taskName, String[] params)
+            throws Exception {
+
+        List<String> cmdList = new ArrayList<String>();
+        cmdList.add(gradlewFile.getAbsolutePath());
+        cmdList.add("-c");
+        cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
+        cmdList.add("-b");
+        cmdList.add(buildFile.getAbsolutePath());
+        cmdList.add(taskName);
+        if (params != null) {
+            for (String param : params) {
+                cmdList.add("-P" + param);
+            }
+        }
+
+        System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+        ProcessBuilder pb = new ProcessBuilder(cmdList);
+
+        pb.directory(buildFile.getParentFile());
+        // the no-arg redirectErrorStream() only reads the flag; passing true is what merges stderr into stdout
+        pb.redirectErrorStream(true);
+        final Process process = pb.start();
+
+        // drain the output BEFORE waiting for the process: a build that writes more than the pipe buffer can hold
+        // would otherwise block on its output while this thread blocks in waitFor()
+        BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+        try {
+            String line = br.readLine();
+            while (line != null) {
+                System.out.println(line);
+                line = br.readLine();
+            }
+        } finally {
+            br.close();
+        }
+
+        int exitValue = process.waitFor();
+        if (exitValue != 0) {
+            Assert.fail("gradle task [" + taskName + "] failed for build file [" + buildFile.getAbsolutePath()
+                    + "]. Exit code is [" + exitValue + "]");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index c40a050..e12853d 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -1,6 +1,5 @@
 package org.apache.s4.wordcount;
 
-
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
@@ -18,9 +17,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-
 public class WordCountTest {
-    
+
     public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
     public static final int SENTENCE_1_TOTAL_WORDS = SENTENCE_1.split(" ").length;
     public static final String SENTENCE_2 = "doobie doobie da";
@@ -28,10 +26,9 @@ public class WordCountTest {
     public static final String SENTENCE_3 = "doobie";
     public static final int SENTENCE_3_TOTAL_WORDS = SENTENCE_3.split(" ").length;
     public static final String FLAG = ";";
-    public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS
-            + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
+    public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
     private static Factory zookeeperServerConnectionFactory;
-    
+
     @Before
     public void prepare() throws IOException, InterruptedException, KeeperException {
         CommTestUtils.cleanupTmpDirs();
@@ -45,10 +42,8 @@ public class WordCountTest {
      * 
      * 
      * 
-     *           sentences                      words                    word counts
-     * Adapter ------------> WordSplitterPE -----------> WordCounterPE -------------> WordClassifierPE 
-     *                       key = "sentence"             key = word                   key="classifier"
-     *                       (should be *)               
+     * sentences words word counts Adapter ------------> WordSplitterPE -----------> WordCounterPE ------------->
+     * WordClassifierPE key = "sentence" key = word key="classifier" (should be *)
      * 
      * 
      * The test consists in checking that words are correctly counted.
@@ -57,31 +52,25 @@ public class WordCountTest {
      */
     @Test
     public void testSimple() throws Exception {
-        
         final ZooKeeper zk = CommTestUtils.createZkClient();
-        
-        App.main(new String[]{WordCountModule.class.getName(), WordCountApp.class.getName()});
-        
+
+        App.main(new String[] { WordCountModule.class.getName(), WordCountApp.class.getName() });
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
-        CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
-                zk);
-        
+        CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+
         // add authorizations for processing
-        for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS
-                + 1; i++) {
-            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.EPHEMERAL);
+        for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         }
         CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
         CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
         CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
         signalTextProcessed.await();
-        File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR
-                + File.separator + "wordcount");
+        File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
         String s = CommTestUtils.readFile(results);
         Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
-        
+
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
index fb1d249..6f900c3 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
@@ -1,19 +1,23 @@
 package org.apache.s4.wordcount.zk;
 
-import static org.apache.s4.wordcount.WordCountTest.*;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_1;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_1_TOTAL_WORDS;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_2;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_2_TOTAL_WORDS;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_3;
+
 import java.io.File;
 import java.util.concurrent.CountDownLatch;
 
 import junit.framework.Assert;
 
-import org.apache.s4.core.App;
+import org.apache.s4.core.Main;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.s4.wordcount.WordCountApp;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
-
+import org.apache.zookeeper.ZooKeeper;
 import org.junit.Test;
 
 public class WordCountTestZk extends ZkBasedTest {
@@ -22,7 +26,7 @@ public class WordCountTestZk extends ZkBasedTest {
 
         final ZooKeeper zk = CommTestUtils.createZkClient();
 
-        App.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
+        Main.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
         CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
index 0e31dfa..62fc7d5 100644
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ b/subprojects/s4-core/src/test/resources/default.s4.properties
@@ -4,6 +4,6 @@ cluster.hosts = localhost
 cluster.ports = 5077
 cluster.lock_dir = {user.dir}/tmp
 cluster.name = s4-test-cluster
-cluster.zk_address = localhost:2181
+cluster.zk_address = localhost:21810
 cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
\ No newline at end of file
+cluster.zk_connection_timeout = 10000

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
new file mode 100644
index 0000000..d74927f
--- /dev/null
+++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
@@ -0,0 +1,11 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.name = s4-test-cluster
+cluster.zk_address = localhost:21810
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
+comm.module = org.apache.s4.deploy.TestModule
+s4.logger_level = TRACE
+appsDir=/tmp/deploy-test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-example/s4-example.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/s4-example.gradle b/subprojects/s4-example/s4-example.gradle
index 664496e..bbeb11b 100644
--- a/subprojects/s4-example/s4-example.gradle
+++ b/subprojects/s4-example/s4-example.gradle
@@ -21,4 +21,5 @@ dependencies {
     compile project( ":s4-core" )
     compile project( ":s4-comm" )
 	compile libraries.ejml
+    compile libraries.junit
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/build.gradle b/test-apps/simple-deployable-app-1/build.gradle
new file mode 100644
index 0000000..37b1daf
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/build.gradle
@@ -0,0 +1,223 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+
+
+/* All project libraries must be defined here. */
+libraries = [
+           json:               'org.json:json:20090211',
+           guava:              'com.google.guava:guava:10.0.1',
+           gson:               'com.google.code.gson:gson:1.6',
+           guice:              'com.google.inject:guice:3.0',
+           guice_assist:       'com.google.inject.extensions:guice-assistedinject:3.0',
+           guice_grapher:      'com.google.inject:guice-grapher:3.0',
+           flexjson:           'net.sf.flexjson:flexjson:2.1',
+           bcel:               'org.apache.bcel:bcel:5.2',
+           jakarta_regexp:     'jakarta-regexp:jakarta-regexp:1.4',
+           kryo:               'com.googlecode:kryo:1.04',
+           netty:              'org.jboss.netty:netty:3.2.5.Final',
+           reflectasm:         'com.esotericsoftware:reflectasm:0.8',
+           minlog:             'com.esotericsoftware:minlog:1.2',
+           asm:                'asm:asm:3.2',
+           commons_io:         'commons-io:commons-io:2.0.1',
+           commons_config:     'commons-configuration:commons-configuration:1.6',
+           commons_codec:      'commons-codec:commons-codec:1.4',
+           commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
+           commons_coll:       'net.sourceforge.collections:collections-generic:4.01', // Use this lib until the commons collection with Generics is released.
+           slf4j:              'org.slf4j:slf4j-api:1.6.1',
+           logback_core:       'ch.qos.logback:logback-core:0.9.29',
+           logback_classic:    'ch.qos.logback:logback-classic:0.9.29',
+           zk:                 'org.apache.zookeeper:zookeeper:3.3.1',
+           jcip:               'net.jcip:jcip-annotations:1.0',
+           junit:              'junit:junit:4.10',
+       ]
+
+
+dependencies {
+
+   /* S4 Platform. We only need the API, not the transitive dependencies. */
+//    s4Libs.each {  module ->
+//        compile( module ) //{ transitive = false }
+//        s4API( module )
+//    }
+
+   compile project(":s4-base")
+   compile project(":s4-comm")
+   compile project(":s4-core")
+   
+   /* Logging. */
+   compile( libraries.slf4j )
+   compile( libraries.logback_core )
+   compile( libraries.logback_classic )
+
+   /* Commons. */
+   compile( libraries.commons_io )
+   compile( libraries.commons_config )
+   compile( libraries.commons_coll )
+
+   /* Misc. */
+   compile( libraries.jcip )
+
+   /* Testing. */
+   testCompile( libraries.junit )
+}
+
+/* Set the manifest attributes for the S4 archive here.
+*  TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+       provider: 'gradle',
+       'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+       'Implementation-Version': version,
+       'Implementation-Vendor': 'Apache S4',
+       'Implementation-Vendor-Id': 's4app',
+       'S4-App-Class': appClassname, // gets set by the s4r task.
+       'S4-Version': s4Version
+       )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+   dependsOn jar
+   from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   manifest = project.manifest
+   extension = 's4r'
+   
+   /* Set class name in manifest. Parse source files until we find a class that extends App.
+    * Get fully qualified Java class name and set attribute in Manifest.
+    */
+   sourceSets.main.allSource.files.each {  File file ->
+       if (appClassname =="" || appClassname == "UNKNOWN") {
+           // only execute the closure for this file if we haven't already found the app class name
+           appClassname = getAppClassname(file)
+           if(appClassname != "") {
+               manifest.mainAttributes('S4-App-Class': appClassname)
+           }
+       }
+   }
+   
+   if (appClassname == "UNKNOWN") {
+       // 'exit' is not a method available to a build script; fail the build with an explicit error instead.
+       println "Couldn't find App class in source files...aborting."
+       throw new GradleException("Couldn't find App class in source files.")
+   }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+   appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+   /* This is for debugging. */
+   //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+   //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+   // more debugging statements.
+   //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+   
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+   dependsOn s4r
+   from s4r.archivePath
+   into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+def getAppClassname(file) {
+   // Locals only: an undeclared 'packageName' lives in the script binding, so it is
+   // undefined for package-less files and leaks stale values between source files.
+   def classname = "UNKNOWN"
+   def packageName = ""
+   for (line in file.readLines()) {
+       def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+       if (pn) {
+           packageName = pn[0][1] + "."
+       }
+       def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+       if (an) {
+           classname = packageName + an[0][1]
+           println "Found app class name: " + classname
+           break
+       }
+   }
+   classname
+}
+
+class Version {
+   int major
+   int minor
+   int bugfix
+   boolean isRelease
+   /** Renders "major.minor.bugfix", followed by "-SNAPSHOT" unless isRelease is set. */
+   String toString() {
+       [major, minor, bugfix].join('.') + (isRelease ? '' : '-SNAPSHOT')
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/A.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/A.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/A.java
new file mode 100644
index 0000000..45bc37b
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/A.java
@@ -0,0 +1,17 @@
+package org.apache.s4.deploy;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.deploy.AppConstants;
+
+public class A {
+
+    /** Creates the ephemeral znode AppConstants.STARTED_ZNODE_1 through the given client. */
+    public A(ZkClient zkClient) {
+        try {
+            zkClient.createEphemeral(AppConstants.STARTED_ZNODE_1, null);
+        } catch (Exception e) {
+            e.printStackTrace(); // report the cause: a silent exit cannot be diagnosed
+            System.exit(-1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/AppConstants.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/AppConstants.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/AppConstants.java
new file mode 100644
index 0000000..a12b6ea
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/AppConstants.java
@@ -0,0 +1,10 @@
+package org.apache.s4.deploy;
+
+public class AppConstants { // znode paths used by the deployable test apps, all under "/s4-test"
+    public static final String STARTED_ZNODE_1 = "/s4-test/test-app-1-started";
+    public static final String INITIALIZED_ZNODE_1 = "/s4-test/test-app-1-initialized";
+    // Same pair of paths for the second test app.
+    public static final String STARTED_ZNODE_2 = "/s4-test/test-app-2-started";
+    public static final String INITIALIZED_ZNODE_2 = "/s4-test/test-app-2-initialized";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
new file mode 100644
index 0000000..809cebd
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
@@ -0,0 +1,63 @@
+package org.apache.s4.deploy;
+
+import java.io.IOException;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.LoggerFactory;
+
+public class SimplePE extends ProcessingElement implements Watcher {
+
+    /** Session opened in onCreate() and closed by onEvent() after it has written its znode. */
+    private ZooKeeper zk;
+
+    public SimplePE() {
+    }
+
+    public SimplePE(App app) {
+        super(app);
+    }
+
+    /** Creates the persistent znode "/onEvent@" + the event's "line" attribute, then closes the session. */
+    public void onEvent(org.apache.s4.base.Event event) {
+        try {
+            LoggerFactory.getLogger(getClass()).debug("processing envent {}", event.get("line"));
+            zk.create("/onEvent@" + event.get("line"), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.close();
+        } catch (KeeperException e) {
+            // Best effort: report the failure and carry on.
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status that was cleared when the exception was thrown.
+            Thread.currentThread().interrupt();
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void onCreate() {
+        if (zk == null) {
+            try {
+                zk = new ZooKeeper("localhost:" + 21810, 4000, this);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // No cleanup is performed on removal.
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        // Watcher callback: ZooKeeper notifications are ignored.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
new file mode 100644
index 0000000..5ee7197
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
@@ -0,0 +1,80 @@
+package org.apache.s4.deploy;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.Stream;
+
+public class SocketAdapter {
+
+    /** Listening socket; static, so shared by all instances. Opened by the reader thread, closed by close(). */
+    static ServerSocket serverSocket;
+
+    /**
+     * Starts a thread that accepts connections on port 12000 and forwards the first line read from each one to the
+     * given stream as an Event with a "line" attribute. Each sentence is sent through a new socket connection.
+     * 
+     * @param stream stream that receives one event per accepted connection
+     * @throws IOException declared in the signature; nothing in this constructor body throws it
+     */
+    public SocketAdapter(final Stream<Event> stream) throws IOException {
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                serverSocket = null;
+                Socket connectedSocket;
+                BufferedReader in = null;
+                try {
+                    serverSocket = new ServerSocket(12000);
+                    while (true) {
+                        connectedSocket = serverSocket.accept();
+                        in = new BufferedReader(new InputStreamReader(connectedSocket.getInputStream()));
+                        String line = in.readLine();
+                        System.out.println("read: " + line);
+                        Event event = new Event();
+                        event.put("line", String.class, line);
+                        stream.put(event);
+                        connectedSocket.close();
+                    }
+                } catch (IOException e) {
+                    // close() makes accept() fail on purpose; only an I/O error on an open socket is fatal.
+                    if (serverSocket == null || !serverSocket.isClosed()) {
+                        e.printStackTrace();
+                        System.exit(-1);
+                    }
+                } finally {
+                    if (in != null) {
+                        try {
+                            in.close();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    if (serverSocket != null) {
+                        try {
+                            serverSocket.close();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            }
+        });
+        t.start();
+    }
+
+    /** Closes the listening socket, if one was opened; the reader thread then leaves its accept loop. */
+    public void close() {
+        if (serverSocket != null) {
+            try {
+                serverSocket.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
new file mode 100644
index 0000000..25b98a6
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -0,0 +1,60 @@
+package org.apache.s4.deploy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+import org.apache.zookeeper.CreateMode;
+
+public class TestApp extends App {
+
+    /** Created in onInit() and passed to class A's constructor in onStart(). */
+    private ZkClient zkClient;
+    private SocketAdapter socketAdapter;
+
+    @Override
+    protected void onClose() {
+        // Nothing is released here: the ZkClient and the socket adapter are left open.
+    }
+
+    /** Wires a SimplePE prototype to "stream", starts the socket adapter and creates INITIALIZED_ZNODE_1. */
+    @Override
+    protected void onInit() {
+        try {
+            SimplePE prototype = createPE(SimplePE.class);
+            Stream<Event> stream = createStream("stream", new KeyFinder<Event>() {
+                public java.util.List<String> get(Event event) {
+                    // Build a plain ArrayList; no anonymous ArrayList subclass is needed for one element.
+                    java.util.List<String> keys = new ArrayList<String>();
+                    keys.add("line");
+                    return keys;
+                }
+            }, prototype);
+            socketAdapter = new SocketAdapter(stream);
+            zkClient = new ZkClient("localhost:" + 21810);
+            if (!zkClient.exists("/s4-test")) {
+                zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
+            }
+            zkClient.createEphemeral(AppConstants.INITIALIZED_ZNODE_1, null);
+        } catch (Exception e) {
+            // Report the cause before exiting; a silent exit hides why initialization failed.
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+
+    /** Instantiates class A reflectively, passing it the ZkClient. */
+    @Override
+    protected void onStart() {
+        try {
+            Class.forName("org.apache.s4.deploy.A").getConstructor(ZkClient.class).newInstance(zkClient);
+        } catch (Exception e) {
+            e.printStackTrace(); // report why startup failed before exiting
+            System.exit(-1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-2/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/build.gradle b/test-apps/simple-deployable-app-2/build.gradle
new file mode 100644
index 0000000..37b1daf
--- /dev/null
+++ b/test-apps/simple-deployable-app-2/build.gradle
@@ -0,0 +1,223 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+
+
+/* All project libraries must be defined here. */
+libraries = [
+           json:               'org.json:json:20090211',
+           guava:              'com.google.guava:guava:10.0.1',
+           gson:               'com.google.code.gson:gson:1.6',
+           guice:              'com.google.inject:guice:3.0',
+           guice_assist:       'com.google.inject.extensions:guice-assistedinject:3.0',
+           guice_grapher:      'com.google.inject:guice-grapher:3.0',
+           flexjson:           'net.sf.flexjson:flexjson:2.1',
+           bcel:               'org.apache.bcel:bcel:5.2',
+           jakarta_regexp:     'jakarta-regexp:jakarta-regexp:1.4',
+           kryo:               'com.googlecode:kryo:1.04',
+           netty:              'org.jboss.netty:netty:3.2.5.Final',
+           reflectasm:         'com.esotericsoftware:reflectasm:0.8',
+           minlog:             'com.esotericsoftware:minlog:1.2',
+           asm:                'asm:asm:3.2',
+           commons_io:         'commons-io:commons-io:2.0.1',
+           commons_config:     'commons-configuration:commons-configuration:1.6',
+           commons_codec:      'commons-codec:commons-codec:1.4',
+           commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
+           commons_coll:       'net.sourceforge.collections:collections-generic:4.01', // Use this lib until the commons collection with Generics is released.
+           slf4j:              'org.slf4j:slf4j-api:1.6.1',
+           logback_core:       'ch.qos.logback:logback-core:0.9.29',
+           logback_classic:    'ch.qos.logback:logback-classic:0.9.29',
+           zk:                 'org.apache.zookeeper:zookeeper:3.3.1',
+           jcip:               'net.jcip:jcip-annotations:1.0',
+           junit:              'junit:junit:4.10',
+       ]
+
+
+dependencies {
+
+   /* S4 Platform. We only need the API, not the transitive dependencies. */
+//    s4Libs.each {  module ->
+//        compile( module ) //{ transitive = false }
+//        s4API( module )
+//    }
+
+   compile project(":s4-base")
+   compile project(":s4-comm")
+   compile project(":s4-core")
+   
+   /* Logging. */
+   compile( libraries.slf4j )
+   compile( libraries.logback_core )
+   compile( libraries.logback_classic )
+
+   /* Commons. */
+   compile( libraries.commons_io )
+   compile( libraries.commons_config )
+   compile( libraries.commons_coll )
+
+   /* Misc. */
+   compile( libraries.jcip )
+
+   /* Testing. */
+   testCompile( libraries.junit )
+}
+
+/* Set the manifest attributes for the S4 archive here.
+*  TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+       provider: 'gradle',
+       'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+       'Implementation-Version': version,
+       'Implementation-Vendor': 'Apache S4',
+       'Implementation-Vendor-Id': 's4app',
+       'S4-App-Class': appClassname, // gets set by the s4r task.
+       'S4-Version': s4Version
+       )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+   dependsOn jar
+   from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   manifest = project.manifest
+   extension = 's4r'
+   
+   /* Set class name in manifest. Parse source files until we find a class that extends App.
+    * Get fully qualified Java class name and set attribute in Manifest.
+    */
+   sourceSets.main.allSource.files.each {  File file ->
+       if (appClassname =="" || appClassname == "UNKNOWN") {
+           // only execute the closure for this file if we haven't already found the app class name
+           appClassname = getAppClassname(file)
+           if(appClassname != "") {
+               manifest.mainAttributes('S4-App-Class': appClassname)
+           }
+       }
+   }
+   
+   if (appClassname == "UNKNOWN") {
+       // 'exit' is not a method available to a build script; fail the build with an explicit error instead.
+       println "Couldn't find App class in source files...aborting."
+       throw new GradleException("Couldn't find App class in source files.")
+   }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+   appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+   /* This is for debugging. */
+   //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+   //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+   // more debugging statements.
+   //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+   
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+   dependsOn s4r
+   from s4r.archivePath
+   into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+def getAppClassname(file) {
+   // Locals only: an undeclared 'packageName' lives in the script binding, so it is
+   // undefined for package-less files and leaks stale values between source files.
+   def classname = "UNKNOWN"
+   def packageName = ""
+   for (line in file.readLines()) {
+       def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+       if (pn) {
+           packageName = pn[0][1] + "."
+       }
+       def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+       if (an) {
+           classname = packageName + an[0][1]
+           println "Found app class name: " + classname
+           break
+       }
+   }
+   classname
+}
+
+class Version {
+   int major
+   int minor
+   int bugfix
+   boolean isRelease
+   /** Renders "major.minor.bugfix", followed by "-SNAPSHOT" unless isRelease is set. */
+   String toString() {
+       [major, minor, bugfix].join('.') + (isRelease ? '' : '-SNAPSHOT')
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/A.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/A.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/A.java
new file mode 100644
index 0000000..732be13
--- /dev/null
+++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/A.java
@@ -0,0 +1,16 @@
+package org.apache.s4.deploy;
+
+import org.I0Itec.zkclient.ZkClient;
+
+public class A {
+
+    /** Creates the ephemeral znode AppConstants.STARTED_ZNODE_2 through the given client. */
+    public A(ZkClient zkClient) {
+        try {
+            zkClient.createEphemeral(AppConstants.STARTED_ZNODE_2, null);
+        } catch (Exception e) {
+            e.printStackTrace(); // report the cause: a silent exit cannot be diagnosed
+            System.exit(-1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/AppConstants.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/AppConstants.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/AppConstants.java
new file mode 100644
index 0000000..a12b6ea
--- /dev/null
+++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/AppConstants.java
@@ -0,0 +1,10 @@
+package org.apache.s4.deploy;
+
+public class AppConstants { // znode paths used by the deployable test apps, all under "/s4-test"
+    public static final String STARTED_ZNODE_1 = "/s4-test/test-app-1-started";
+    public static final String INITIALIZED_ZNODE_1 = "/s4-test/test-app-1-initialized";
+    // Same pair of paths for the second test app.
+    public static final String STARTED_ZNODE_2 = "/s4-test/test-app-2-started";
+    public static final String INITIALIZED_ZNODE_2 = "/s4-test/test-app-2-initialized";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
new file mode 100644
index 0000000..0657ee9
--- /dev/null
+++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -0,0 +1,38 @@
+package org.apache.s4.deploy;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.core.App;
+import org.apache.zookeeper.CreateMode;
+
+public class TestApp extends App {
+    private ZkClient zkClient;
+
+    @Override
+    protected void onClose() {
+        // Nothing is released here: the ZkClient is left open.
+    }
+
+    @Override
+    protected void onInit() {
+        try {
+            zkClient = new ZkClient("localhost:" + 21810);
+            if (!zkClient.exists("/s4-test")) {
+                zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
+            }
+            zkClient.createEphemeral(AppConstants.INITIALIZED_ZNODE_2, null);
+        } catch (Exception e) {
+            e.printStackTrace(); // report the cause: a silent exit hides why initialization failed
+            System.exit(-1);
+        }
+    }
+
+    @Override
+    protected void onStart() {
+        try {
+            Class.forName("org.apache.s4.deploy.A").getConstructor(ZkClient.class).newInstance(zkClient);
+        } catch (Exception e) {
+            e.printStackTrace(); // report why startup failed before exiting
+            System.exit(-1);
+        }
+    }
+}