You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC

[20/22] git commit: inter-cluster communication - added classes for interacting with remote cluster : remote topology, remote emitter - added twitter topic count example

inter-cluster communication
- added classes for interacting with remote cluster : remote topology, remote emitter
- added twitter topic count example


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/d11f7fbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/d11f7fbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/d11f7fbc

Branch: refs/heads/piper
Commit: d11f7fbc5b331a09465677f721fa8fab7bab0654
Parents: 17c5ab8
Author: Matthieu Morel <mm...@apache.org>
Authored: Sun Mar 25 15:53:05 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Mar 29 13:25:53 2012 +0200

----------------------------------------------------------------------
 build.gradle                                       |   10 +-
 .../src/main/java/org/apache/s4/base/Event.java    |   10 +-
 .../java/org/apache/s4/base/RemoteEmitter.java     |    5 +
 .../org/apache/s4/base/util/MultiClassLoader.java  |   31 +--
 .../org/apache/s4/comm/tcp/TCPRemoteEmitter.java   |   15 +
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |    6 +-
 .../apache/s4/comm/topology/RemoteTopology.java    |    5 +
 .../s4/comm/topology/RemoteTopologyFromZK.java     |   16 ++
 .../s4/comm/topology/AssignmentFromZKTest.java     |    5 +-
 .../s4/comm/topology/TopologyFromZKTest.java       |    5 +-
 .../java/org/apache/s4/fixtures/ZkBasedTest.java   |   12 +-
 subprojects/s4-core/s4-core.gradle                 |    2 +
 .../src/main/java/org/apache/s4/core/App.java      |   10 +-
 .../src/main/java/org/apache/s4/core/Main.java     |    2 +-
 .../src/main/java/org/apache/s4/core/Receiver.java |   12 +-
 .../main/java/org/apache/s4/core/RemoteSender.java |   18 ++
 .../src/main/java/org/apache/s4/core/Server.java   |   65 ++----
 .../src/main/java/org/apache/s4/core/Stream.java   |   16 +-
 .../java/org/apache/s4/core/adapter/Adapter.java   |   26 ++
 .../org/apache/s4/core/adapter/AdapterMain.java    |   59 +++++
 .../org/apache/s4/core/adapter/AdapterModule.java  |   97 +++++++
 .../org/apache/s4/core/adapter/RemoteStream.java   |   60 +++++
 .../apache/s4/deploy/TestAutomaticDeployment.java  |    2 +-
 .../test/java/org/apache/s4/deploy/TestModule.java |   62 -----
 .../s4/deploy/prodcon/TestProducerConsumer.java    |    2 +-
 .../test/java/org/apache/s4/fixtures/ZKServer.java |   24 ++-
 .../resources/org.apache.s4.deploy.s4.properties   |    6 +-
 test-apps/twitter-adapter/build.gradle             |  200 +++++++++++++++
 .../s4/example/twitter/TwitterInputAdapter.java    |  113 ++++++++
 .../src/main/resources/default.s4.properties       |   18 ++
 .../src/main/resources/twitter4j.properties        |    5 +
 test-apps/twitter-counter/build.gradle             |  200 +++++++++++++++
 .../org/apache/s4/example/twitter/TopNTopicPE.java |   79 ++++++
 .../s4/example/twitter/TopicCountAndReportPE.java  |   47 ++++
 .../s4/example/twitter/TopicExtractorPE.java       |   49 ++++
 .../apache/s4/example/twitter/TopicSeenEvent.java  |   17 ++
 .../s4/example/twitter/TwitterCounterApp.java      |   77 ++++++
 37 files changed, 1213 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1bafbcf..7db43d7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -70,7 +70,9 @@ libraries = [
     zk:                 'org.apache.zookeeper:zookeeper:3.3.1',
     jcip:               'net.jcip:jcip-annotations:1.0',
     junit:              'junit:junit:4.10',
-    zkclient:           'com.github.sgroschupf:zkclient:0.1'
+    zkclient:           'com.github.sgroschupf:zkclient:0.1',
+    diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
+    jcommander:         'com.beust:jcommander:1.23'
 ]
 
 subprojects {
@@ -84,10 +86,10 @@ subprojects {
     targetCompatibility = 1.6
 
     checkstyleConfigFileName = "$rootDir/config/checkstyle/s4-checkstyle.xml"
-    
+
 
     dependencies {
-        
+
         /* Google. */
         compile( libraries.guava )
         compile( libraries.guice )
@@ -105,7 +107,7 @@ subprojects {
         /* Misc. */
         compile( libraries.jcip )
         compile( libraries.zk )
-        
+
         /* Testing. */
         testCompile( libraries.junit )
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 2e023cb..ca6f607 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -27,7 +27,7 @@ public class Event {
     private static final Logger logger = LoggerFactory.getLogger(Event.class);
 
     final private long time;
-    private int streamId;
+    private String streamName;
     private int appId;
     private Map<String, Data<?>> map;
 
@@ -56,8 +56,8 @@ public class Event {
      * 
      * @return the target stream id
      */
-    public int getStreamId() {
-        return streamId;
+    public String getStreamName() {
+        return streamName;
     }
 
     /**
@@ -66,8 +66,8 @@ public class Event {
      * 
      * @param targetStreamId
      */
-    public void setStreamId(int streamId) {
-        this.streamId = streamId;
+    public void setStreamId(String streamName) {
+        this.streamName = streamName;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-base/src/main/java/org/apache/s4/base/RemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/RemoteEmitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/RemoteEmitter.java
new file mode 100644
index 0000000..59d9164
--- /dev/null
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/RemoteEmitter.java
@@ -0,0 +1,5 @@
+package org.apache.s4.base;
+
+public interface RemoteEmitter extends Emitter {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java
index 8c702df..0e942b6 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java
@@ -17,16 +17,14 @@ import com.google.common.collect.MapMaker;
  * The source code for this class was derived from <a href=
  * "http://code.google.com/p/db4o-om/source/browse/trunk/objectmanager-api/src/com/db4o/objectmanager/configuration/MultiClassLoader.java"
  * >this project</a> which was derived from this <a href=
- * "http://www.javaworld.com/javaworld/jw-10-1996/jw-10-indepth.html?page=1"
- * >article by Chuck McManis</a>.
+ * "http://www.javaworld.com/javaworld/jw-10-1996/jw-10-indepth.html?page=1" >article by Chuck McManis</a>.
  * 
  * 
  * Thank you to the authors!
  */
 abstract public class MultiClassLoader extends ClassLoader {
 
-    private static final Logger logger = LoggerFactory
-            .getLogger(MultiClassLoader.class);
+    private static final Logger logger = LoggerFactory.getLogger(MultiClassLoader.class);
 
     private final Map<String, Class<?>> classes;
     private char classNameReplacementChar;
@@ -37,8 +35,8 @@ abstract public class MultiClassLoader extends ClassLoader {
 
     // ---------- Superclass Overrides ------------------------
     /**
-     * This is a simple version for external clients since they will always want
-     * the class resolved before it is returned to them.
+     * This is a simple version for external clients since they will always want the class resolved before it is
+     * returned to them.
      */
     @Override
     public Class<?> loadClass(String className) throws ClassNotFoundException {
@@ -46,28 +44,26 @@ abstract public class MultiClassLoader extends ClassLoader {
     }
 
     @Override
-    public synchronized Class<?> loadClass(String className, boolean resolveIt)
-            throws ClassNotFoundException {
+    public synchronized Class<?> loadClass(String className, boolean resolveIt) throws ClassNotFoundException {
 
         Class<?> result;
         byte[] classBytes;
-        logger.debug("MultiClassLoader loadClass - className: " + className
-                + ", resolveIt: " + resolveIt);
+        logger.trace("MultiClassLoader loadClass - className: " + className + ", resolveIt: " + resolveIt);
 
         /* Check our local cache of classes. */
         result = classes.get(className);
         if (result != null) {
-            logger.debug("Returning cached result for class [{}]", className);
+            logger.trace("Returning cached result for class [{}]", className);
             return result;
         }
 
         /* Check with the primordial class loader. */
         try {
             result = super.findSystemClass(className);
-            logger.debug("Returning system class (in CLASSPATH) [{}]", className);
+            logger.trace("Returning system class (in CLASSPATH) [{}]", className);
             return result;
         } catch (ClassNotFoundException e) {
-            logger.debug("Not a system class [{}]", className);
+            logger.trace("Not a system class [{}]", className);
         }
 
         classBytes = loadClassBytes(className);
@@ -94,15 +90,14 @@ abstract public class MultiClassLoader extends ClassLoader {
         if (result == null)
             return null;
         classes.put(className, result);
-        logger.debug("Returning newly loaded class [{}]", className);
+        logger.trace("Returning newly loaded class [{}]", className);
         return result;
     }
 
     /**
-     * This optional call allows a class name such as "COM.test.Hello" to be
-     * changed to "COM_test_Hello", which is useful for storing classes from
-     * different packages in the same retrieval directory. In the above example
-     * the char would be '_'.
+     * This optional call allows a class name such as "COM.test.Hello" to be changed to "COM_test_Hello", which is
+     * useful for storing classes from different packages in the same retrieval directory. In the above example the char
+     * would be '_'.
      */
     public void setClassNameReplacementChar(char replacement) {
         classNameReplacementChar = replacement;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
new file mode 100644
index 0000000..0fa663f
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
@@ -0,0 +1,15 @@
+package org.apache.s4.comm.tcp;
+
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.comm.topology.RemoteTopology;
+
+import com.google.inject.Inject;
+
+public class TCPRemoteEmitter extends TCPEmitter implements RemoteEmitter {
+
+    @Inject
+    public TCPRemoteEmitter(RemoteTopology topology) throws InterruptedException {
+        super(topology);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 193daff..0466993 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -22,7 +22,7 @@ public class TaskSetup {
         zkclient.deleteRecursive("/" + clusterName);
     }
 
-    public void setup(String clusterName, int tasks) {
+    public void setup(String clusterName, int tasks, int initialPort) {
         zkclient.createPersistent("/" + clusterName + "/tasks", true);
         zkclient.createPersistent("/" + clusterName + "/process", true);
         zkclient.createPersistent("/" + clusterName + "/apps", true);
@@ -30,7 +30,7 @@ public class TaskSetup {
             String taskId = "Task-" + i;
             ZNRecord record = new ZNRecord(taskId);
             record.putSimpleField("taskId", taskId);
-            record.putSimpleField("port", String.valueOf(1300 + i));
+            record.putSimpleField("port", String.valueOf(initialPort + i));
             record.putSimpleField("partition", String.valueOf(i));
             record.putSimpleField("cluster", clusterName);
             zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
@@ -42,7 +42,7 @@ public class TaskSetup {
         TaskSetup taskSetup = new TaskSetup("localhost:2181");
         String clusterName = "test-s4-cluster";
         taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 10);
+        taskSetup.setup(clusterName, 10, 1300);
         String zookeeperAddress = "localhost:2181";
         for (int i = 0; i < 10; i++) {
             AssignmentFromZK assignmentFromZK = new AssignmentFromZK(

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
new file mode 100644
index 0000000..8f7cc1a
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
@@ -0,0 +1,5 @@
+package org.apache.s4.comm.topology;
+
+public interface RemoteTopology extends Topology {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
new file mode 100644
index 0000000..39ede90
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
@@ -0,0 +1,16 @@
+package org.apache.s4.comm.topology;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class RemoteTopologyFromZK extends TopologyFromZK implements RemoteTopology {
+
+    @Inject
+    public RemoteTopologyFromZK(@Named("cluster.remote.name") String remoteClusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        super(remoteClusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
index d733e9b..e709d68 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
@@ -1,8 +1,9 @@
 package org.apache.s4.comm.topology;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.*;
 
 import org.apache.s4.comm.tools.TaskSetup;
 import org.junit.Test;
@@ -14,7 +15,7 @@ public class AssignmentFromZKTest extends ZKBaseTest {
         TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
         final String clusterName = "test-s4-cluster";
         taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 10);
+        taskSetup.setup(clusterName, 10, 1300);
         final CountDownLatch latch = new CountDownLatch(10);
         for (int i = 0; i < 10; i++) {
             Runnable runnable = new Runnable() {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
index eb5f42c..65eee28 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
@@ -1,11 +1,12 @@
 package org.apache.s4.comm.topology;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import static org.junit.Assert.*;
 
 import org.apache.s4.comm.tools.TaskSetup;
 import org.junit.Test;
@@ -17,7 +18,7 @@ public class TopologyFromZKTest extends ZKBaseTest {
         TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
         final String clusterName = "test-s4-cluster";
         taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 10);
+        taskSetup.setup(clusterName, 10, 1300);
 
         final TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
         final Lock lock = new ReentrantLock();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index 02c5467..bda3bd8 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -6,6 +6,7 @@ import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.s4.comm.tools.TaskSetup;
+import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +44,15 @@ public abstract class ZkBasedTest {
         TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
         final String clusterName = "s4-test-cluster";
         taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 1);
+        taskSetup.setup(clusterName, 1, 1300);
+    }
+
+    @After
+    public void cleanupZkBasedTest() {
+        if (zkServer != null) {
+            zkServer.shutdown();
+            zkServer = null;
+        }
+        CommTestUtils.cleanupTmpDirs();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index 1bdbb6c..afa85c9 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -19,6 +19,8 @@ description = 'The S4 core platform.'
 dependencies {
     compile project(":s4-base")
     compile project(":s4-comm")
+    compile project(path: ':s4-comm', configuration: 'tests')
+    compile libraries.jcommander
     testCompile project(path: ':s4-comm', configuration: 'tests')
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 0939d8e..2575013 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -92,7 +92,7 @@ public abstract class App {
     }
 
     /* Should only be used within the core package. */
-    void addStream(Streamable stream) {
+    public void addStream(Streamable stream) {
         streams.add(stream);
     }
 
@@ -130,10 +130,10 @@ public abstract class App {
         }
         //
         // /* Allow abstract PE to initialize. */
-        // for (ProcessingElement pe : getPePrototypes()) {
-        // logger.info("Init prototype [{}].", pe.getClass().getName());
-        // pe.initPEPrototypeInternal();
-        // }
+        for (ProcessingElement pe : getPePrototypes()) {
+            logger.info("Init prototype [{}].", pe.getClass().getName());
+            pe.initPEPrototypeInternal();
+        }
 
         onStart();
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index a55511c..71756a4 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -63,7 +63,7 @@ public class Main {
     private static void startServer(final Logger logger, Injector injector) {
         Server server = injector.getInstance(Server.class);
         try {
-            server.start();
+            server.start(injector);
         } catch (Exception e) {
             logger.error("Failed to start the controller.", e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 90ae0bf..259496b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -31,7 +31,7 @@ public class Receiver implements Runnable {
 
     final private Listener listener;
     final private SerializerDeserializer serDeser;
-    private Map<Integer, Map<Integer, Stream<? extends Event>>> streams;
+    private Map<Integer, Map<String, Stream<? extends Event>>> streams;
     private Thread thread;
 
     @Inject
@@ -52,23 +52,23 @@ public class Receiver implements Runnable {
     /** Save stream keyed by app id and stream id. */
     void addStream(Stream<? extends Event> stream) {
         int appId = stream.getApp().getId();
-        Map<Integer, Stream<? extends Event>> appMap = streams.get(appId);
+        Map<String, Stream<? extends Event>> appMap = streams.get(appId);
         if (appMap == null) {
             appMap = new MapMaker().makeMap();
             streams.put(appId, appMap);
         }
-        appMap.put(stream.getId(), stream);
+        appMap.put(stream.getName(), stream);
     }
 
     /** Remove stream when it is no longer needed. */
     void removeStream(Stream<? extends Event> stream) {
         int appId = stream.getApp().getId();
-        Map<Integer, Stream<? extends Event>> appMap = streams.get(appId);
+        Map<String, Stream<? extends Event>> appMap = streams.get(appId);
         if (appMap == null) {
             logger.error("Tried to remove a stream that is not registered in the receiver.");
             return;
         }
-        appMap.remove(stream.getId());
+        appMap.remove(stream.getName());
     }
 
     public void run() {
@@ -79,7 +79,7 @@ public class Receiver implements Runnable {
             Event event = (Event) serDeser.deserialize(message);
 
             int appId = event.getAppId();
-            int streamId = event.getStreamId();
+            String streamId = event.getStreamName();
 
             /*
              * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO:

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
new file mode 100644
index 0000000..48b980b
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -0,0 +1,18 @@
+package org.apache.s4.core;
+
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class RemoteSender extends Sender {
+
+    @Inject
+    public RemoteSender(RemoteEmitter emitter, SerializerDeserializer serDeser, Hasher hasher) {
+        super(emitter, serDeser, hasher);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index dd80e51..0d7711b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -8,14 +8,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.jar.Attributes.Name;
 import java.util.jar.JarFile;
 
-import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.s4.base.Event;
 import org.apache.s4.base.util.S4RLoader;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.core.adapter.Adapter;
 import org.apache.s4.deploy.DeploymentManager;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,7 +22,6 @@ import ch.qos.logback.classic.Level;
 import com.google.common.collect.Maps;
 import com.google.common.io.PatternFilenameFilter;
 import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.name.Named;
@@ -72,44 +69,11 @@ public class Server {
 
         zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
         zkClient.setZkSerializer(new ZNRecordSerializer());
-        initializeZkStreams(clusterName);
-        watchZkStreams();
     }
 
-    private void watchZkStreams() {
-        zkClient.subscribeChildChanges("/" + clusterName + "/streams/producers", new IZkChildListener() {
-
-            @Override
-            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
-                // synchronized (streams) {
-                // for (String child : currentChilds) {
-                // if (apps.containsKey(child)) {
-                // eventSources.put(paramK, paramV)
-                // }
-                // }
-                // }
-
-            }
-        });
-
-    }
-
-    private void initializeZkStreams(String clusterName) {
-        try {
-            zkClient.createPersistent("/" + clusterName + "/streams");
-            zkClient.createPersistent("/" + clusterName + "/streams/producers");
-            zkClient.createPersistent("/" + clusterName + "/streams/consumers");
-        } catch (ZkException e) {
-            if (e.getCause() instanceof KeeperException.NodeExistsException) {
-                // ignore: this stream already exists
-            } else {
-                throw e;
-            }
-        }
-    }
-
-    public void start() throws Exception {
+    public void start(Injector injector) throws Exception {
 
+        this.injector = injector;
         /* Set up logger basic configuration. */
         ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
                 .getLogger(Logger.ROOT_LOGGER_NAME);
@@ -118,14 +82,15 @@ public class Server {
         AbstractModule module = null;
 
         /* Initialize communication layer module. */
-        try {
-            module = (AbstractModule) Class.forName(commModuleName).newInstance();
-        } catch (Exception e) {
-            logger.error("Unable to instantiate communication layer module.", e);
-        }
-
-        /* After some indirection we get the injector. */
-        injector = Guice.createInjector(module);
+        // TODO do we need a separate comm layer?
+        // try {
+        // module = (AbstractModule) Class.forName(commModuleName).newInstance();
+        // } catch (Exception e) {
+        // logger.error("Unable to instantiate communication layer module.", e);
+        // }
+        //
+        // /* After some indirection we get the injector. */
+        // injector = Guice.createInjector(module);
 
         File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
         for (File s4rFile : s4rFiles) {
@@ -241,6 +206,12 @@ public class Server {
             }
 
             app.setCommLayer(sender, receiver);
+
+            if (app instanceof Adapter) {
+                RemoteSender remoteSender = injector.getInstance(RemoteSender.class);
+                ((Adapter) app).setRemoteSender(remoteSender);
+            }
+
             App previous = apps.put(appName, app);
             logger.info("Loaded application from file {}", s4r.getAbsolutePath());
             signalOneAppLoaded.countDown();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index a328941..6f23fde 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -46,7 +46,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
     private Thread thread;
     final private Sender sender;
     final private Receiver receiver;
-    final private int id;
+    // final private int id;
     final private App app;
 
     /**
@@ -63,9 +63,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
      *            the target PE prototypes for this stream.
      */
     public Stream(App app, String name, KeyFinder<T> finder, ProcessingElement... processingElements) {
-        synchronized (Stream.class) {
-            id = idCounter++;
-        }
+        // synchronized (Stream.class) {
+        // id = idCounter++;
+        // }
         this.app = app;
         app.addStream(this);
         this.name = name;
@@ -111,7 +111,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
     @SuppressWarnings("unchecked")
     public void put(Event event) {
         try {
-            event.setStreamId(getId());
+            event.setStreamId(getName());
             event.setAppId(app.getId());
 
             /*
@@ -182,9 +182,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
     /**
      * @return the stream id
      */
-    int getId() {
-        return id;
-    }
+    // int getId() {
+    // return id;
+    // }
 
     /**
      * @return the app

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
new file mode 100644
index 0000000..25ffc59
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
@@ -0,0 +1,26 @@
+package org.apache.s4.core.adapter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.RemoteSender;
+
+import com.google.inject.Inject;
+
+public abstract class Adapter extends App {
+
+    @Inject
+    RemoteSender remoteSender;
+
+    public RemoteSender getRemoteSender() {
+        return remoteSender;
+    }
+
+    public void setRemoteSender(RemoteSender remoteSender) {
+        this.remoteSender = remoteSender;
+    }
+
+    protected <T extends Event> RemoteStream createRemoteStream(String name) {
+
+        return new RemoteStream(this, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
new file mode 100644
index 0000000..a1c496c
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
@@ -0,0 +1,59 @@
+package org.apache.s4.core.adapter;
+
+import java.io.File;
+import java.io.FileInputStream;
+
+import org.apache.s4.core.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class AdapterMain {
+    private static final Logger logger = LoggerFactory.getLogger(AdapterMain.class);
+
+    public static void main(String[] args) {
+
+        AdapterArgs adapterArgs = new AdapterArgs();
+        JCommander jc = new JCommander(adapterArgs);
+
+        try {
+            jc.parse(args);
+        } catch (Exception e) {
+            e.printStackTrace();
+            jc.usage();
+        }
+
+        try {
+            Injector injector = Guice.createInjector(new AdapterModule(new FileInputStream(new File(
+                    adapterArgs.s4PropertiesFilePath))));
+            Server server = injector.getInstance(Server.class);
+            try {
+                server.start(injector);
+            } catch (Exception e) {
+                logger.error("Failed to start the controller.", e);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    @Parameters(separators = "=")
+    static class AdapterArgs {
+
+        @Parameter(names = "-moduleClass", description = "module class name")
+        String moduleClass;
+
+        @Parameter(names = "-adapterClass", description = "adapter class name")
+        String adapterClass;
+
+        @Parameter(names = "-s4Properties", description = "s4 properties file path")
+        String s4PropertiesFilePath;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
new file mode 100644
index 0000000..6e5c4a0
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
@@ -0,0 +1,97 @@
+package org.apache.s4.core.adapter;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.tcp.TCPRemoteEmitter;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.RemoteTopology;
+import org.apache.s4.comm.topology.RemoteTopologyFromZK;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+public class AdapterModule extends AbstractModule {
+
+    InputStream configFileInputStream;
+    private PropertiesConfiguration config;
+
+    public AdapterModule(InputStream configFileInputStream) {
+        this.configFileInputStream = configFileInputStream;
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        if (configFileInputStream != null) {
+            try {
+                configFileInputStream.close();
+            } catch (IOException ignored) {
+            }
+        }
+
+        int numHosts = config.getList("cluster.hosts").size();
+        boolean isCluster = numHosts > 1 ? true : false;
+        bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
+
+        bind(Cluster.class);
+
+        bind(Assignment.class).to(AssignmentFromZK.class);
+
+        bind(Topology.class).to(TopologyFromZK.class);
+        bind(RemoteTopology.class).to(RemoteTopologyFromZK.class);
+
+        bind(RemoteEmitter.class).to(TCPRemoteEmitter.class);
+        bind(Emitter.class).to(TCPEmitter.class);
+        bind(Listener.class).to(TCPListener.class);
+
+        // TODO downstream hasher
+
+        /* The hashing function to map keys top partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+        bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+    }
+
+    private void loadProperties(Binder binder) {
+        try {
+            config = new PropertiesConfiguration();
+            config.load(configFileInputStream);
+
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
new file mode 100644
index 0000000..a83feee
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
@@ -0,0 +1,60 @@
+package org.apache.s4.core.adapter;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.RemoteSender;
+import org.apache.s4.core.Streamable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream that dispatches events to a remote cluster
+ * 
+ */
+public class RemoteStream implements Streamable<Event> {
+
+    private Thread thread;
+    String name;
+
+    RemoteSender remoteSender;
+    int id;
+    private Adapter adapter;
+    private static Logger logger = LoggerFactory.getLogger(RemoteStream.class);
+
+    private static AtomicInteger remoteStreamCounter = new AtomicInteger();
+
+    public RemoteStream(Adapter adapter, String name) {
+        this.name = name;
+        this.adapter = adapter;
+        adapter.addStream(this);
+        remoteSender = adapter.getRemoteSender();
+        this.id = remoteStreamCounter.addAndGet(1);
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void put(Event event) {
+        event.setStreamId(name);
+        event.setAppId(adapter.getId());
+
+        // TODO specify partitioning?
+        remoteSender.sendToRemotePartitions(event);
+
+    }
+
+    @Override
+    public void close() {
+        thread.interrupt();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 279ccaa..4e0a3e7 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -296,7 +296,7 @@ public class TestAutomaticDeployment {
         clusterName = config.getString("cluster.name");
         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
         taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 1);
+        taskSetup.setup(clusterName, 1, 1300);
 
         zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
deleted file mode 100644
index f5f3912..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.s4.deploy;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public class TestModule extends AbstractModule {
-
-    private PropertiesConfiguration config;
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        bind(Cluster.class);
-        bind(Hasher.class).to(DefaultHasher.class);
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-        bind(Assignment.class).to(AssignmentFromZK.class);
-        bind(Topology.class).to(TopologyFromZK.class);
-        bind(Emitter.class).to(TCPEmitter.class);
-        bind(Listener.class).to(TCPListener.class);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index 1418396..e853df6 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -140,7 +140,7 @@ public class TestProducerConsumer {
         clusterName = config.getString("cluster.name");
         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
         taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 1);
+        taskSetup.setup(clusterName, 1, 1300);
 
         zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
index 3a33219..b79f1a0 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
@@ -1,5 +1,7 @@
 package org.apache.s4.fixtures;
 
+import java.util.Arrays;
+
 import org.apache.s4.comm.tools.TaskSetup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -21,6 +23,8 @@ public class ZKServer {
         try {
             jc.parse(args);
         } catch (Exception e) {
+            System.out.println(Arrays.toString(args));
+            e.printStackTrace();
             jc.usage();
             System.exit(-1);
         }
@@ -28,10 +32,12 @@ public class ZKServer {
 
             logger.info("Starting zookeeper server for cluster [{}] with [{}] node(s)", clusterArgs.clusterName,
                     clusterArgs.nbTasks);
-            CommTestUtils.startZookeeperServer();
+            if (clusterArgs.startZK) {
+                CommTestUtils.startZookeeperServer();
+            }
             TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
             taskSetup.clean(clusterArgs.clusterName);
-            taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks);
+            taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
             logger.info("Zookeeper started");
         } catch (Exception e) {
             logger.error("Cannot initialize zookeeper with specified configuration", e);
@@ -42,14 +48,20 @@ public class ZKServer {
     @Parameters(separators = "=", commandDescription = "Start Zookeeper server and initialize S4 cluster configuration in Zookeeper (and clean previous one with same cluster name)")
     static class ZKServerArgs {
 
-        @Parameter(names = "cluster", description = "S4 cluster name", required = true)
+        @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
         String clusterName = "s4-test-cluster";
 
-        @Parameter(names = "nbTasks", description = "number of tasks for the cluster", required = true)
+        @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
         int nbTasks = 1;
 
-        @Parameter(names = "zk", description = "Zookeeper connection string")
-        String zkConnectionString = "localhost:2181";
+        @Parameter(names = "-zk", description = "Zookeeper connection string")
+        String zkConnectionString = "localhost:21810";
+
+        @Parameter(names = "-startZK", description = "Start local zookeeper server (connection string ignored in that case)", required = false)
+        boolean startZK = false;
+
+        @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
+        int firstListeningPort = -1;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
index d74927f..9a5d11a 100644
--- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
+++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
@@ -6,6 +6,6 @@ cluster.name = s4-test-cluster
 cluster.zk_address = localhost:21810
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
-comm.module = org.apache.s4.deploy.TestModule
-s4.logger_level = TRACE
-appsDir=/tmp/deploy-test
\ No newline at end of file
+comm.module = org.apache.s4.core.adapter.AdapterModule
+s4.logger_level = DEBUG
+appsDir=/tmp/deploy-test

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-adapter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/build.gradle b/test-apps/twitter-adapter/build.gradle
new file mode 100644
index 0000000..669d62b
--- /dev/null
+++ b/test-apps/twitter-adapter/build.gradle
@@ -0,0 +1,200 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to buils and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by a running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standarize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+//version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+repositories {
+    mavenLocal()
+    mavenCentral()
+    mavenRepo name: "gson", urls: "http://google-gson.googlecode.com/svn/mavenrepo"
+
+    /* Add lib dir as a repo. Some jar files that are not available
+     in a public repo are distributed in the lib dir. */
+    flatDir name: 'libDir', dirs: "$rootDir/lib"
+}
+
+/* All project libraries must be defined here. */
+libraries = [
+           twitter4j_core:     'org.twitter4j:twitter4j-core:2.2.5',
+           twitter4j_stream:   'org.twitter4j:twitter4j-stream:2.2.5',
+           s4_base:            'org.apache.s4:s4-base:0.5.0-SNAPSHOT',
+           s4_comm:            'org.apache.s4:s4-comm:0.5.0-SNAPSHOT',
+           s4_core:            'org.apache.s4:s4-core:0.5.0-SNAPSHOT'
+       ]
+
+
+dependencies {
+
+   /* S4 Platform. We only need the API, not the transitive dependencies. */
+//    s4Libs.each {  module ->
+//        compile( module ) //{ transitive = false }
+//        s4API( module )
+//    }
+
+   compile (libraries.s4_base)
+   compile (libraries.s4_comm)
+   compile (libraries.s4_core)
+   compile (libraries.twitter4j_core)
+   compile (libraries.twitter4j_stream)
+
+
+}
+
+/* Set the manifest attributes for the S4 archive here.
+*  TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+       provider: 'gradle',
+       'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+       'Implementation-Version': version,
+       'Implementation-Vendor': 'Apache S4',
+       'Implementation-Vendor-Id': 's4app',
+       'S4-App-Class': appClassname, // gets set by the s4r task.
+       'S4-Version': s4Version
+       )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schenma files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+   dependsOn jar
+   from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   manifest = project.manifest
+   extension = 's4r'
+
+   /* Set class name in manifest. Parse source files until we find a class that extends App.
+    * Get fully qualified Java class name and set attribute in Manifest.
+    */
+   sourceSets.main.allSource.files.each {  File file ->
+       if (appClassname =="" || appClassname == "UNKNOWN") {
+           // only execute the closure for this file if we haven't already found the app class name
+           appClassname = getAppClassname(file)
+           if(appClassname != "") {
+               manifest.mainAttributes('S4-App-Class': appClassname)
+           }
+       }
+   }
+
+   if (appClassname == "UNKNOWN") {
+
+       println "Couldn't find App class in source files...aborting."
+       exit(1)
+   }
+}
+
+/* List the artifacts that will br added to the s4 archive (and explode if needed). */
+s4r << {
+   appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+   /* This is for debugging. */
+   //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+   //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+   // more debugging statements.
+   //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+   dependsOn s4r
+   from s4r.archivePath
+   into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skippong comments for example.)
+*/
+def getAppClassname(file) {
+   def classname = "UNKNOWN"
+   lines= file.readLines()
+   for(line in lines) {
+
+       def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+       if(pn) {
+           packageName = pn[0][1] + "."
+       }
+
+       def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+Adapter.*\{/
+       if (an) {
+           classname = packageName + an[0][1]
+           println "Found adapter class name: " + classname
+           break
+       }
+
+   }
+   classname
+}
+
+class Version {
+   int major
+   int minor
+   int bugfix
+   boolean isRelease
+
+   String toString() {
+       "$major.$minor.$bugfix${isRelease ? '' : '-SNAPSHOT'}"
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
new file mode 100644
index 0000000..ee905d9
--- /dev/null
+++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
@@ -0,0 +1,113 @@
+package org.apache.s4.example.twitter;
+
+import java.net.ServerSocket;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.adapter.Adapter;
+import org.apache.s4.core.adapter.RemoteStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+
+public class TwitterInputAdapter extends Adapter {
+
+    private ZkClient zkClient;
+    private static Logger logger = LoggerFactory.getLogger(TwitterInputAdapter.class);
+    private String urlString = "https://stream.twitter.com/1/statuses/sample.json";
+
+    public TwitterInputAdapter() {
+    }
+
+    private LinkedBlockingQueue<Status> messageQueue = new LinkedBlockingQueue<Status>();
+
+    protected ServerSocket serverSocket;
+
+    private Thread t;
+
+    private int messageCount;
+
+    private RemoteStream remoteStream;
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onInit() {
+        remoteStream = createRemoteStream("RawStatus");
+        t = new Thread(new Dequeuer());
+    }
+
+    public void connectAndRead() throws Exception {
+
+        TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
+        StatusListener statusListener = new StatusListener() {
+
+            @Override
+            public void onException(Exception ex) {
+                logger.error("error", ex);
+            }
+
+            @Override
+            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+                logger.error("error");
+            }
+
+            @Override
+            public void onStatus(Status status) {
+                messageQueue.add(status);
+
+            }
+
+            @Override
+            public void onScrubGeo(long userId, long upToStatusId) {
+                logger.error("error");
+            }
+
+            @Override
+            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+                logger.error("error");
+            }
+        };
+        twitterStream.addListener(statusListener);
+        twitterStream.sample();
+
+    }
+
+    @Override
+    protected void onStart() {
+        try {
+            t.start();
+            connectAndRead();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    class Dequeuer implements Runnable {
+
+        @Override
+        public void run() {
+            while (!Thread.interrupted()) {
+                try {
+                    Status status = messageQueue.take();
+                    Event event = new Event();
+                    event.put("statusText", String.class, status.getText());
+                    remoteStream.put(event);
+                } catch (Exception e) {
+
+                }
+            }
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-adapter/src/main/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/default.s4.properties b/test-apps/twitter-adapter/src/main/resources/default.s4.properties
new file mode 100644
index 0000000..cd36aaa
--- /dev/null
+++ b/test-apps/twitter-adapter/src/main/resources/default.s4.properties
@@ -0,0 +1,18 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.name = s4-adapter-cluster
+cluster.zk_address = localhost:21810
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
+comm.module = org.apache.s4.deploy.TestModule
+s4.logger_level = TRACE
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=1000
+comm.timeout=100
+comm.retry_delay=100
+comm.retries=10
+
+# specify the name of the remote cluster (there is currently only 1 remote cluster max)
+cluster.remote.name=s4-test-cluster

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties b/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
new file mode 100644
index 0000000..7d58c7d
--- /dev/null
+++ b/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
@@ -0,0 +1,5 @@
+debug=true
+# you need to set those parameters with valid twitter account credentials
+twitter4j.user=????
+twitter4j.password=????
+

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/build.gradle b/test-apps/twitter-counter/build.gradle
new file mode 100644
index 0000000..e737b40
--- /dev/null
+++ b/test-apps/twitter-counter/build.gradle
@@ -0,0 +1,200 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to buils and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by a running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standarize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+//version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+repositories {
+    mavenLocal()
+    mavenCentral()
+    mavenRepo name: "gson", urls: "http://google-gson.googlecode.com/svn/mavenrepo"
+
+    /* Add lib dir as a repo. Some jar files that are not available
+     in a public repo are distributed in the lib dir. */
+    flatDir name: 'libDir', dirs: "$rootDir/lib"
+}
+
+/* All project libraries must be defined here. */
+libraries = [
+           twitter4j_core:     'org.twitter4j:twitter4j-core:2.2.5',
+           twitter4j_stream:   'org.twitter4j:twitter4j-stream:2.2.5',
+           s4_base:            'org.apache.s4:s4-base:0.5.0-SNAPSHOT',
+           s4_comm:            'org.apache.s4:s4-comm:0.5.0-SNAPSHOT',
+           s4_core:            'org.apache.s4:s4-core:0.5.0-SNAPSHOT'
+       ]
+
+
+dependencies {
+
+   /* S4 Platform. We only need the API, not the transitive dependencies. */
+//    s4Libs.each {  module ->
+//        compile( module ) //{ transitive = false }
+//        s4API( module )
+//    }
+
+   compile (libraries.s4_base)
+   compile (libraries.s4_comm)
+   compile (libraries.s4_core)
+   compile (libraries.twitter4j_core)
+   compile (libraries.twitter4j_stream)
+
+
+}
+
+/* Set the manifest attributes for the S4 archive here.
+*  TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+       provider: 'gradle',
+       'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+       'Implementation-Version': version,
+       'Implementation-Vendor': 'Apache S4',
+       'Implementation-Vendor-Id': 's4app',
+       'S4-App-Class': appClassname, // gets set by the s4r task.
+       'S4-Version': s4Version
+       )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schenma files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+   dependsOn jar
+   from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   manifest = project.manifest
+   extension = 's4r'
+
+   /* Set class name in manifest. Parse source files until we find a class that extends App.
+    * Get fully qualified Java class name and set attribute in Manifest.
+    */
+   sourceSets.main.allSource.files.each {  File file ->
+       if (appClassname =="" || appClassname == "UNKNOWN") {
+           // only execute the closure for this file if we haven't already found the app class name
+           appClassname = getAppClassname(file)
+           if(appClassname != "") {
+               manifest.mainAttributes('S4-App-Class': appClassname)
+           }
+       }
+   }
+
+   if (appClassname == "UNKNOWN") {
+
+       println "Couldn't find App class in source files...aborting."
+       exit(1)
+   }
+}
+
+/* List the artifacts that will br added to the s4 archive (and explode if needed). */
+s4r << {
+   appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+   /* This is for debugging. */
+   //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+   //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+   // more debugging statements.
+   //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+   dependsOn s4r
+   from s4r.archivePath
+   into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skippong comments for example.)
+*/
+def getAppClassname(file) {
+   def classname = "UNKNOWN"
+   lines= file.readLines()
+   for(line in lines) {
+
+       def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+       if(pn) {
+           packageName = pn[0][1] + "."
+       }
+
+       def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+       if (an) {
+           classname = packageName + an[0][1]
+           println "Found app class name: " + classname
+           break
+       }
+
+   }
+   classname
+}
+
+class Version {
+   int major
+   int minor
+   int bugfix
+   boolean isRelease
+
+   String toString() {
+       "$major.$minor.$bugfix${isRelease ? '' : '-SNAPSHOT'}"
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
new file mode 100644
index 0000000..3d9a9fb
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
@@ -0,0 +1,79 @@
+package org.apache.s4.example.twitter;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TopNTopicPE extends ProcessingElement {
+
+    public TopNTopicPE(App app) {
+        super(app);
+        // TODO Auto-generated constructor stub
+    }
+
+    Map<String, Integer> countedTopics = Maps.newHashMap();
+    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
+
+    public void onEvent(TopicSeenEvent event) {
+        countedTopics.put(event.topic, event.count);
+    }
+
+    public void onTime() {
+        TreeSet<TopNEntry> sortedTopics = Sets.newTreeSet();
+        for (Map.Entry<String, Integer> topicCount : countedTopics.entrySet()) {
+            sortedTopics.add(new TopNEntry(topicCount.getKey(), topicCount.getValue()));
+        }
+
+        int i = 0;
+        Iterator<TopNEntry> iterator = sortedTopics.iterator();
+        long time = System.currentTimeMillis();
+        while (iterator.hasNext() && i < 10) {
+            TopNEntry entry = iterator.next();
+            logger.info("{} : topic [{}] count [{}]",
+                    new String[] { String.valueOf(time), entry.topic, String.valueOf(entry.count) });
+        }
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    class TopNEntry implements Comparable<TopNEntry> {
+        String topic = null;
+        int count = 0;
+
+        public TopNEntry(String topic, int count) {
+            this.topic = topic;
+            this.count = count;
+        }
+
+        public int compareTo(TopNEntry topNEntry) {
+            if (topNEntry.count < this.count) {
+                return -1;
+            } else if (topNEntry.count > this.count) {
+                return 1;
+            }
+            return 0;
+        }
+
+        public String toString() {
+            return "topic:" + topic + " count:" + count;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
new file mode 100644
index 0000000..a6d1478
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
@@ -0,0 +1,47 @@
+package org.apache.s4.example.twitter;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+// keyed by topic name
+public class TopicCountAndReportPE extends ProcessingElement {
+
+    Stream<TopicSeenEvent> downStream;
+    int threshold = 10;
+    int count;
+
+    public TopicCountAndReportPE(App app) {
+        super(app);
+        // TODO Auto-generated constructor stub
+    }
+
+    public void setDownstream(Stream<TopicSeenEvent> stream) {
+        this.downStream = stream;
+    }
+
+    public void onEvent(TopicSeenEvent event) {
+        count += event.count;
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void onTime() {
+        if (count < threshold) {
+            return;
+        }
+        TopicSeenEvent topicSeenEvent = new TopicSeenEvent(getId(), count);
+        downStream.put(topicSeenEvent);
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
new file mode 100644
index 0000000..501d6fd
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
@@ -0,0 +1,49 @@
+package org.apache.s4.example.twitter;
+
+import java.net.ServerSocket;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Streamable;
+
+import com.google.common.base.Splitter;
+
+public class TopicExtractorPE extends ProcessingElement {
+
+    static private ServerSocket serverSocket;
+    Streamable<TopicSeenEvent> downStream;
+
+    public TopicExtractorPE(App app) {
+        super(app);
+        // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    protected void onCreate() {
+
+    }
+
+    public void setDownStream(Streamable<TopicSeenEvent> stream) {
+        this.downStream = stream;
+    }
+
+    public void onEvent(Event event) {
+        String text = event.get("statusText", String.class);
+        if (text.contains("#")) {
+            Iterable<String> split = Splitter.on("#").omitEmptyStrings().trimResults()
+                    .split(text.substring(text.indexOf("#") + 1, text.length()));
+            for (String topic : split) {
+                String topicOnly = topic.split(" ")[0];
+                downStream.put(new TopicSeenEvent(topicOnly, 1));
+            }
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
new file mode 100644
index 0000000..b28d61e
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
@@ -0,0 +1,17 @@
+package org.apache.s4.example.twitter;
+
+import org.apache.s4.base.Event;
+
+public class TopicSeenEvent extends Event {
+
+    public String topic;
+    public int count;
+    public String reportKey = "x";
+
+    public TopicSeenEvent(String topic, int count) {
+        super();
+        this.topic = topic;
+        this.count = count;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
new file mode 100644
index 0000000..cf0fb40
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -0,0 +1,77 @@
+package org.apache.s4.example.twitter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+
+public class TwitterCounterApp extends App {
+
+    private ZkClient zkClient;
+
+    private Thread t;
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onInit() {
+        try {
+
+            TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
+            topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
+            @SuppressWarnings("unchecked")
+            Stream<TopicSeenEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder() {
+
+                @Override
+                public List<String> get(Event arg0) {
+                    return new ArrayList<String>() {
+                        {
+                            add("x");
+                        }
+                    };
+                }
+            }, topNTopicPE);
+
+            TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
+            topicCountAndReportPE.setDownstream(aggregatedTopicStream);
+            topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
+            Stream<TopicSeenEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicSeenEvent>() {
+
+                @Override
+                public List<String> get(final TopicSeenEvent arg0) {
+                    return new ArrayList<String>() {
+                        {
+                            add(arg0.topic);
+                        }
+                    };
+                }
+            }, topicCountAndReportPE);
+
+            TopicExtractorPE topicExtractorPE = createPE(TopicExtractorPE.class);
+            topicExtractorPE.setDownStream(topicSeenStream);
+            topicExtractorPE.setSingleton(true);
+            createStream("RawStatus", topicExtractorPE);
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void onStart() {
+        // try {
+        // t.start();
+        // } catch (Exception e) {
+        // throw new RuntimeException(e);
+        // }
+    }
+}