You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/09 10:46:25 UTC

[2/3] git commit: Fixed s4r URI management + ensure only 1 app is loaded - added a "-generatedS4R/-g" to specify output location when generating S4R" - application reference in Zookeeper is now a single znode instead of a directory, because we only allow

Fixed s4r URI management + ensure only 1 app is loaded
- added a "-generatedS4R/-g" to specify output location when generating S4R"
- application reference in Zookeeper is now a single znode instead of a directory,
because we only allow 1 app per cluster
- improved, refactored and renamed producer-consumer regressions tests: we now deploy
the producer and consumer apps in separate clusters, and properly perform a communication
test
- cluster resolution from the comm layer is now performed upon S4 node initialization,
so that it does not need to be triggered by sending a message


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/f6f91749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/f6f91749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/f6f91749

Branch: refs/heads/S4-71
Commit: f6f91749037dbe88dd2ce631166723875af7e475
Parents: 3da0170
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Jul 9 12:34:03 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Jul 9 12:34:03 2012 +0200

----------------------------------------------------------------------
 settings.gradle                                    |    4 +-
 .../java/org/apache/s4/comm/DefaultCommModule.java |    5 +-
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    6 +
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |    1 +
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |    2 +-
 .../apache/s4/comm/topology/AssignmentFromZK.java  |    6 +
 .../java/org/apache/s4/comm/tcp/TCPCommTest.java   |   15 +-
 .../s4/comm/topology/AssignmentsFromZKTest.java    |    3 +-
 .../s4/comm/topology/ClustersFromZKTest.java       |    1 +
 .../java/org/apache/s4/comm/udp/UDPCommTest.java   |   10 +-
 .../org/apache/s4/comm/util/ProtocolTestUtil.java  |    4 +-
 .../java/org/apache/s4/fixtures/CommTestUtils.java |  117 +-------
 .../src/main/java/org/apache/s4/core/App.java      |   14 -
 .../main/java/org/apache/s4/core/EventSource.java  |  100 -------
 .../src/main/java/org/apache/s4/core/Sender.java   |    2 +-
 .../src/main/java/org/apache/s4/core/Server.java   |   54 +---
 .../s4/deploy/DistributedDeploymentManager.java    |  116 +++-----
 .../apache/s4/deploy/TestAutomaticDeployment.java  |  128 +--------
 .../s4/deploy/prodcon/TestProducerConsumer.java    |   77 ++++--
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |   10 +-
 .../src/main/java/org/apache/s4/tools/Deploy.java  |   66 +++--
 test-apps/consumer-app/build.gradle                |  222 ++++++++++++++
 .../src/main/java/s4app/ConsumerApp.java           |   29 ++
 .../src/main/java/s4app/ConsumerPE.java            |   44 +++
 test-apps/producer-app/build.gradle                |  224 +++++++++++++++
 .../src/main/java/s4app/ProducerApp.java           |   32 ++
 .../src/main/java/s4app/ProducerPE.java            |   51 ++++
 test-apps/s4-counter/build.gradle                  |  224 ---------------
 .../s4-counter/src/main/java/s4app/ClockApp.java   |   38 ---
 .../s4-counter/src/main/java/s4app/ClockPE.java    |   49 ----
 test-apps/s4-showtime/build.gradle                 |  222 --------------
 .../s4-showtime/src/main/java/s4app/ShowPE.java    |   33 ---
 .../src/main/java/s4app/ShowTimeApp.java           |   29 --
 33 files changed, 815 insertions(+), 1123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index ec6cb7d..588d254 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -23,8 +23,8 @@ include 's4-tools'
 //include ':test-apps:simple-adapter-1'
 include ':test-apps:simple-deployable-app-1'
 include ':test-apps:simple-deployable-app-2'
