You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/03/11 16:31:30 UTC

[1/2] git commit: S4-91 RemoteSenders to exclusive PEs, improve tests

S4-91 RemoteSenders to exclusive PEs, improve tests

Applied patch by Aimee Cheng


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/6aaa5536
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/6aaa5536
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/6aaa5536

Branch: refs/heads/S4-91
Commit: 6aaa5536dffdc9e8c93f12465d165881b96983d3
Parents: 16d50da
Author: Daniel Gómez Ferro <df...@apache.org>
Authored: Mon Mar 11 15:57:35 2013 +0100
Committer: Daniel Gómez Ferro <df...@apache.org>
Committed: Mon Mar 11 15:57:35 2013 +0100

----------------------------------------------------------------------
 .../apache/s4/comm/topology/AssignmentFromZK.java  |    1 -
 .../org/apache/s4/comm/topology/PartitionData.java |  103 +++++++++++++
 .../apache/s4/comm/topology/StreamConsumer.java    |   19 +++
 .../java/org/apache/s4/comm/topology/ZNRecord.java |    4 +
 .../apache/s4/comm/topology/ZkRemoteStreams.java   |   35 +++++
 .../src/main/java/org/apache/s4/core/App.java      |   75 ++++++++-
 .../org/apache/s4/core/DefaultRemoteSenders.java   |    1 +
 .../java/org/apache/s4/core/ProcessingElement.java |   39 +++--
 .../main/java/org/apache/s4/core/RemoteSender.java |   34 ++++-
 .../main/java/org/apache/s4/core/S4Bootstrap.java  |    2 +
 .../src/main/java/org/apache/s4/core/Stream.java   |    4 +
 .../java/org/apache/s4/deploy/DeploymentUtils.java |    2 +-
 .../org/apache/s4/core/ri/RemoteAdapterApp.java    |   50 ++++++
 .../org/apache/s4/core/ri/RemoteStreamRITest.java  |  119 +++++++++++++++
 .../apache/s4/core/ri/RuntimeIsolationTest.java    |  106 ++++++++-----
 .../s4/wordcount/IsolationWordCounterPE.java       |   91 +++++++++++
 16 files changed, 612 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 78d4f69..33eb920 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -32,7 +32,6 @@ import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
new file mode 100644
index 0000000..691cb34
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
@@ -0,0 +1,103 @@
+package org.apache.s4.comm.topology;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+public class PartitionData {
+    int numPartitions;
+    boolean isExclusive = false;
+    Map<Integer, Integer> globalPartitionMap = Maps.newHashMap();
+    List<String> streams = new ArrayList<String>();
+
+    public PartitionData() {
+
+    }
+
+    public PartitionData(boolean isExclusive, int numPartitions) {
+        this.numPartitions = numPartitions;
+        this.isExclusive = isExclusive;
+    }
+
+    public boolean isExclusive() {
+        return this.isExclusive;
+    }
+
+    public int getPartitionCount() {
+        return this.numPartitions;
+    }
+
+    public void addPartitionMappingInfo(int partitionId, int nodeId) {
+        globalPartitionMap.put(partitionId, nodeId);
+    }
+
+    public List<String> getStreams() {
+        return streams;
+    }
+
+    public void addStream(String stream) {
+        if (!streams.contains(stream)) {
+            streams.add(stream);
+            System.out.println("Add " + stream);
+        }
+    }
+
+    public void setStreams(List<String> streams) {
+        this.streams = streams;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
+
+    public void setPartitionCount(int numPartitions) {
+        this.numPartitions = numPartitions;
+    }
+
+    public void setExclusive(boolean isExclusive) {
+        this.isExclusive = isExclusive;
+    }
+
+    public int getGlobalePartitionId(int partitionId) {
+        return globalPartitionMap.get(partitionId);
+    }
+
+    public String toString() {
+        return "PartitionCount: " + numPartitions + ", isExclusive: " + isExclusive + ", partitionMap: "
+                + globalPartitionMap;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((globalPartitionMap == null) ? 0 : globalPartitionMap.hashCode());
+        result = prime * result + (isExclusive ? 1231 : 1237);
+        result = prime * result + numPartitions;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        PartitionData other = (PartitionData) obj;
+        if (globalPartitionMap == null) {
+            if (other.globalPartitionMap != null)
+                return false;
+        } else if (!globalPartitionMap.equals(other.globalPartitionMap))
+            return false;
+        if (isExclusive != other.isExclusive)
+            return false;
+        if (numPartitions != other.numPartitions)
+            return false;
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
index 52fa2a8..a4cdc81 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
@@ -18,6 +18,8 @@
 
 package org.apache.s4.comm.topology;
 
+import java.util.Map;
+
 /**
  * A subscriber to a published stream. Identified through its cluster name (for dispatching to the remote cluster) and
  * application ID (for dispatching within a node (NOTE: this parameter is ignored)).
@@ -28,6 +30,11 @@ public class StreamConsumer {
     int appId;
     String clusterName;
 
+    /**
+     * The keys are PE prototype ids.
+     */
+    Map<String, PartitionData> pePartitionInfo;
+
     public StreamConsumer(int appId, String clusterName) {
         super();
         this.appId = appId;
@@ -42,6 +49,14 @@ public class StreamConsumer {
         return clusterName;
     }
 
+    public Map<String, PartitionData> getPePartitionInfo() {
+        return pePartitionInfo;
+    }
+
+    public void setPePartitionInfo(Map<String, PartitionData> pePartitionInfo) {
+        this.pePartitionInfo = pePartitionInfo;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -70,4 +85,8 @@ public class StreamConsumer {
         return true;
     }
 
+    public String toString() {
+        return "appId: " + appId + ", clusterName: " + clusterName + ", pePartitionInfo: " + pePartitionInfo;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
index aee7f6f..3c9cc7b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
@@ -103,4 +103,8 @@ public class ZNRecord {
         }
         return false;
     }
+
+    public String toString() {
+        return "id: " + id + ", simpleFields:" + simpleFields.toString() + ", mapFields" + mapFields;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
index 0855fc0..a92e9cc 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -34,6 +34,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
@@ -166,12 +167,46 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
             if (producerData != null) {
                 StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
                         producerData.getSimpleField("clusterName"));
+                consumer.setPePartitionInfo(readPePartitionData(producerData.getSimpleField("clusterName"), streamName));
                 consumers.add(consumer);
             }
         }
         streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
     }
 
+    /**
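+     * Reads PE partition data from ZooKeeper. Remote streams need this data to partition events.
+     * 
+     * @param clusterName name of the cluster whose PE prototype nodes are read
+     * @param streamName only prototypes whose "streams" list contains this stream are returned
+     * @return partition data keyed by PE prototype id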
+     */
+    private Map<String, PartitionData> readPePartitionData(String clusterName, String streamName) {
+        String path = "/s4/clusters/" + clusterName + "/app/s4App";
+        List<String> pePrototypes = zkClient.getChildren(path);
+        Map<String, PartitionData> results = Maps.newHashMap();
+        for (String prototypeSeq : pePrototypes) {
+            ZNRecord data = zkClient.readData(path + "/" + prototypeSeq, true);
+            if (data != null && data.getListField("streams").contains(streamName)) {
+                String prototypeId = data.getSimpleField("prototypeId");
+                if (!results.keySet().contains(prototypeSeq)) {
+                    results.put(
+                            prototypeId,
+                            new PartitionData(Boolean.parseBoolean(data.getSimpleField("isExclusive")), Integer
+                                    .parseInt(data.getSimpleField("partitionCount"))));
+                    Map<String, String> map = data.getMapField("globalPartitionMap");
+                    for (String key : map.keySet()) {
+                        results.get(prototypeId).addPartitionMappingInfo(Integer.parseInt(key),
+                                Integer.parseInt(map.get(key)));
+                    }
+                    results.get(prototypeId).setStreams(data.getListField("streams"));
+                }
+            }
+        }
+
+        return results;
+    }
+
     /*
      * (non-Javadoc)
      * 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index c7e02ea..c724ff6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -31,6 +31,8 @@ import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.core.util.S4Metrics;
@@ -77,9 +79,15 @@ public abstract class App {
     @Inject
     private Hasher hasher;
 
+    private ZkClient zkClient;
+
     @Inject
     private RemoteStreams remoteStreams;
 
+    public void setZkClient(ZkClient zkClient) {
+        this.zkClient = zkClient;
+    }
+
     @Inject
     private Cluster topology;
 
@@ -141,7 +149,6 @@ public abstract class App {
     }
 
     public ProcessingElement getPE(String name) {
-
         return peByName.get(name);
     }
 
@@ -194,7 +201,6 @@ public abstract class App {
 
         onInit();
 
-        schedule();
     }
 
     /**
@@ -235,6 +241,31 @@ public abstract class App {
 
     public void schedule() {
         schedule(topology);
+        writeToZK();
+    }
+
+    /**
+     * 
+     * For non-exclusive PEs, only one node needs to be written; for exclusive PEs, one node is written per prototype.
+     * 
+     * The PEs are taken from {@link #getPePrototypes()}.
+     */
+    private void writeToZK() {
+        List<String> streamsOfNEPE = new ArrayList<String>();
+        ProcessingElement NEPeInstance = null;
+        for (int i = 0; i < getPePrototypes().size(); i++) {
+            ProcessingElement pe = getPePrototypes().get(i);
+            if (pe.isExclusive()) {
+                createZKNodeForPartition("prototype-" + i, pe, null);
+            } else {
+                streamsOfNEPE.addAll(pe.getInputStreams());
+                NEPeInstance = pe;
+            }
+        }
+
+        if (streamsOfNEPE.size() != 0) {
+            createZKNodeForPartition("Non-exclusivePEs", NEPeInstance, streamsOfNEPE);
+        }
     }
 
     /**
@@ -272,26 +303,54 @@ public abstract class App {
         // assign partition to exclusive PE
         for (ProcessingElement pe : exclusivePEList) {
             for (int i = 0; i < pe.getPartitionCount(); i++) {
-                pe.setGlobalPartitionId(i, partition++);
+                pe.addGlobalPartitionId(i, partition++);
             }
         }
 
         logger.info("Finished schedule !");
 
         showPartitionInfo();
+
     };
 
+    /**
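+     * Creates an ephemeral sequential ZooKeeper node holding PE partition information.
+     * 
+     * @param id prototype id stored in the record
+     * @param pe PE providing the exclusivity flag, partition count and global partition map
+     * @param streams stream names to record; if null, the input streams of {@code pe} are used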
+     */
+    private void createZKNodeForPartition(String id, ProcessingElement pe, List<String> streams) {
+        String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+        ZNRecord record = new ZNRecord(id);
+        record.putSimpleField("prototypeId", id);
+        record.putSimpleField("isExclusive", String.valueOf(pe.isExclusive()));
+        record.putSimpleField("partitionCount", String.valueOf(pe.getPartitionCount()));
+        if (streams == null) {
+            record.putListField("streams", pe.getInputStreams());
+        } else {
+            record.putListField("streams", streams);
+        }
+        Map<String, String> map = Maps.newHashMap();
+        for (int j = 0; j < pe.getPartitionCount(); j++) {
+            map.put(String.valueOf(j), String.valueOf(pe.getGlobalPartitionId(j)));
+        }
+        record.putMapField("globalPartitionMap", map);
+        zkClient.createEphemeralSequential(appPath + "/prototype_", record);
+        logger.debug("write partition info to zk: " + record);
+    }
+
     private void showPartitionInfo() {
         StringBuilder sb = new StringBuilder("\n");
-        String line = "---------------------------------------------------------------------------\n";
+        String line = "------------------------------------------------------------------------------------------\n";
         sb.append(line);
         sb.append("Partition Information\n");
         sb.append(line);
 
-        sb.append(String.format("%-20s%-20s%-15s%-10s%-20s%n", "PE Class", "PE Name", "Exclusive", "Count",
-                "Partition"));
+        sb.append(String
+                .format("%-25s%-25s%-15s%-10s%-25s%n", "PE Class", "PE Name", "Exclusive", "Count", "Partition"));
         for (ProcessingElement pe : getPePrototypes()) {
-            sb.append(String.format("%-20s%-20s%-15s%-10d", pe.getClass().getSimpleName(), pe.getName(),
+            sb.append(String.format("%-25s%-25s%-15s%-10d", pe.getClass().getSimpleName(), pe.getName(),
                     pe.isExclusive(), pe.getPartitionCount()));
             StringBuilder partition = new StringBuilder();
             if (pe.isExclusive()) {
@@ -300,7 +359,7 @@ public abstract class App {
             } else {
                 partition.append("[0 ~ " + (pe.getPartitionCount() - 1) + "]");
             }
-            sb.append(String.format("%-20s%n", partition.toString()));
+            sb.append(String.format("%-25s%n", partition.toString()));
         }
         sb.append(line);
         logger.debug(sb.toString());

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
index 6aaa8f1..e1cd90e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -85,6 +85,7 @@ public class DefaultRemoteSenders implements RemoteSenders {
             if (sender == null) {
                 RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
                         .getClusterName())), hasher, consumer.getClusterName());
+                newSender.setPartitionDatas(consumer.getPePartitionInfo());
                 // TODO cleanup when remote topologies die
                 sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
                 if (sender == null) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index e08f468..09ddd0d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import net.jcip.annotations.ThreadSafe;
 
 import org.apache.s4.base.Event;
+import org.apache.s4.comm.topology.PartitionData;
 import org.apache.s4.core.ft.CheckpointId;
 import org.apache.s4.core.ft.CheckpointingConfig;
 import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
@@ -147,10 +149,7 @@ public abstract class ProcessingElement implements Cloneable {
 
     transient private Timer processingTimer;
 
-    transient private boolean isExclusive = false;
-    transient private int partitionCount = -1;
-    /* This map holds the mapping of PE partition id and glocal partition id */
-    transient private Map<Integer, Integer> globalPartitionMap;
+    transient private PartitionData partitionData;
 
     transient private CheckpointingConfig checkpointingConfig = new CheckpointingConfig.Builder(CheckpointingMode.NONE)
             .build();
@@ -171,7 +170,8 @@ public abstract class ProcessingElement implements Cloneable {
         });
         triggers = new MapMaker().makeMap();
 
-        globalPartitionMap = new MapMaker().makeMap();
+        partitionData = new PartitionData();
+
         /*
          * Only the PE Prototype uses the constructor. The PEPrototype field will be cloned by the instances and point
          * to the prototype.
@@ -180,12 +180,20 @@ public abstract class ProcessingElement implements Cloneable {
 
     }
 
-    public void setGlobalPartitionId(int partitionId, int nodeId) {
-        globalPartitionMap.put(partitionId, nodeId);
+    public void addGlobalPartitionId(int partitionId, int nodeId) {
+        partitionData.addPartitionMappingInfo(partitionId, nodeId);
     }
 
     public int getGlobalPartitionId(int partitionId) {
-        return globalPartitionMap.get(partitionId);
+        return partitionData.getGlobalePartitionId(partitionId);
+    }
+
+    public void addInputStream(String stream) {
+        partitionData.addStream(stream);
+    }
+
+    public List<String> getInputStreams() {
+        return partitionData.getStreams();
     }
 
     /**
@@ -503,7 +511,7 @@ public abstract class ProcessingElement implements Cloneable {
     }
 
     public boolean isExclusive() {
-        return isExclusive;
+        return partitionData.isExclusive();
     }
 
     /**
@@ -513,16 +521,20 @@ public abstract class ProcessingElement implements Cloneable {
      * @param partitionCount
      */
     public void setExclusive(int partitionCount) {
-        this.isExclusive = true;
-        this.partitionCount = partitionCount;
+        this.partitionData.setExclusive(true);
+        this.partitionData.setPartitionCount(partitionCount);
     }
 
     public void setPartitionCount(int partitionCount) {
-        this.partitionCount = partitionCount;
+
+        this.partitionData.setPartitionCount(partitionCount);
+        for (int i = 0; i < partitionCount; i++) {
+            this.partitionData.addPartitionMappingInfo(i, i);
+        }
     }
 
     public int getPartitionCount() {
-        return partitionCount;
+        return partitionData.getPartitionCount();
     }
 
     private boolean isTrigger(Event event) {
@@ -946,4 +958,5 @@ public abstract class ProcessingElement implements Cloneable {
         return sb.toString();
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index dba8b6c..6cf98e8 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -19,10 +19,17 @@
 package org.apache.s4.core;
 
 import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Hasher;
+import org.apache.s4.comm.topology.PartitionData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Sends events to a remote cluster.
@@ -30,27 +37,40 @@ import org.apache.s4.base.Hasher;
  */
 public class RemoteSender {
 
+    private static final Logger logger = LoggerFactory.getLogger(RemoteSender.class);
+
     final private Emitter emitter;
     final private Hasher hasher;
     AtomicInteger targetPartition = new AtomicInteger();
     final private String remoteClusterName;
+    private Map<String, PartitionData> partitionDatas;
 
     public RemoteSender(Emitter emitter, Hasher hasher, String clusterName) {
         super();
         this.emitter = emitter;
         this.hasher = hasher;
         this.remoteClusterName = clusterName;
+    }
 
+    public void setPartitionDatas(Map<String, PartitionData> partitionDatas) {
+        this.partitionDatas = partitionDatas;
     }
 
     public void send(String hashKey, ByteBuffer message) throws InterruptedException {
-        int partition;
-        if (hashKey == null) {
-            // round robin by default
-            partition = Math.abs(targetPartition.incrementAndGet() % emitter.getPartitionCount());
-        } else {
-            partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+        
+        Set<Integer> partitions = new HashSet<Integer>();
+
+        logger.warn("Remote sending with hash: " + hashKey);
+        int hashValue = (hashKey == null) ? targetPartition.incrementAndGet() : (int) hasher.hash(hashKey);
+
+        for (String prototype : partitionDatas.keySet()) {
+            PartitionData data = partitionDatas.get(prototype);
+            partitions.add(data.getGlobalePartitionId(hashValue % data.getPartitionCount()));
+        }
+
+        for (Integer partition : partitions) {
+            logger.warn("Remote sending to partition: " + partition);
+            emitter.send(partition, message);
         }
-        emitter.send(partition, message);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 757ce4f..64bc031 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -134,7 +134,9 @@ public class S4Bootstrap {
             // use correct classLoader for running the app initialization
             Thread.currentThread().setContextClassLoader(app.getClass().getClassLoader());
 
+            app.setZkClient(zkClient);
             app.init();
+            app.schedule();
             app.start();
 
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 2cf55f7..c2f0805 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -154,6 +154,9 @@ public class Stream<T extends Event> implements Streamable {
      */
     public Stream<T> setPEs(ProcessingElement... pes) {
         this.targetPEs = pes;
+        for(ProcessingElement pe: pes){
+            pe.addInputStream(name);
+        }
         return this;
     }
 
@@ -263,6 +266,7 @@ public class Stream<T extends Event> implements Streamable {
         return targetPEs;
     }
 
+
     /**
      * Stop and close this stream.
      */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
index cc404af..0f91b0c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
@@ -20,7 +20,7 @@ public class DeploymentUtils {
 
         if (zk.exists("/s4/clusters/" + clusterName + "/app/s4App")) {
             if (deleteIfExists) {
-                zk.delete("/s4/clusters/" + clusterName + "/app/s4App");
+                zk.deleteRecursive("/s4/clusters/" + clusterName + "/app/s4App");
             }
         }
         try {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
new file mode 100644
index 0000000..91f0a85
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
@@ -0,0 +1,50 @@
+package org.apache.s4.core.ri;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.RemoteStream;
+import org.apache.s4.wordcount.SentenceKeyFinder;
+import org.apache.s4.wordcount.WordCountTest;
+
+public class RemoteAdapterApp extends App {
+
+    String outputStreamName;
+    private RemoteStream remoteStream;
+
+    @Override
+    protected void onInit() {
+        remoteStream = createOutputStream("inputStream", new SentenceKeyFinder());
+    }
+
+    protected KeyFinder<Event> remoteStreamKeyFinder;
+
+    protected void setKeyFinder(KeyFinder<Event> keyFinder) {
+        this.remoteStreamKeyFinder = keyFinder;
+    }
+
+    @Override
+    protected void onStart() {
+        injectSentence(WordCountTest.SENTENCE_1);
+        injectSentence(WordCountTest.SENTENCE_2);
+        injectSentence(WordCountTest.SENTENCE_3);
+
+    }
+
+    public void injectSentence(String sentence) {
+        Event event = new Event();
+        event.setStreamId("inputStream");
+        event.put("sentence", String.class, sentence);
+        getRemoteStream().put(event);
+    }
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public RemoteStream getRemoteStream() {
+        return remoteStream;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
new file mode 100644
index 0000000..0757f5e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
@@ -0,0 +1,119 @@
+package org.apache.s4.core.ri;
+
+import static org.apache.s4.core.ri.RuntimeIsolationTest.counterNumber;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.wordcount.WordCountModule;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+public class RemoteStreamRITest extends RuntimeIsolationTest {
+
+    private static Logger logger = LoggerFactory.getLogger(RemoteStreamRITest.class);
+
+    /** Intentionally empty: RemoteAdapterApp, deployed on cluster2 by startNodes(), injects the sentences. */
+    @Override
+    public void injectData() throws InterruptedException, IOException {
+    }
+
+    /**
+     * Forks numberTasks nodes running IsolationWordCountApp on cluster1 and one node running RemoteAdapterApp on
+     * cluster2, then waits for "/results" and checks the word counts and the per-counter totals in ZooKeeper.
+     *
+     * @throws IllegalStateException
+     *             if a ZooKeeper call fails, so that the test fails instead of passing unverified
+     */
+    @Override
+    public void startNodes() throws IOException, InterruptedException {
+        final ZooKeeper zk = CommTestUtils.createZkClient();
+
+        TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+        taskSetup.setup("cluster2", 1, 1500);
+
+        List<Process> nodes = new ArrayList<Process>();
+
+        DeploymentUtils.initAppConfig(new AppConfig.Builder().appClassName(IsolationWordCountApp.class.getName())
+                .customModulesNames(ImmutableList.of(WordCountModule.class.getName())).build(), "cluster1", false,
+                "localhost:2181");
+        nodes.addAll(Arrays.asList(CoreTestUtils.forkS4Nodes(new String[] { "-c", "cluster1" }, new ZkClient(
+                "localhost:2181"), 10, "cluster1", numberTasks)));
+
+        DeploymentUtils.initAppConfig(new AppConfig.Builder().appClassName(RemoteAdapterApp.class.getName())
+                .customModulesNames(ImmutableList.of(WordCountModule.class.getName())).build(), "cluster2", false,
+                "localhost:2181");
+        nodes.addAll(Arrays.asList(CoreTestUtils.forkS4Nodes(new String[] { "-c", "cluster2" }, new ZkClient(
+                "localhost:2181"), 10, "cluster2", 1)));
+
+        s4nodes = nodes.toArray(new Process[] {});
+
+        CountDownLatch signalTextProcessed = new CountDownLatch(1);
+        try {
+            CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
+            // add authorizations for processing
+            for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
+                zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+            }
+
+            Assert.assertTrue(signalTextProcessed.await(30, TimeUnit.SECONDS));
+            String results = new String(zk.getData("/results", false, null));
+            Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
+
+            List<String> counterInstances = zk.getChildren("/counters", false);
+
+            int totalCount = 0;
+            int activeInstances = 0;
+            for (String instance : counterInstances) {
+                int count = Integer.parseInt(new String(zk.getData("/counters/" + instance, false, null)));
+                if (count != 0) {
+                    activeInstances++;
+                }
+                totalCount += count;
+            }
+            Assert.assertEquals(numberTasks, counterInstances.size());
+            Assert.assertEquals(counterNumber, activeInstances);
+
+            Assert.assertEquals(13, totalCount);
+        } catch (KeeperException e) {
+            logger.error("ZooKeeper error while verifying results", e);
+            throw new IllegalStateException("ZooKeeper error while verifying results", e);
+        }
+    }
+
+    /** Creates the "/counters" node, then runs startNodes(), which also performs the assertions. */
+    @Override
+    @Test
+    public void testSimple() {
+        try {
+            ZooKeeper zk = CommTestUtils.createZkClient();
+            zk.create("/counters", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            startNodes();
+        } catch (IOException e) {
+            throw new IllegalStateException("I/O error while running the remote stream test", e);
+        } catch (KeeperException e) {
+            throw new IllegalStateException("ZooKeeper error while creating /counters", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException("Interrupted while running the remote stream test", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
index 6a2549d..0b9b064 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
@@ -19,29 +19,25 @@
 package org.apache.s4.core.ri;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import junit.framework.Assert;
-
 import org.apache.s4.base.Event;
 import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.App;
-import org.apache.s4.core.S4Node;
 import org.apache.s4.core.Stream;
-import org.apache.s4.core.ft.FTWordCountApp;
-import org.apache.s4.core.ft.FileSystemBackendCheckpointingModule;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.deploy.DeploymentUtils;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.wordcount.IsolationWordCounterPE;
 import org.apache.s4.wordcount.SentenceKeyFinder;
 import org.apache.s4.wordcount.WordClassifierPE;
 import org.apache.s4.wordcount.WordCountEvent;
 import org.apache.s4.wordcount.WordCountKeyFinder;
 import org.apache.s4.wordcount.WordCountModule;
 import org.apache.s4.wordcount.WordCountTest;
-import org.apache.s4.wordcount.WordCounterPE;
 import org.apache.s4.wordcount.WordSeenEvent;
 import org.apache.s4.wordcount.WordSeenKeyFinder;
 import org.apache.s4.wordcount.WordSplitterPE;
@@ -49,21 +45,26 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 
 public class RuntimeIsolationTest extends WordCountTest {
+    private static Logger logger = LoggerFactory.getLogger(RuntimeIsolationTest.class);
 
     final static int numberTasks = 5;
-    private Process[] s4nodes;
+    protected Process[] s4nodes;
+
+    protected static int counterNumber = 2;
 
     public RuntimeIsolationTest() {
         super(numberTasks);
     }
-    
+
     @After
     public void cleanup() throws IOException, InterruptedException {
         if (s4nodes == null) {
@@ -75,46 +76,66 @@ public class RuntimeIsolationTest extends WordCountTest {
         }
     }
 
-    /**
-     * reuse {@link WordCountTest}. Start 3 nodes.
-     * 
-     */
     @Test
-    @Override
-    public void testSimple() throws Exception {
-        final ZooKeeper zk = CommTestUtils.createZkClient();
+    public void testSimple() {
+        // Runs the word count, then checks "/results" and the counters published under "/counters".
+        try {
+            final ZooKeeper zk = CommTestUtils.createZkClient();
+            zk.create("/counters", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            // start nodes
+            startNodes();
+
+            // we create the emitter now, it will share zk node assignment with the S4 node
+            createEmitter();
+
+            CountDownLatch signalTextProcessed = new CountDownLatch(1);
+            CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
 
-        // start 3 nodes
-        startNodes("cluster1", IsolationWordCountApp.class.getName(), numberTasks);
+            // add authorizations for processing
+            for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
+                zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+            }
 
-        // we create the emitter now, it will share zk node assignment with the S4 node
-        createEmitter();
+            injectData();
 
-        CountDownLatch signalTextProcessed = new CountDownLatch(1);
-        CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
+            Assert.assertTrue(signalTextProcessed.await(30, TimeUnit.SECONDS));
+            String results = new String(zk.getData("/results", false, null));
+            Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
 
-        // add authorizations for processing
-        for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
-            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+            List<String> counterInstances = zk.getChildren("/counters", false);
+
+            int totalCount = 0;
+            int activeInstances = 0;
+            for (String instance : counterInstances) {
+                int count = Integer.parseInt(new String(zk.getData("/counters/" + instance, false, null)));
+                if (count != 0) {
+                    activeInstances++;
+                }
+                totalCount += count;
+            }
+            Assert.assertEquals(numberTasks, counterInstances.size());
+            Assert.assertEquals(counterNumber, activeInstances);
+            Assert.assertEquals(13, totalCount);
+        } catch (Exception e) {
+            logger.error("ERROR!", e);
+            // rethrow: logging alone would let JUnit report a pass although nothing was verified
+            throw new IllegalStateException("Runtime isolation test failed", e);
         }
+    }
+
+    public void injectData() throws IOException, InterruptedException {
         injectSentence(SENTENCE_1);
         injectSentence(SENTENCE_2);
         injectSentence(SENTENCE_3);
-        Assert.assertTrue(signalTextProcessed.await(30, TimeUnit.SECONDS));
-        String results = new String(zk.getData("/results", false, null));
-        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
-
     }
 
-    public void startNodes(String clusterName, String appClass, int number) throws InterruptedException, IOException {
-        s4nodes = new Process[number];
-        DeploymentUtils.initAppConfig(
-                new AppConfig.Builder()
-                        .appClassName(appClass)
-                        .customModulesNames(ImmutableList.of(WordCountModule.class.getName()))
-                        .build(), "cluster1", false, "localhost:2181");
+    public void startNodes() throws InterruptedException, IOException {
+        // s4nodes is assigned only once the fork succeeds, so a failed deployment never leaves an array of nulls
+        DeploymentUtils.initAppConfig(new AppConfig.Builder().appClassName(IsolationWordCountApp.class.getName())
+                .customModulesNames(ImmutableList.of(WordCountModule.class.getName())).build(), "cluster1", false,
+                "localhost:2181");
         s4nodes = CoreTestUtils.forkS4Nodes(new String[] { "-c", "cluster1" }, new ZkClient("localhost:2181"), 10,
-                "cluster1", number);
+                "cluster1", numberTasks);
     }
 
     static class IsolationWordCountApp extends App {
@@ -135,12 +156,12 @@ public class RuntimeIsolationTest extends WordCountTest {
 
             WordClassifierPE wordClassifierPrototype = createPE(WordClassifierPE.class, "classifierPE");
             // set WordClassifierPE has 2 exclusive partitions
-            wordClassifierPrototype.setExclusive(2);
+//            wordClassifierPrototype.setExclusive(2);
 
             Stream<WordCountEvent> wordCountStream = createStream("words counts stream", new WordCountKeyFinder(),
                     wordClassifierPrototype);
-            WordCounterPE wordCounterPrototype = createPE(WordCounterPE.class, "counterPE");
-            wordCounterPrototype.setExclusive(2);
+            IsolationWordCounterPE wordCounterPrototype = createPE(IsolationWordCounterPE.class, "counterPE");
+            wordCounterPrototype.setExclusive(counterNumber);
 
             wordCounterPrototype.setWordClassifierStream(wordCountStream);
             Stream<WordSeenEvent> wordSeenStream = createStream("words seen stream", new WordSeenKeyFinder(),
@@ -149,9 +170,7 @@ public class RuntimeIsolationTest extends WordCountTest {
             wordSplitterPrototype.setWordSeenStream(wordSeenStream);
             Stream<Event> sentenceStream = createInputStream("inputStream", new SentenceKeyFinder(),
                     wordSplitterPrototype);
-            
-            //TestPE
-            WordSplitterPE wordSplitterTestPE = createPE(WordSplitterPE.class,"TestPE");
+
         }
 
         @Override
@@ -160,4 +179,5 @@ public class RuntimeIsolationTest extends WordCountTest {
         }
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
new file mode 100644
index 0000000..fc8a891
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
@@ -0,0 +1,91 @@
+package org.apache.s4.wordcount;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IsolationWordCounterPE extends ProcessingElement implements Watcher {
+    private static Logger logger = LoggerFactory.getLogger(IsolationWordCounterPE.class);
+
+    transient private ZooKeeper zk;
+    private String zkPath;
+    // Shared by every instance of this class, so it is only updated under the class-wide lock.
+    static int count = 0;
+    int wordCounter;
+    transient Stream<WordCountEvent> wordClassifierStream;
+    public static AtomicInteger prototypeId = new AtomicInteger();
+
+    private IsolationWordCounterPE() {
+    }
+
+    /**
+     * Connects to ZooKeeper at localhost:2181 and creates a persistent "/counters/counter_prototype_..." node
+     * holding "0"; onEvent() later overwrites that node with the running event count.
+     */
+    public IsolationWordCounterPE(App app) {
+        super(app);
+        try {
+            zk = new ZooKeeper("localhost:2181", 4000, this);
+            zkPath = "/counters/counter_prototype_" + prototypeId.incrementAndGet() + "_"
+                    + System.currentTimeMillis();
+            zk.create(zkPath, "0".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Emits the per-instance word count downstream and publishes the class-wide event count to ZooKeeper. */
+    public void onEvent(WordSeenEvent event) {
+        wordCounter++;
+        System.out.println("seen word " + event.getWord());
+
+        // NOTE: it seems the id is the key for now...
+        wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
+
+        // "count" is static: locking on "this" would let two instances update it concurrently,
+        // so guard it (and the matching ZooKeeper write) with the class-wide lock.
+        synchronized (IsolationWordCounterPE.class) {
+            count++;
+            try {
+                zk.setData(zkPath, String.valueOf(count).getBytes(), -1);
+                logger.info("set " + zkPath + " " + count);
+            } catch (KeeperException e) {
+                logger.error(zkPath + " " + count, e);
+            } catch (InterruptedException e) {
+                // restore the interrupt status for the calling thread
+                Thread.currentThread().interrupt();
+                logger.error(zkPath + " " + count, e);
+            }
+        }
+    }
+
+    public void setWordClassifierStream(Stream<WordCountEvent> stream) {
+        this.wordClassifierStream = stream;
+    }
+
+    /** Watcher callback for the ZooKeeper client created in the constructor; events are ignored. */
+    @Override
+    public void process(WatchedEvent event) {
+    }
+
+    /** No-op. */
+    @Override
+    protected void onCreate() {
+    }
+
+    /** No-op. */
+    @Override
+    protected void onRemove() {
+    }
+}