-include ':test-apps:s4-showtime'
-include ':test-apps:s4-counter'
+include ':test-apps:producer-app'
+include ':test-apps:consumer-app'
 
 rootProject.name = 's4'
 rootProject.children.each {project ->

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index df38071..863bc29 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -71,10 +71,13 @@ public class DefaultCommModule extends AbstractModule {
         /* Use Kryo to serialize events. */
         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 
+        // a node holds a single partition assignment
+        // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
         bind(Assignment.class).to(AssignmentFromZK.class);
-        bind(Clusters.class).to(ClustersFromZK.class);
         bind(Cluster.class).to(ClusterFromZK.class);
 
+        bind(Clusters.class).to(ClustersFromZK.class);
+
         try {
             Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
                     .getString("comm.emitter.class"));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 5caafc9..5c83b09 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -157,6 +157,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         bootstrap.setOption("keepAlive", true);
         bootstrap.setOption("reuseAddress", true);
         bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
+
+        refreshCluster();
     }
 
     private class Message implements ChannelFutureListener {
@@ -455,6 +457,10 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     @Override
     public void onChange() {
+        refreshCluster();
+    }
+
+    private void refreshCluster() {
         for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
             Integer partition = clusterNode.getPartition();
             if (partition == null) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 2aa14f0..3e905e6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -84,6 +84,7 @@ public class TCPListener implements Listener {
         }
     }
 
+    @Override
     public int getPartitionId() {
         return node.getPartition();
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index aadcd4d..010274a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -36,7 +36,7 @@ public class TaskSetup {
 
         zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
         zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
-        zkclient.createPersistent("/s4/clusters/" + cluster + "/apps", true);
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/app", true);
         for (int i = 0; i < tasks; i++) {
             String taskId = "Task-" + i;
             ZNRecord record = new ZNRecord(taskId);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index d172d73..3a198bb 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -21,8 +21,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 
+@Singleton
 public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
     private static final Logger logger = LoggerFactory.getLogger(AssignmentFromZK.class);
     /**
@@ -89,6 +91,10 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
         zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
         ZkSerializer serializer = new ZNRecordSerializer();
         zkClient.setZkSerializer(serializer);
+    }
+
+    @Inject
+    void init() throws Exception {
         zkClient.subscribeStateChanges(this);
         if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
             throw new Exception("cannot connect to zookeeper");

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index a65fca4..be020cb 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -20,22 +20,23 @@ public abstract class TCPCommTest extends ProtocolTestUtil {
     private static Logger logger = LoggerFactory.getLogger(TCPCommTest.class);
     DeliveryTestUtil util;
     public final static String CLUSTER_NAME = "cluster1";
-    Injector injector;
 
     public TCPCommTest() throws IOException {
         super();
-        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                .openStream(), CLUSTER_NAME), new TCPCommTestModule());
     }
 
     public TCPCommTest(int numTasks) throws IOException {
         super(numTasks);
-        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                .openStream(), CLUSTER_NAME), new TCPCommTestModule());
     }
 
-    public Injector getInjector() {
-        return injector;
+    public Injector newInjector() {
+        try {
+            return Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
+                    .openStream(), CLUSTER_NAME), new TCPCommTestModule());
+        } catch (IOException e) {
+            Assert.fail();
+            return null;
+        }
     }
 
     class TCPCommTestModule extends AbstractModule {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
index 71a3489..673c7a3 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
@@ -8,7 +8,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.s4.fixtures.CommTestUtils;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.base.Splitter;
@@ -17,7 +16,6 @@ import com.google.common.collect.Sets;
 public class AssignmentsFromZKTest extends ZKBaseTest {
 
     @Test
-    @Ignore
     public void testAssignmentFor1Cluster() throws Exception {
         TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
         final String topologyNames = "cluster1";
@@ -51,6 +49,7 @@ public class AssignmentsFromZKTest extends ZKBaseTest {
 
                         for (String topologyName : names) {
                             assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+                            assignmentFromZK.init();
                             ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
                             latch.countDown();
                         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
index 38d7621..fb81228 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
@@ -69,6 +69,7 @@ public class ClustersFromZKTest extends ZKBaseTest {
                     try {
                         for (String clusterName : clusterNames) {
                             assignmentFromZK = new AssignmentFromZK(clusterName, CommTestUtils.ZK_STRING, 30000, 30000);
+                            assignmentFromZK.init();
                             ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
                             latch.countDown();
                         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index 90ebc3f..c0ed6b4 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -15,23 +15,19 @@ import com.google.inject.name.Names;
 
 public abstract class UDPCommTest extends ProtocolTestUtil {
     DeliveryTestUtil util;
-    private Injector injector;
 
     public UDPCommTest() throws IOException {
         super();
-        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
-                .openStream(), "cluster1"), new UDPCommTestModule());
     }
 
     public UDPCommTest(int numTasks) throws IOException {
         super(numTasks);
-        injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
-                .openStream(), "cluster1"), new UDPCommTestModule());
     }
 
     @Override
-    protected Injector getInjector() throws IOException {
-        return injector;
+    protected Injector newInjector() throws IOException {
+        return Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(),
+                "cluster1"), new UDPCommTestModule());
     }
 
     class UDPCommTestModule extends AbstractModule {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
index 421aacd..6bc074e 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
@@ -27,12 +27,12 @@ public abstract class ProtocolTestUtil extends ZkBasedTest {
         expectedMessages = new int[super.numTasks];
         partitions = new PartitionInfo[super.numTasks];
         for (int i = 0; i < this.numTasks; i++) {
-            partitions[i] = getInjector().getInstance(PartitionInfo.class);
+            partitions[i] = newInjector().getInstance(PartitionInfo.class);
             partitions[i].setProtocolTestUtil(this);
         }
     }
 
-    protected abstract Injector getInjector() throws IOException;
+    protected abstract Injector newInjector() throws IOException;
 
     protected void decreaseExpectedMessages(int partition, long amount) {
         synchronized (expectedMessages) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index f7cf147..2791da6 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -2,9 +2,6 @@ package org.apache.s4.fixtures;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
@@ -12,6 +9,7 @@ import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,6 +27,8 @@ import org.apache.zookeeper.server.ZooKeeperServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.io.Files;
+
 /**
  * Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
  * conversion, directory recursive delete etc... - starting local instances for zookeeper and bookkeeper - distributed
@@ -50,14 +50,17 @@ public class CommTestUtils {
         logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
     }
 
-    protected static Process forkProcess(String mainClass, String... args) throws IOException, InterruptedException {
+    protected static Process forkProcess(String mainClass, int debugPort, String... args) throws IOException,
+            InterruptedException {
         List<String> cmdList = new ArrayList<String>();
         cmdList.add("java");
         cmdList.add("-cp");
         cmdList.add(System.getProperty("java.class.path"));
-        // cmdList.add("-Xdebug");
-        // cmdList.add("-Xnoagent");
-        // cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
+        if (debugPort != -1) {
+            cmdList.add("-Xdebug");
+            cmdList.add("-Xnoagent");
+            cmdList.add("-Xrunjdwp:transport=dt_socket,address=" + debugPort + ",server=y,suspend=n");
+        }
 
         cmdList.add(mainClass);
         for (String arg : args) {
@@ -106,56 +109,11 @@ public class CommTestUtils {
     }
 
     public static void writeStringToFile(String s, File f) throws IOException {
-        if (f.exists()) {
-            if (!f.delete()) {
-                throw new RuntimeException("Cannot delete file " + f.getAbsolutePath());
-            }
-        }
-
-        FileWriter fw = null;
-        try {
-            if (!f.createNewFile()) {
-                throw new RuntimeException("Cannot create new file : " + f.getAbsolutePath());
-            }
-            fw = new FileWriter(f);
-
-            fw.write(s);
-        } catch (IOException e) {
-            throw (e);
-        } finally {
-            if (fw != null) {
-                try {
-                    fw.close();
-                } catch (IOException e) {
-                    throw (e);
-                }
-            }
-        }
+        Files.write(s, f, Charset.defaultCharset());
     }
 
     public static String readFile(File f) throws IOException {
-        BufferedReader br = null;
-        try {
-            br = new BufferedReader(new FileReader(f));
-            StringBuilder sb = new StringBuilder();
-            String line = br.readLine();
-            while (line != null) {
-                sb.append(line);
-                line = br.readLine();
-                if (line != null) {
-                    sb.append("\n");
-                }
-            }
-            return sb.toString();
-        } finally {
-            if (br != null) {
-                try {
-                    br.close();
-                } catch (IOException e) {
-                    throw (e);
-                }
-            }
-        }
+        return Files.toString(f, Charset.defaultCharset());
 
     }
 
@@ -198,59 +156,12 @@ public class CommTestUtils {
     }
 
     public static String readFileAsString(File f) throws IOException {
-        FileReader fr = new FileReader(f);
-        StringBuilder sb = new StringBuilder("");
-        BufferedReader br = new BufferedReader(fr);
-        String line = br.readLine();
-        while (line != null) {
-            sb.append(line);
-            line = br.readLine();
-            if (line != null) {
-                sb.append("\n");
-            }
-        }
-        return sb.toString();
+        return Files.toString(f, Charset.defaultCharset());
 
     }
 
-    // TODO factor this code (see BasicFSStateStorage) - or use commons io or
-    // guava
     public static byte[] readFileAsByteArray(File file) throws Exception {
-        FileInputStream in = null;
-        try {
-            in = new FileInputStream(file);
-
-            long length = file.length();
-
-            /*
-             * Arrays can only be created using int types, so ensure that the file size is not too big before we
-             * downcast to create the array.
-             */
-            if (length > Integer.MAX_VALUE) {
-                throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
-            }
-
-            byte[] buffer = new byte[(int) length];
-            int offSet = 0;
-            int numRead = 0;
-
-            while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
-                offSet += numRead;
-            }
-
-            if (offSet < buffer.length) {
-                throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
-                        + buffer.length + " bytes read");
-            }
-
-            in.close();
-            return buffer;
-
-        } finally {
-            if (in != null) {
-                in.close();
-            }
-        }
+        return Files.toByteArray(file);
     }
 
     public static ZooKeeper createZkClient() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index fe4ceed..eae1dda 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -49,9 +49,6 @@ public abstract class App {
     /* All the internal streams in this app. */
     final private List<Streamable<Event>> streams = new ArrayList<Streamable<Event>>();
 
-    /* All the the event sources exported by this app. */
-    final private List<EventSource> eventSources = new ArrayList<EventSource>();
-
     /* Pes indexed by name. */
     Map<String, ProcessingElement> peByName = Maps.newHashMap();
 
@@ -125,11 +122,6 @@ public abstract class App {
         streams.add(stream);
     }
 
-    /* Should only be used within the core package. */
-    void addEventSource(EventSource es) {
-        eventSources.add(es);
-    }
-
     /* Returns list of PE prototypes. Should only be used within the core package. */
     List<ProcessingElement> getPePrototypes() {
         return pePrototypes;
@@ -147,12 +139,6 @@ public abstract class App {
         return streams;
     }
 
-    /* Returns list of the event sources to be exported. Should only be used within the core package. */
-    // TODO visibility
-    public List<EventSource> getEventSources() {
-        return eventSources;
-    }
-
     protected abstract void onStart();
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
deleted file mode 100644
index 1f89d86..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.s4.core;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.s4.base.Event;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * 
- * A producer app uses one or more EventSource classes to provide events to streamables. At runtime, consumer apps
- * subscribe to an event source by providing a streamable object. Each EventSource instance may correspond to a
- * different type of event stream. Each EventSource may have an unlimited number of subscribers.
- * 
- */
-public class EventSource implements Streamable {
-
-    /* No need to synchronize this object because we expect a single thread. */
-    private Set<Streamable> streamables = new HashSet<Streamable>();
-    private static final Logger logger = LoggerFactory.getLogger(EventSource.class);
-    final private String name;
-
-    public EventSource(App app, String name) {
-        this.name = name;
-        app.addEventSource(this);
-    }
-
-    /**
-     * Subscribe a streamable to this event source.
-     * 
-     * @param aStream
-     */
-    public void subscribeStream(Streamable aStream) {
-        logger.info("Subscribing stream: {} to event source: {}.", aStream.getName(), getName());
-        streamables.add(aStream);
-    }
-
-    /**
-     * Unsubscribe a streamable from this event source.
-     * 
-     * @param stream
-     */
-    public void unsubscribeStream(Streamable stream) {
-        logger.info("Unsubsubscribing stream: {} to event source: {}.", stream.getName(), getName());
-        streamables.remove(stream);
-    }
-
-    /**
-     * Send an event to all the subscribed streamables.
-     * 
-     * @param event
-     */
-    @Override
-    public void put(Event event) {
-        for (Streamable<Event> stream : streamables) {
-            stream.put(event);
-        }
-    }
-
-    /**
-     * 
-     * @return the number of streamables subscribed to this event source.
-     */
-    public int getNumSubscribers() {
-        return streamables.size();
-    }
-
-    /**
-     * @return the name of this event source.
-     */
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * Close all the streamables subscribed to this event source.
-     */
-    @Override
-    public void close() {
-        for (Streamable stream : streamables) {
-            logger.info("Closing stream: {} in event source: {}.", stream.getName(), getName());
-            stream.close();
-        }
-    }
-
-    /**
-     * 
-     * @return the set of streamables subscribed to this event source.
-     */
-    public Set<Streamable> getStreamables() {
-        return streamables;
-    }
-
-    @Override
-    public void start() {
-        // TODO Auto-generated method stub
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 1104716..10da9a2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -29,7 +29,7 @@ public class Sender {
     /**
      * 
      * @param emitter
-     *            the emitter implements the low level commiunication layer.
+     *            the emitter implements the low level communication layer.
      * @param serDeser
      *            a serialization mechanism.
      * @param hasher

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 3a87648..44e7b56 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -2,13 +2,13 @@ package org.apache.s4.core;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.jar.Attributes.Name;
 import java.util.jar.JarFile;
 
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.base.util.S4RLoader;
+import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.deploy.DeploymentManager;
 import org.slf4j.Logger;
@@ -16,8 +16,6 @@ import org.slf4j.LoggerFactory;
 
 import ch.qos.logback.classic.Level;
 
-import com.google.common.collect.Maps;
-import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.name.Named;
@@ -30,14 +28,8 @@ public class Server {
 
     private static final Logger logger = LoggerFactory.getLogger(Server.class);
 
-    private final String commModuleName;
     private final String logLevel;
     public static final String MANIFEST_S4_APP_CLASS = "S4-App-Class";
-    // local applications directory
-    private final String appsDir;
-    Map<String, App> apps = Maps.newHashMap();
-    Map<String, Streamable> streams = Maps.newHashMap();
-    Map<String, EventSource> eventSources = Maps.newHashMap();
     CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
 
     private Injector injector;
@@ -45,7 +37,8 @@ public class Server {
     @Inject
     private DeploymentManager deploymentManager;
 
-    private String clusterName;
+    @Inject
+    private AssignmentFromZK assignment;
 
     private ZkClient zkClient;
 
@@ -53,15 +46,11 @@ public class Server {
      *
      */
     @Inject
-    public Server(String commModuleName, @Named("s4.logger_level") String logLevel, @Named("appsDir") String appsDir,
+    public Server(String commModuleName, @Named("s4.logger_level") String logLevel,
             @Named("cluster.name") String clusterName, @Named("cluster.zk_address") String zookeeperAddress,
             @Named("cluster.zk_session_timeout") int sessionTimeout,
             @Named("cluster.zk_connection_timeout") int connectionTimeout) {
-        // TODO do we need to separate the comm module?
-        this.commModuleName = commModuleName;
         this.logLevel = logLevel;
-        this.appsDir = appsDir;
-        this.clusterName = clusterName;
 
         zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
         zkClient.setZkSerializer(new ZNRecordSerializer());
@@ -75,49 +64,19 @@ public class Server {
                 .getLogger(Logger.ROOT_LOGGER_NAME);
         root.setLevel(Level.toLevel(logLevel));
 
-        AbstractModule module = null;
-
-        /* Initialize communication layer module. */
-        // TODO do we need a separate comm layer?
-        // try {
-        // module = (AbstractModule) Class.forName(commModuleName).newInstance();
-        // } catch (Exception e) {
-        // logger.error("Unable to instantiate communication layer module.", e);
-        // }
-        //
-        // /* After some indirection we get the injector. */
-        // injector = Guice.createInjector(module);
-
-        if (!new File(appsDir).exists()) {
-            if (!new File(appsDir).mkdirs()) {
-                logger.error("Cannot create apps directory [{}]", appsDir);
-            }
-        }
-
-        // disabled app loading from local files
-
         if (deploymentManager != null) {
             deploymentManager.start();
         }
 
-        // wait for at least 1 app to be loaded (otherwise the server would not have anything to do and just die)
+        // wait for an app to be loaded (otherwise the server would not have anything to do and just die)
         signalOneAppLoaded.await();
 
     }
 
-    public String getS4RDir() {
-        return appsDir;
-    }
-
-    public App loadApp(File s4r) {
-        logger.info("Local app deployment: using s4r file name [{}] as application name",
-                s4r.getName().substring(0, s4r.getName().indexOf(".s4r")));
-        return loadApp(s4r, s4r.getName().substring(0, s4r.getName().indexOf(".s4r")));
-    }
-
     public App loadApp(File s4r, String appName) {
 
         // TODO handle application upgrade
+        logger.info("Loading application [{}] from file [{}]", appName, s4r.getAbsolutePath());
 
         S4RLoader cl = new S4RLoader(s4r.getAbsolutePath());
         try {
@@ -144,7 +103,6 @@ public class Server {
                 return null;
             }
 
-            App previous = apps.put(appName, app);
             logger.info("Loaded application from file {}", s4r.getAbsolutePath());
             signalOneAppLoaded.countDown();
             return app;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index a316005..979cc5d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -5,13 +5,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
-import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
@@ -21,8 +16,6 @@ import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.inject.Inject;
@@ -31,7 +24,7 @@ import com.google.inject.name.Named;
 /**
  * 
  * <p>
- * Monitors applications on a given s4 cluster and starts them.
+ * Monitors application availability on a given s4 cluster. Starts the application when available.
  * </p>
  * 
  * <p>
@@ -62,11 +55,10 @@ public class DistributedDeploymentManager implements DeploymentManager {
 
     private final String clusterName;
 
-    final Set<String> apps = Sets.newHashSet();
     private final ZkClient zkClient;
-    private final String appsPath;
+    private final String appPath;
     private final Server server;
-    CountDownLatch signalInitialAppsLoaded = new CountDownLatch(1);
+    boolean deployed = false;
 
     @Inject
     public DistributedDeploymentManager(@Named("cluster.name") String clusterName,
@@ -79,58 +71,58 @@ public class DistributedDeploymentManager implements DeploymentManager {
 
         zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
         zkClient.setZkSerializer(new ZNRecordSerializer());
-        IZkChildListener appListener = new AppsChangeListener();
-        appsPath = "/s4/clusters/" + clusterName + "/apps";
-        if (!zkClient.exists(appsPath)) {
-            zkClient.create(appsPath, null, CreateMode.PERSISTENT);
+        String appDir = "/s4/clusters/" + clusterName + "/app";
+        if (!zkClient.exists(appDir)) {
+            zkClient.create(appDir, null, CreateMode.PERSISTENT);
         }
-        zkClient.subscribeChildChanges(appsPath, appListener);
+        appPath = appDir + "/s4App";
+        zkClient.subscribeDataChanges(appPath, new AppChangeListener());
     }
 
-    public void deployApplication(String newApp) throws DeploymentFailedException {
-        ZNRecord appData = zkClient.readData(appsPath + "/" + newApp);
+    public void deployApplication() throws DeploymentFailedException {
+        ZNRecord appData = zkClient.readData(appPath);
         String uriString = appData.getSimpleField(S4R_URI);
+        String appName = appData.getSimpleField("name");
         try {
             URI uri = new URI(uriString);
 
             // fetch application
-            final File s4rFile = new File(server.getS4RDir() + File.separator + clusterName + File.separator + newApp
-                    + ".s4r");
-            if (s4rFile.exists()) {
-                s4rFile.delete();
-            }
+            File localS4RFileCopy;
             try {
-                Files.createParentDirs(s4rFile);
-                s4rFile.createNewFile();
+                localS4RFileCopy = File.createTempFile("tmp", "s4r");
             } catch (IOException e1) {
-                throw new DeploymentFailedException("Cannot deploy application [" + newApp + "] from URI ["
-                        + uri.toString() + "] ", e1);
+                logger.error(
+                        "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
+                        appName, e1.getClass().getName() + "->" + e1.getMessage());
+                throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e1);
             }
+            localS4RFileCopy.deleteOnExit();
             try {
-                if (ByteStreams.copy(fetchS4App(uri), Files.newOutputStreamSupplier(s4rFile)) == 0) {
+                if (ByteStreams.copy(fetchS4App(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
                     throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
-                            + s4rFile.getAbsolutePath() + "] (nothing was copied)");
+                            + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
                 }
             } catch (IOException e) {
-                throw new DeploymentFailedException("Cannot deploy application [" + newApp + "] from URI ["
+                throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
                         + uri.toString() + "] ", e);
             }
             // install locally
-            App loaded = server.loadApp(s4rFile);
+            App loaded = server.loadApp(localS4RFileCopy, appName);
             if (loaded != null) {
-                logger.info("Successfully installed application {}", newApp);
-                server.startApp(loaded, newApp, clusterName);
+                logger.info("Successfully installed application {}", appName);
+                server.startApp(loaded, appName, clusterName);
             } else {
-                throw new DeploymentFailedException("Cannot deploy application [" + newApp + "] from URI ["
+                throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
                         + uri.toString() + "] : cannot start application");
             }
             // TODO sync with other nodes? (e.g. wait for other apps deployed before starting?
 
         } catch (URISyntaxException e) {
-            logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] { newApp,
-                    uriString, e.getMessage() });
-            throw new DeploymentFailedException("Cannot deploy application [" + newApp + "]", e);
+            logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
+                    appName, uriString, e.getMessage() });
+            throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e);
         }
+        deployed = true;
     }
 
     // NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
@@ -146,36 +138,19 @@ public class DistributedDeploymentManager implements DeploymentManager {
         throw new DeploymentFailedException("Unsupported protocol " + scheme);
     }
 
-    // synchronizes with startup apps loading
-    private void deployNewApps(Set<String> newApps) {
-        try {
-            signalInitialAppsLoaded.await();
-        } catch (InterruptedException e1) {
-            logger.error("Interrupted while waiting for initialization of initial application. Cancelling deployment of new applications.");
-            Thread.currentThread().interrupt();
-            return;
-        }
-        deployApps(newApps);
-    }
-
-    private void deployApps(Set<String> newApps) {
-        for (String newApp : newApps) {
-            try {
-                deployApplication(newApp);
-                apps.add(newApp);
-            } catch (DeploymentFailedException e) {
-                logger.error("Application deployment failed for {}", newApp);
-            }
+    private final class AppChangeListener implements IZkDataListener {
+        @Override
+        public void handleDataDeleted(String dataPath) throws Exception {
+            logger.error("Application undeployment is not supported yet");
         }
-    }
 
-    private final class AppsChangeListener implements IZkChildListener {
         @Override
-        public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
-            SetView<String> newApps = Sets.difference(Sets.newHashSet(currentChildren), apps);
-            logger.info("Detected new application(s) to deploy {}" + Arrays.toString(newApps.toArray(new String[] {})));
-
-            deployNewApps(newApps);
+        public void handleDataChange(String dataPath, Object data) throws Exception {
+            if (!deployed) {
+                deployApplication();
+            } else {
+                logger.error("There is already an application deployed on this S4 node");
+            }
 
         }
 
@@ -183,9 +158,12 @@ public class DistributedDeploymentManager implements DeploymentManager {
 
     @Override
     public void start() {
-        List<String> initialApps = zkClient.getChildren(appsPath);
-        deployApps(new HashSet<String>(initialApps));
-        signalInitialAppsLoaded.countDown();
+        if (zkClient.exists(appPath)) {
+            try {
+                deployApplication();
+            } catch (DeploymentFailedException e) {
+                logger.error("Cannot deploy application", e);
+            }
+        }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 8601d91..495c1ad 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -28,7 +28,6 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.io.ByteStreams;
@@ -65,29 +64,6 @@ public class TestAutomaticDeployment {
                 + "/test-apps/simple-deployable-app-1/build.gradle"), "installS4R", new String[] { "appsDir="
                 + tmpAppsDir.getAbsolutePath() });
 
-        CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
-                + "/test-apps/simple-deployable-app-2/build.gradle"), "installS4R", new String[] { "appsDir="
-                + tmpAppsDir.getAbsolutePath() });
-    }
-
-    // ignore this test since now we only deploy from artifacts published through zookeeper
-    @Test
-    @Ignore
-    public void testInitialDeploymentFromFileSystem() throws Exception {
-
-        // File s4rToDeploy = new File(loadConfig().getString("appsDir") + File.separator + "testapp"
-        // + System.currentTimeMillis() + ".s4r");
-        //
-        // Assert.assertTrue(ByteStreams.copy(
-        // Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
-        // + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
-        //
-        // initializeS4Node();
-        //
-        // final String uri = s4rToDeploy.toURI().toString();
-        //
-        // assertDeployment(uri, true);
-
     }
 
     @Test
@@ -105,12 +81,11 @@ public class TestAutomaticDeployment {
 
         final String uri = s4rToDeploy.toURI().toString();
 
-        assertDeployment(uri, false);
+        assertDeployment(uri);
 
     }
 
-    private void assertDeployment(final String uri, boolean initial) throws KeeperException, InterruptedException,
-            IOException {
+    private void assertDeployment(final String uri) throws KeeperException, InterruptedException, IOException {
         CountDownLatch signalAppInitialized = new CountDownLatch(1);
         CountDownLatch signalAppStarted = new CountDownLatch(1);
         CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppInitialized,
@@ -118,11 +93,9 @@ public class TestAutomaticDeployment {
         CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppStarted,
                 CommTestUtils.createZkClient());
 
-        if (!initial) {
-            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-            zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp", record, CreateMode.PERSISTENT);
-        }
+        ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+        record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/app/s4App", record, CreateMode.PERSISTENT);
 
         Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
         Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
@@ -139,40 +112,6 @@ public class TestAutomaticDeployment {
         Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
     }
 
-    private void assertMultipleAppsDeployment(String uri1, String uri2) throws KeeperException, InterruptedException,
-            IOException {
-        CountDownLatch signalApp1Initialized = new CountDownLatch(1);
-        CountDownLatch signalApp1Started = new CountDownLatch(1);
-
-        CountDownLatch signalApp2Initialized = new CountDownLatch(1);
-        CountDownLatch signalApp2Started = new CountDownLatch(1);
-
-        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalApp1Initialized,
-                CommTestUtils.createZkClient());
-        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_2, signalApp1Started,
-                CommTestUtils.createZkClient());
-
-        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_2, signalApp2Initialized,
-                CommTestUtils.createZkClient());
-        CommTestUtils.watchAndSignalCreation(AppConstants.STARTED_ZNODE_2, signalApp2Started,
-                CommTestUtils.createZkClient());
-
-        ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app1");
-        record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uri1);
-        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp1", record1, CreateMode.PERSISTENT);
-
-        ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app2");
-        record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uri2);
-        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp2", record2, CreateMode.PERSISTENT);
-
-        Assert.assertTrue(signalApp1Initialized.await(20, TimeUnit.SECONDS));
-        Assert.assertTrue(signalApp1Started.await(10, TimeUnit.SECONDS));
-
-        Assert.assertTrue(signalApp2Initialized.await(20, TimeUnit.SECONDS));
-        Assert.assertTrue(signalApp2Started.await(10, TimeUnit.SECONDS));
-
-    }
-
     @Test
     public void testZkTriggeredDeploymentFromHttp() throws Exception {
         initializeS4Node();
@@ -195,62 +134,7 @@ public class TestAutomaticDeployment {
         httpServer.setExecutor(Executors.newCachedThreadPool());
         httpServer.start();
 
-        assertDeployment("http://localhost:8080/s4/" + s4rToDeploy.getName(), false);
-
-    }
-
-    /**
-     * * * Tests that classes with same signature are loaded in different class loaders (through the S4RLoader), even
-     * when referenced through reflection, and even when referencing classes present in the classpath of the S4 nod * *
-     * Works in the following manne * * - we have app1 and app2, very simple a * * - app1 and app2 have 3 classes with
-     * same name: A, AppConstants and Tes * * - app1 in addition has a PE and a socket adapter so that it can react to
-     * injected e * * - upon initialization of the application, TestApp writes a znode in Zookeeper, corresponding to
-     * the application index (1 or 2), using the corresponding constant from the AppConstant class (which is part of the
-     * S4 node classpath, and therefore loaded by the standard classloader, not from an s4 app classl *
-     * 
-     * - upon startup of the application, TestApp creates A by reflection, and A writes a znode specific to the current
-     * p
-     * 
-     * - app1 and app2 are generated through gradle scripts, called by executing the "gradlew" executable at the root of
-     * the project, and using the build.gradle file available for these appl * ns
-     * 
-     * - app1 and app2 s4r archives are copied to a web server and published to * per
-     * 
-     * - they automatically get deployed, and we verify that 2 apps are correctly started, therefore that classes
-     * TestApp and A were independently loaded for independent ap * ions
-     * 
-     */
-
-    @Test
-    public void testZkTriggeredDeploymentFromHttpForMultipleApps() throws Exception {
-        initializeS4Node();
-        Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
-        Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_2));
-
-        File tmpDir = Files.createTempDir();
-
-        File s4rToDeployForApp1 = new File(tmpDir, String.valueOf(System.currentTimeMillis()) + "-app1");
-        File s4rToDeployForApp2 = new File(tmpDir, String.valueOf(System.currentTimeMillis()) + "-app2");
-
-        Assert.assertTrue(ByteStreams.copy(
-                Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
-                        + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")),
-                Files.newOutputStreamSupplier(s4rToDeployForApp1)) > 0);
-        Assert.assertTrue(ByteStreams.copy(
-                Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
-                        + "/simple-deployable-app-2-0.0.0-SNAPSHOT.s4r")),
-                Files.newOutputStreamSupplier(s4rToDeployForApp2)) > 0);
-
-        // we start a
-        InetSocketAddress addr = new InetSocketAddress(8080);
-        httpServer = HttpServer.create(addr, 0);
-
-        httpServer.createContext("/s4", new MyHandler(tmpDir));
-        httpServer.setExecutor(Executors.newCachedThreadPool());
-        httpServer.start();
-
-        assertMultipleAppsDeployment("http://localhost:8080/s4/" + s4rToDeployForApp1.getName(),
-                "http://localhost:8080/s4/" + s4rToDeployForApp2.getName());
+        assertDeployment("http://localhost:8080/s4/" + s4rToDeploy.getName());
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index 5062660..52792b1 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -4,6 +4,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
@@ -18,6 +19,7 @@ import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
@@ -33,9 +35,11 @@ import com.sun.net.httpserver.HttpServer;
 public class TestProducerConsumer {
 
     private Factory zookeeperServerConnectionFactory;
-    private Process forkedNode;
+    private Process forkedProducerNode;
+    private Process forkedConsumerNode;
     private ZkClient zkClient;
-    private final static String CLUSTER_NAME = "prodconcluster";
+    private final static String PRODUCER_CLUSTER = "producerCluster";
+    private final static String CONSUMER_CLUSTER = "consumerCluster";
     private HttpServer httpServer;
     private static File tmpAppsDir;
 
@@ -47,11 +51,11 @@ public class TestProducerConsumer {
         File gradlewFile = CoreTestUtils.findGradlewInRootDir();
 
         CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
-                + "/test-apps/s4-showtime/build.gradle"), "installS4R",
+                + "/test-apps/producer-app/build.gradle"), "installS4R",
                 new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
 
         CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
-                + "/test-apps/s4-counter/build.gradle"), "installS4R",
+                + "/test-apps/consumer-app/build.gradle"), "installS4R",
                 new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
     }
 
@@ -85,7 +89,8 @@ public class TestProducerConsumer {
     @After
     public void cleanup() throws Exception {
         CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
-        CommTestUtils.killS4App(forkedNode);
+        CommTestUtils.killS4App(forkedProducerNode);
+        CommTestUtils.killS4App(forkedConsumerNode);
     }
 
     private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException,
@@ -98,36 +103,38 @@ public class TestProducerConsumer {
     @Test
     public void testInitialDeploymentFromFileSystem() throws Exception {
 
-        File showtimeS4R = new File(loadConfig().getString("appsDir") + File.separator + "showtime"
+        File producerS4R = new File(loadConfig().getString("appsDir") + File.separator + "producer"
                 + System.currentTimeMillis() + ".s4r");
         System.out.println(tmpAppsDir.getAbsolutePath());
         Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
-                + "/s4-showtime-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(showtimeS4R)) > 0);
-        String uriShowtime = showtimeS4R.toURI().toString();
+                + "/producer-app-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(producerS4R)) > 0);
+        String uriProducer = producerS4R.toURI().toString();
 
-        File counterS4R = new File(loadConfig().getString("appsDir") + File.separator + "counter"
+        File consumerS4R = new File(loadConfig().getString("appsDir") + File.separator + "consumer"
                 + System.currentTimeMillis() + ".s4r");
-        Assert.assertTrue(ByteStreams.copy(
-                Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/s4-counter-0.0.0-SNAPSHOT.s4r")),
-                Files.newOutputStreamSupplier(counterS4R)) > 0);
+        Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+                + "/consumer-app-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(consumerS4R)) > 0);
 
-        String uriCounter = counterS4R.toURI().toString();
+        String uriConsumer = consumerS4R.toURI().toString();
 
         initializeS4Node();
 
         ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-        record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriShowtime);
-        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/showtime", record1, CreateMode.PERSISTENT);
+        record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriProducer);
+        zkClient.create("/s4/clusters/" + PRODUCER_CLUSTER + "/app/s4App", record1, CreateMode.PERSISTENT);
 
         ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-        record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriCounter);
-        zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/counter", record2, CreateMode.PERSISTENT);
+        record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriConsumer);
+        zkClient.create("/s4/clusters/" + CONSUMER_CLUSTER + "/app/s4App", record2, CreateMode.PERSISTENT);
+
+        CountDownLatch signalConsumptionComplete = new CountDownLatch(1);
+        CommTestUtils.watchAndSignalCreation("/1000TicksReceived", signalConsumptionComplete,
+                CommTestUtils.createZkClient());
+        Assert.assertTrue(signalConsumptionComplete.await(20, TimeUnit.SECONDS));
 
-        // TODO validate test through some Zookeeper notifications
-        Thread.sleep(10000);
     }
 
-    private void initializeS4Node() throws ConfigurationException, IOException, InterruptedException {
+    private void initializeS4Node() throws ConfigurationException, IOException, InterruptedException, KeeperException {
         // 0. package s4 app
         // TODO this is currently done offline, and the app contains the TestApp class copied from the one in the
         // current package .
@@ -135,31 +142,45 @@ public class TestProducerConsumer {
         // 1. start s4 nodes. Check that no app is deployed.
         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
         taskSetup.clean("s4");
-        taskSetup.setup(CLUSTER_NAME, 1, 1300);
+        taskSetup.setup(PRODUCER_CLUSTER, 1, 1300);
+
+        TaskSetup taskSetup2 = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+        taskSetup2.setup(CONSUMER_CLUSTER, 1, 1400);
 
         zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());
-        List<String> processes = zkClient.getChildren("/s4/clusters/" + CLUSTER_NAME + "/process");
+        List<String> processes = zkClient.getChildren("/s4/clusters/" + PRODUCER_CLUSTER + "/process");
         Assert.assertTrue(processes.size() == 0);
         final CountDownLatch signalProcessesReady = new CountDownLatch(1);
 
-        zkClient.subscribeChildChanges("/s4/clusters/" + CLUSTER_NAME + "/process", new IZkChildListener() {
+        zkClient.subscribeChildChanges("/s4/clusters/" + PRODUCER_CLUSTER + "/process", new IZkChildListener() {
 
             @Override
             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
-                if (currentChilds.size() == 2) {
+                if (currentChilds.size() == 1) {
                     signalProcessesReady.countDown();
                 }
 
             }
         });
 
-        forkedNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CLUSTER_NAME });
+        zkClient.subscribeChildChanges("/s4/clusters/" + CONSUMER_CLUSTER + "/process", new IZkChildListener() {
+
+            @Override
+            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+                if (currentChilds.size() == 1) {
+                    signalProcessesReady.countDown();
+                }
+
+            }
+        });
+
+        forkedProducerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + PRODUCER_CLUSTER });
+        forkedConsumerNode = CoreTestUtils.forkS4Node(new String[] { "-cluster=" + CONSUMER_CLUSTER });
 
         // TODO synchro with ready state from zk
-        Thread.sleep(10000);
-        // Assert.assertTrue(signalProcessesReady.await(10, TimeUnit.SECONDS));
+        // Thread.sleep(10000);
+        Assert.assertTrue(signalProcessesReady.await(20, TimeUnit.SECONDS));
 
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index d9bf9a6..832ce1b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -26,15 +26,19 @@ import com.google.common.io.PatternFilenameFilter;
 public class CoreTestUtils extends CommTestUtils {
 
     public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
-        return forkProcess(App.class.getName(), moduleClass.getName(), appClass.getName());
+        return forkProcess(App.class.getName(), -1, moduleClass.getName(), appClass.getName());
     }
 
     public static Process forkS4Node() throws IOException, InterruptedException {
-        return forkProcess(Main.class.getName(), new String[] {});
+        return forkS4Node(new String[] {});
     }
 
     public static Process forkS4Node(String[] args) throws IOException, InterruptedException {
-        return forkProcess(Main.class.getName(), args);
+        return forkS4Node(-1, args);
+    }
+
+    public static Process forkS4Node(int debugPort, String[] args) throws IOException, InterruptedException {
+        return forkProcess(Main.class.getName(), debugPort, args);
     }
 
     public static File findGradlewInRootDir() {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 9471910..d1d8857 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -5,8 +5,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import junit.framework.Assert;
-
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
@@ -21,6 +19,7 @@ import org.slf4j.LoggerFactory;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.FileConverter;
+import com.google.common.base.Strings;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 
@@ -44,15 +43,23 @@ public class Deploy extends S4ArgsBase {
 
             tmpAppsDir = Files.createTempDir();
 
-            File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
+            if (!Strings.isNullOrEmpty(deployArgs.s4rPath) && !Strings.isNullOrEmpty(deployArgs.generatedS4R)) {
+                logger.error("-s4r and -generatedS4R options are mutually exclusive");
+                System.exit(1);
+            }
 
-            String s4rPath = null;
+            File s4rToDeploy;
 
             if (deployArgs.s4rPath != null) {
-                s4rPath = deployArgs.s4rPath;
-                logger.info(
-                        "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
-                        s4rPath);
+                s4rToDeploy = new File(deployArgs.s4rPath);
+                if (!s4rToDeploy.exists()) {
+                    logger.error("Specified S4R file does not exist in {}", s4rToDeploy.getAbsolutePath());
+                    System.exit(1);
+                } else {
+                    logger.info(
+                            "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
+                            s4rToDeploy.getAbsolutePath());
+                }
             } else {
                 List<String> params = new ArrayList<String>();
                 // prepare gradle -P parameters, including passed gradle opts
@@ -61,19 +68,39 @@ public class Deploy extends S4ArgsBase {
                 params.add("appsDir=" + tmpAppsDir.getAbsolutePath());
                 params.add("appName=" + deployArgs.appName);
                 ExecGradle.exec(deployArgs.gradleBuildFile, "installS4R", params.toArray(new String[] {}));
-                s4rPath = tmpAppsDir.getAbsolutePath() + "/" + deployArgs.appName + ".s4r";
+                File tmpS4R = new File(tmpAppsDir.getAbsolutePath() + "/" + deployArgs.appName + ".s4r");
+                if (!Strings.isNullOrEmpty(deployArgs.generatedS4R)) {
+                    logger.info("Copying generated S4R to [{}]", deployArgs.generatedS4R);
+                    s4rToDeploy = new File(deployArgs.generatedS4R);
+                    if (!(ByteStreams.copy(Files.newInputStreamSupplier(tmpS4R),
+                            Files.newOutputStreamSupplier(s4rToDeploy)) > 0)) {
+                        logger.error("Cannot copy generated s4r from {} to {}", tmpS4R.getAbsolutePath(),
+                                s4rToDeploy.getAbsolutePath());
+                        System.exit(1);
+                    }
+                } else {
+                    s4rToDeploy = tmpS4R;
+                }
             }
-            Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(s4rPath)),
-                    Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
 
             final String uri = s4rToDeploy.toURI().toString();
             ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
             record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-            zkClient.create("/s4/clusters/" + deployArgs.clusterName + "/apps/" + deployArgs.appName, record,
-                    CreateMode.PERSISTENT);
-            logger.info("uploaded application [{}] to cluster [{}], using zookeeper znode [{}]", new String[] {
-                    deployArgs.appName, deployArgs.clusterName,
-                    "/s4/clusters/" + deployArgs.clusterName + "/apps/" + deployArgs.appName });
+            record.putSimpleField("name", deployArgs.appName);
+            String deployedAppPath = "/s4/clusters/" + deployArgs.clusterName + "/app/s4App";
+            if (zkClient.exists(deployedAppPath)) {
+                ZNRecord readData = zkClient.readData(deployedAppPath);
+                logger.error("Cannot deploy app [{}], because app [{}] is already deployed", deployArgs.appName,
+                        readData.getSimpleField("name"));
+                System.exit(1);
+            }
+
+            zkClient.create("/s4/clusters/" + deployArgs.clusterName + "/app/s4App", record, CreateMode.PERSISTENT);
+            logger.info(
+                    "uploaded application [{}] to cluster [{}], using zookeeper znode [{}], and s4r file [{}]",
+                    new String[] { deployArgs.appName, deployArgs.clusterName,
+                            "/s4/clusters/" + deployArgs.clusterName + "/app/" + deployArgs.appName,
+                            s4rToDeploy.getAbsolutePath() });
 
         } catch (Exception e) {
             LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
@@ -87,13 +114,16 @@ public class Deploy extends S4ArgsBase {
         @Parameter(names = { "-b", "-buildFile" }, description = "Full path to gradle build file for the S4 application", required = false, converter = FileConverter.class, validateWith = FileExistsValidator.class)
         File gradleBuildFile;
 
-        @Parameter(names = "-s4r", description = "Path to s4r file", required = false)
+        @Parameter(names = "-s4r", description = "Path to existing s4r file", required = false)
         String s4rPath;
 
+        @Parameter(names = { "-generatedS4R", "-g" }, description = "Location of generated s4r (incompatible with -s4r option). By default, s4r is generated in a temporary directory on the local file system. In a distributed environment, you probably want to specify a location accessible through a distributed file system like NFS. That's the purpose of this option.", required = false)
+        String generatedS4R;
+
         @Parameter(names = { "-a", "-appClass" }, description = "Full class name of the application class (extending App or AdapterApp)", required = false)
         String appClass = "";
 
-        @Parameter(names = "-appName", description = "Name of S4 application. This will be the name of the s4r file as well", required = true)
+        @Parameter(names = "-appName", description = "Name of S4 application.", required = true)
         String appName;
 
         @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/test-apps/consumer-app/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/consumer-app/build.gradle b/test-apps/consumer-app/build.gradle
new file mode 100644
index 0000000..3bfe72f
--- /dev/null
+++ b/test-apps/consumer-app/build.gradle
@@ -0,0 +1,222 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to buils and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by a running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standarize dirs, use env var?
+
+project.ext["s4AppInstallDir"] = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+project.ext["s4Version"] = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+
+
+/* All project libraries must be defined here. */
+project.ext["libraries"] = [
+           json:               'org.json:json:20090211',
+           guava:              'com.google.guava:guava:10.0.1',
+           gson:               'com.google.code.gson:gson:1.6',
+           guice:              'com.google.inject:guice:3.0',
+           guice_assist:       'com.google.inject.extensions:guice-assistedinject:3.0',
+           guice_grapher:      'com.google.inject:guice-grapher:3.0',
+           flexjson:           'net.sf.flexjson:flexjson:2.1',
+           bcel:               'org.apache.bcel:bcel:5.2',
+           jakarta_regexp:     'jakarta-regexp:jakarta-regexp:1.4',
+           kryo:               'com.googlecode:kryo:1.04',
+           netty:              'org.jboss.netty:netty:3.2.5.Final',
+           reflectasm:         'com.esotericsoftware:reflectasm:0.8',
+           minlog:             'com.esotericsoftware:minlog:1.2',
+           asm:                'asm:asm:3.2',
+           commons_io:         'commons-io:commons-io:2.0.1',
+           commons_config:     'commons-configuration:commons-configuration:1.6',
+           commons_codec:      'commons-codec:commons-codec:1.4',
+           commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
+           commons_coll:       'net.sourceforge.collections:collections-generic:4.01', // Use this lib until the commons collection with Generics is released.
+           slf4j:              'org.slf4j:slf4j-api:1.6.1',
+           logback_core:       'ch.qos.logback:logback-core:0.9.29',
+           logback_classic:    'ch.qos.logback:logback-classic:0.9.29',
+           zk:                 'org.apache.zookeeper:zookeeper:3.3.1',
+           jcip:               'net.jcip:jcip-annotations:1.0',
+           junit:              'junit:junit:4.10',
+       ]
+
+
+dependencies {
+
+   /* S4 Platform. We only need the API, not the transitive dependencies. */
+//    s4Libs.each {  module ->
+//        compile( module ) //{ transitive = false }
+//        s4API( module )
+//    }
+
+   compile project(":s4-base")
+   compile project(":s4-comm")
+   compile project(":s4-core")
+
+   /* Logging. */
+   compile( libraries.slf4j )
+   compile( libraries.logback_core )
+   compile( libraries.logback_classic )
+
+   /* Commons. */
+   compile( libraries.commons_io )
+   compile( libraries.commons_config )
+   compile( libraries.commons_coll )
+
+   /* Misc. */
+   compile( libraries.jcip )
+
+   /* Testing. */
+   testCompile( libraries.junit )
+}
+
+/* Set the manifest attributes for the S4 archive here.
+*  TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+       provider: 'gradle',
+       'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+       'Implementation-Version': version,
+       'Implementation-Vendor': 'Apache S4',
+       'Implementation-Vendor-Id': 's4app',
+       'S4-App-Class': appClassname, // gets set by the s4r task.
+       'S4-Version': s4Version
+       )
+
+project.ext["appDependencies"] = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schenma files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+   dependsOn jar
+   from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
+   manifest = project.manifest
+   extension = 's4r'
+
+   /* Set class name in manifest. Parse source files until we find a class that extends App.
+    * Get fully qualified Java class name and set attribute in Manifest.
+    */
+   sourceSets.main.allSource.files.each {  File file ->
+       if (appClassname =="" || appClassname == "UNKNOWN") {
+           // only execute the closure for this file if we haven't already found the app class name
+           appClassname = getAppClassname(file)
+           if(appClassname != "") {
+               manifest.mainAttributes('S4-App-Class': appClassname)
+           }
+       }
+   }
+
+   if (appClassname == "UNKNOWN") {
+
+       println "Couldn't find App class in source files...aborting."
+       exit(1)
+   }
+}
+
+/* List the artifacts that will br added to the s4 archive (and explode if needed). */
+s4r << {
+   appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifacts.files.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+   /* This is for debugging. */
+   //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+   //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+   // more debugging statements.
+   //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+   dependsOn s4r
+   from s4r.archivePath
+   into s4AppInstallDir
+}
+
+
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skippong comments for example.)
+*/
+def getAppClassname(file) {
+   def classname = "UNKNOWN"
+   def lines= file.readLines()
+   def packageName=""
+   for(line in lines) {
+
+       def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+       if(pn) {
+           packageName = pn[0][1] + "."
+       }
+
+       def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+       if (an) {
+           classname = packageName + an[0][1]
+           println "Found app class name: " + classname
+           break
+       }
+   }
+   classname
+}
+
+class Version {
+   int major
+   int minor
+   int bugfix
+   boolean isRelease
+
+   String toString() {
+       "$major.$minor.$bugfix${isRelease ? '' : '-SNAPSHOT'}"
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/test-apps/consumer-app/src/main/java/s4app/ConsumerApp.java
----------------------------------------------------------------------
diff --git a/test-apps/consumer-app/src/main/java/s4app/ConsumerApp.java b/test-apps/consumer-app/src/main/java/s4app/ConsumerApp.java
new file mode 100644
index 0000000..c13c53e
--- /dev/null
+++ b/test-apps/consumer-app/src/main/java/s4app/ConsumerApp.java
@@ -0,0 +1,29 @@
+package s4app;
+
+import org.apache.s4.core.App;
+
+public class ConsumerApp extends App {
+
+    private ConsumerPE consumerPE;
+
+    @Override
+    protected void onStart() {
+        System.out.println("Starting ShowTimeApp...");
+    }
+
+    @Override
+    protected void onInit() {
+        System.out.println("Initing ShowTimeApp...");
+
+        ConsumerPE consumerPE = createPE(ConsumerPE.class, "consumer");
+        consumerPE.setSingleton(true);
+
+        /* This stream will receive events from another app. */
+        createInputStream("tickStream", consumerPE);
+    }
+
+    @Override
+    protected void onClose() {
+        System.out.println("Closing ShowTimeApp...");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f6f91749/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
----------------------------------------------------------------------
diff --git a/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
new file mode 100644
index 0000000..d0b1577
--- /dev/null
+++ b/test-apps/consumer-app/src/main/java/s4app/ConsumerPE.java
@@ -0,0 +1,44 @@
+package s4app;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumerPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConsumerPE.class);
+    long eventCount = 0;
+
+    public ConsumerPE(App app) {
+        super(app);
+    }
+
+    public void onEvent(Event event) {
+        eventCount++;
+        logger.info(
+                "Received event with tick {} and time {} for event # {}",
+                new String[] { String.valueOf(event.get("tick", Long.class)), String.valueOf(event.getTime()),
+                        String.valueOf(eventCount) });
+        if (eventCount == 1000) {
+            logger.info("Just reached 1000 events");
+            ZkClient zkClient = new ZkClient("localhost:2181");
+            zkClient.create("/1000TicksReceived", new byte[0], CreateMode.PERSISTENT);
+        }
+
+    }
+
+    @Override
+    protected void onRemove() {
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+}