You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/05/24 18:19:26 UTC

[4/5] inter-app communications + refactorings

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
new file mode 100644
index 0000000..6b5d094
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -0,0 +1,211 @@
+package org.apache.s4.comm.topology;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Monitors streams available in the S4 cluster.
+ * </p>
+ * <p>
+ * Maintains a data structure reflecting the currently published streams with their consumers and publishers.
+ * </p>
+ * <p>
+ * Provides methods to publish producers and consumers of streams
+ * </p>
+ * 
+ */
+@Singleton
+public class RemoteStreams implements IZkStateListener, IZkChildListener {
+
+    private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
+    private KeeperState state;
+    private final ZkClient zkClient;
+    private final Lock lock;
+    private final static String STREAMS_PATH = "/s4/streams";
+    // by stream name, then "producer"|"consumer" then
+    private Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
+
+    public enum StreamType {
+        PRODUCER, CONSUMER;
+
+        public String getPath(String streamName) {
+            switch (this) {
+                case PRODUCER:
+                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
+                case CONSUMER:
+                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
+                default:
+                    throw new RuntimeException("Invalid path in enum StreamType");
+            }
+        }
+
+        public String getCollectionName() {
+            switch (this) {
+                case PRODUCER:
+                    return "producers";
+                case CONSUMER:
+                    return "consumers";
+                default:
+                    throw new RuntimeException("Invalid path in enum StreamType");
+            }
+        }
+    }
+
+    @Inject
+    public RemoteStreams(@Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+
+        lock = new ReentrantLock();
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        ZkSerializer serializer = new ZNRecordSerializer();
+        zkClient.setZkSerializer(serializer);
+        zkClient.subscribeStateChanges(this);
+        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
+        // bug in zkClient, it does not invoke handleNewSession the first time
+        // it connects
+        this.handleStateChanged(KeeperState.SyncConnected);
+
+        this.handleNewSession();
+
+    }
+
+    public Set<StreamConsumer> getConsumers(String streamName) {
+        if (!streams.containsKey(streamName)) {
+            return Collections.emptySet();
+        } else {
+            return streams.get(streamName).get("consumers");
+        }
+    }
+
+    /**
+     * One method to do any processing if there is a change in ZK, all callbacks will be processed sequentially
+     */
+    private void doProcess() {
+        lock.lock();
+        try {
+            refreshStreams();
+        } catch (Exception e) {
+            logger.warn("Exception in tryToAcquireTask", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        this.state = state;
+    }
+
+    @Override
+    public void handleNewSession() throws Exception {
+        logger.info("New session:" + zkClient.getSessionId());
+        zkClient.subscribeChildChanges(STREAMS_PATH, this);
+
+        doProcess();
+    }
+
+    @Override
+    public void handleChildChange(String paramString, List<String> paramList) throws Exception {
+        doProcess();
+    }
+
+    private void refreshStreams() {
+        List<String> children = zkClient.getChildren(STREAMS_PATH);
+        for (String streamName : children) {
+            if (!streams.containsKey(streamName)) {
+                logger.info("Detected new stream [{}]", streamName);
+                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
+                zkClient.subscribeChildChanges(StreamType.PRODUCER.getPath(streamName), this);
+                zkClient.subscribeChildChanges(StreamType.CONSUMER.getPath(streamName), this);
+                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
+            }
+
+            update(streamName, StreamType.PRODUCER);
+            update(streamName, StreamType.CONSUMER);
+        }
+    }
+
+    private void update(String streamName, StreamType type) {
+        List<String> elements = zkClient.getChildren(type.getPath(streamName));
+        Set<StreamConsumer> consumers = new HashSet<StreamConsumer>();
+        for (String element : elements) {
+            ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element, true);
+            if (producerData != null) {
+                StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
+                        producerData.getSimpleField("clusterName"));
+                consumers.add(consumer);
+            }
+        }
+        streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
+    }
+
+    public void addOutputStream(String appId, String clusterName, String streamName) {
+        lock.lock();
+        try {
+            logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
+                    clusterName });
+            createStreamPaths(streamName);
+            ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+            producer.putSimpleField("appId", appId);
+            producer.putSimpleField("clusterName", clusterName);
+            try {
+                zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
+            } catch (Throwable e) {
+                logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
+                        new String[] { streamName, appId, clusterName, e.getMessage() });
+            }
+            refreshStreams();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Creates (it they don't exist yet) persistent znodes for producers and consumers of a stream.
+     */
+    private void createStreamPaths(String streamName) {
+        zkClient.createPersistent(StreamType.PRODUCER.getPath(streamName), true);
+        zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
+    }
+
+    public void addInputStream(int appId, String clusterName, String streamName) {
+        lock.lock();
+        try {
+            logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
+                    new String[] { streamName, String.valueOf(appId), clusterName });
+            createStreamPaths(streamName);
+            ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+            consumer.putSimpleField("appId", String.valueOf(appId));
+            consumer.putSimpleField("clusterName", clusterName);
+            try {
+                zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
+            } catch (Throwable e) {
+                logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
+                        new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
+            }
+            refreshStreams();
+        } finally {
+            lock.unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
deleted file mode 100644
index 8f7cc1a..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.s4.comm.topology;
-
-public interface RemoteTopology extends Topology {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
deleted file mode 100644
index 39ede90..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-public class RemoteTopologyFromZK extends TopologyFromZK implements RemoteTopology {
-
-    @Inject
-    public RemoteTopologyFromZK(@Named("cluster.remote.name") String remoteClusterName,
-            @Named("cluster.zk_address") String zookeeperAddress,
-            @Named("cluster.zk_session_timeout") int sessionTimeout,
-            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-        super(remoteClusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
new file mode 100644
index 0000000..f21213a
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
@@ -0,0 +1,27 @@
+package org.apache.s4.comm.topology;
+
+/**
+ * A subscriber to a published stream. Identified through its cluster name (for dispatching to the remote cluster) and
+ * application ID (for dispatching within a node).
+ * 
+ */
+public class StreamConsumer {
+
+    int appId;
+    String clusterName;
+
+    public StreamConsumer(int appId, String clusterName) {
+        super();
+        this.appId = appId;
+        this.clusterName = clusterName;
+    }
+
+    public int getAppId() {
+        return appId;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Topology.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Topology.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Topology.java
deleted file mode 100644
index 584f3da..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Topology.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import org.apache.s4.comm.topology.Cluster;
-
-public interface Topology {
-    public Cluster getTopology();
-    public void addListener(TopologyChangeListener listener);
-    public void removeListener(TopologyChangeListener listener);
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyChangeListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyChangeListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyChangeListener.java
deleted file mode 100644
index d1ac45b..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyChangeListener.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.s4.comm.topology;
-
-public interface TopologyChangeListener {
-    public void onChange();
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromFile.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromFile.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromFile.java
deleted file mode 100644
index b28a170..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromFile.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import com.google.inject.Inject;
-
-public class TopologyFromFile implements Topology {
-
-    private Cluster cluster;
-
-    @Inject
-    public TopologyFromFile(Cluster cluster) {
-        super();
-        this.cluster = cluster;
-
-    }
-
-    @Override
-    public Cluster getTopology() {
-        return cluster;
-    }
-
-    @Override
-    public void addListener(TopologyChangeListener listener) {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void removeListener(TopologyChangeListener listener) {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
deleted file mode 100644
index 0be6686..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-public class TopologyFromZK implements Topology, IZkChildListener, IZkStateListener, IZkDataListener {
-    private static final Logger logger = LoggerFactory.getLogger(TopologyFromZK.class);
-    private final String clusterName;
-    private final AtomicReference<Cluster> clusterRef;
-    private final List<TopologyChangeListener> listeners;
-    private KeeperState state;
-    private final ZkClient zkClient;
-    private final String taskPath;
-    private final String processPath;
-    private final Lock lock;
-    private AtomicBoolean currentlyOwningTask;
-    private String machineId;
-
-    @Inject
-    public TopologyFromZK(@Named("cluster.name") String clusterName,
-            @Named("cluster.zk_address") String zookeeperAddress,
-            @Named("cluster.zk_session_timeout") int sessionTimeout,
-            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-        this.clusterName = clusterName;
-        taskPath = "/" + clusterName + "/" + "tasks";
-        processPath = "/" + clusterName + "/" + "process";
-        lock = new ReentrantLock();
-        clusterRef = new AtomicReference<Cluster>();
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
-        listeners = new ArrayList<TopologyChangeListener>();
-        zkClient.subscribeStateChanges(this);
-        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
-        try {
-            machineId = InetAddress.getLocalHost().getCanonicalHostName();
-        } catch (UnknownHostException e) {
-            logger.warn("Unable to get hostname", e);
-            machineId = "UNKNOWN";
-        }
-        // bug in zkClient, it does not invoke handleNewSession the first time
-        // it connects
-        this.handleStateChanged(KeeperState.SyncConnected);
-        this.handleNewSession();
-    }
-
-    @Override
-    public Cluster getTopology() {
-        return clusterRef.get();
-    }
-
-    @Override
-    public void addListener(TopologyChangeListener listener) {
-        logger.info("Adding topology change listener:" + listener);
-        listeners.add(listener);
-    }
-
-    @Override
-    public void removeListener(TopologyChangeListener listener) {
-        logger.info("Removing topology change listener:" + listener);
-        listeners.remove(listener);
-    }
-
-    @Override
-    public void handleDataChange(String dataPath, Object data) throws Exception {
-        doProcess();
-    }
-
-    @Override
-    public void handleDataDeleted(String dataPath) throws Exception {
-        doProcess();
-    }
-
-    @Override
-    public void handleStateChanged(KeeperState state) throws Exception {
-        this.state = state;
-    }
-
-    @Override
-    public void handleNewSession() throws Exception {
-        logger.info("New session:" + zkClient.getSessionId());
-        zkClient.subscribeChildChanges(taskPath, this);
-        zkClient.subscribeChildChanges(processPath, this);
-        doProcess();
-    }
-
-    @Override
-    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
-        doProcess();
-    }
-
-    private void doProcess() {
-        lock.lock();
-        try {
-            refreshTopology();
-        } catch (Exception e) {
-            logger.error("", e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void refreshTopology() throws Exception {
-        List<String> processes = zkClient.getChildren(processPath);
-        List<String> tasks = zkClient.getChildren(taskPath);
-        Cluster cluster = new Cluster(tasks.size());
-        for (int i = 0; i < processes.size(); i++) {
-            ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
-            if (process != null) {
-                int partition = Integer.parseInt(process.getSimpleField("partition"));
-                String host = process.getSimpleField("host");
-                int port = Integer.parseInt(process.getSimpleField("port"));
-                String taskId = process.getSimpleField("taskId");
-                ClusterNode node = new ClusterNode(partition, port, host, taskId);
-                cluster.addNode(node);
-            }
-        }
-        logger.info("Changing cluster topology to " + cluster + " from " + clusterRef.get());
-        clusterRef.set(cluster);
-        // Notify all changeListeners about the topology change
-        for (TopologyChangeListener listener : listeners) {
-            listener.onChange();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
index decce25..4aa7639 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
@@ -4,6 +4,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+/**
+ * ZooKeeper's custom znode data structure. Allows for easily information addition and retrieval.
+ * 
+ */
 public class ZNRecord {
 
     String id;
@@ -16,7 +20,7 @@ public class ZNRecord {
     Map<String, List<String>> listFields;
     Map<String, Map<String, String>> mapFields;
 
-    public ZNRecord() {
+    private ZNRecord() {
 
     }
 
@@ -59,6 +63,17 @@ public class ZNRecord {
     }
 
     @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((id == null) ? 0 : id.hashCode());
+        result = prime * result + ((listFields == null) ? 0 : listFields.hashCode());
+        result = prime * result + ((mapFields == null) ? 0 : mapFields.hashCode());
+        result = prime * result + ((simpleFields == null) ? 0 : simpleFields.hashCode());
+        return result;
+    }
+
+    @Override
     public boolean equals(Object obj) {
         if (obj == null) {
             return false;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 9882241..8fab1a6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -12,18 +12,18 @@ import org.apache.s4.base.Emitter;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterChangeListener;
 
 import com.google.common.collect.HashBiMap;
 import com.google.inject.Inject;
 
-public class UDPEmitter implements Emitter, TopologyChangeListener {
+public class UDPEmitter implements Emitter, ClusterChangeListener {
     private DatagramSocket socket;
     private final HashBiMap<Integer, ClusterNode> nodes;
     private final Map<Integer, InetAddress> inetCache = new HashMap<Integer, InetAddress>();
     private final long messageDropInQueueCount = 0;
-    private final Topology topology;
+    private final Cluster topology;
 
     @Inject
     SerializerDeserializer serDeser;
@@ -33,11 +33,11 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
     }
 
     @Inject
-    public UDPEmitter(Topology topology) {
+    public UDPEmitter(Cluster topology) {
         this.topology = topology;
         topology.addListener(this);
-        nodes = HashBiMap.create(topology.getTopology().getNodes().size());
-        for (ClusterNode node : topology.getTopology().getNodes()) {
+        nodes = HashBiMap.create(topology.getPhysicalCluster().getNodes().size());
+        for (ClusterNode node : topology.getPhysicalCluster().getNodes()) {
             nodes.forcePut(node.getPartition(), node);
         }
 
@@ -74,14 +74,14 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
 
     @Override
     public int getPartitionCount() {
-        return topology.getTopology().getPartitionCount();
+        return topology.getPhysicalCluster().getPartitionCount();
     }
 
     @Override
     public void onChange() {
         // topology changes when processes pick tasks
         synchronized (nodes) {
-            for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+            for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
                 Integer partition = clusterNode.getPartition();
                 nodes.put(partition, clusterNode);
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
index e759f25..abbc31b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
@@ -64,32 +64,29 @@ public class DeliveryTestUtil {
 
     /*
      * TimerThread - interrupts the passed thread, after specified time-interval.
+     * 
+     * FIXME we should use number of events rather than time-based interval
      */
     class TimerThread extends Thread {
         private final Thread watchThread;
-        private Integer sleepCounter;
+        private volatile int sleepCounter;
 
         TimerThread(Thread watchThread) {
             this.watchThread = watchThread;
-            this.sleepCounter = new Integer(sleepCount);
+            this.sleepCounter = sleepCount;
         }
 
         public void resetSleepCounter() {
-            synchronized (this.sleepCounter) {
-                this.sleepCounter = sleepCount;
-            }
+            this.sleepCounter = sleepCount;
         }
 
         public void clearSleepCounter() {
-            synchronized (this.sleepCounter) {
-                this.sleepCounter = 0;
-            }
+            this.sleepCounter = 0;
         }
 
         private int getCounter() {
-            synchronized (this.sleepCounter) {
-                return this.sleepCounter--;
-            }
+            return sleepCounter--;
+
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index f02bf64..cb39c1b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -1,9 +1,8 @@
 package org.apache.s4.comm.tcp;
 
 import java.io.IOException;
+
 import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
 import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.zookeeper.KeeperException;
@@ -26,7 +25,7 @@ public class TCPCommTest extends ZkBasedTest {
 
     class TCPCommTestModule extends ZkBasedClusterManagementTestModule {
         TCPCommTestModule() {
-            super(TCPEmitter.class, TCPListener.class);
+            super(TCPEmitter.class, TCPRemoteEmitter.class, TCPListener.class);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
deleted file mode 100644
index e709d68..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.comm.tools.TaskSetup;
-import org.junit.Test;
-
-public class AssignmentFromZKTest extends ZKBaseTest {
-
-    @Test
-    public void testAssignment() throws Exception {
-        TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
-        final String clusterName = "test-s4-cluster";
-        taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 10, 1300);
-        final CountDownLatch latch = new CountDownLatch(10);
-        for (int i = 0; i < 10; i++) {
-            Runnable runnable = new Runnable() {
-
-                @SuppressWarnings("unused")
-                @Override
-                public void run() {
-                    AssignmentFromZK assignmentFromZK;
-                    try {
-                        assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
-                        ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
-                        latch.countDown();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            };
-            Thread t = new Thread(runnable);
-            t.start();
-        }
-
-        boolean await = latch.await(3, TimeUnit.SECONDS);
-        assertEquals(true, await);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
new file mode 100644
index 0000000..71a3489
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
@@ -0,0 +1,69 @@
+package org.apache.s4.comm.topology;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+
+public class AssignmentsFromZKTest extends ZKBaseTest {
+
+    @Test
+    @Ignore
+    public void testAssignmentFor1Cluster() throws Exception {
+        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+        final String topologyNames = "cluster1";
+        testAssignment(taskSetup, topologyNames);
+    }
+
+    @Test
+    public void testAssignmentFor2Clusters() throws Exception {
+        Thread.sleep(2000);
+        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+        final String topologyNames = "cluster2, cluster3";
+        testAssignment(taskSetup, topologyNames);
+    }
+
+    private void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
+        final Set<String> names = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(topologyNames));
+        taskSetup.clean("s4");
+        for (String topologyName : names) {
+            taskSetup.setup(topologyName, 10, 1300);
+        }
+
+        final CountDownLatch latch = new CountDownLatch(10 * names.size());
+        for (int i = 0; i < 10; i++) {
+            Runnable runnable = new Runnable() {
+
+                @SuppressWarnings("unused")
+                @Override
+                public void run() {
+                    AssignmentFromZK assignmentFromZK;
+                    try {
+
+                        for (String topologyName : names) {
+                            assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+                            ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+                            latch.countDown();
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            Thread t = new Thread(runnable);
+            t.start();
+        }
+
+        boolean await = latch.await(30, TimeUnit.SECONDS);
+        assertEquals(true, await);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
new file mode 100644
index 0000000..38d7621
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
@@ -0,0 +1,97 @@
+package org.apache.s4.comm.topology;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+
+public class ClustersFromZKTest extends ZKBaseTest {
+
+    @Test
+    @Ignore
+    public void testAssignmentFor1Topology() throws InterruptedException, Exception {
+        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+        final String clustersString = "cluster1";
+        testAssignment(taskSetup, clustersString);
+    }
+
+    @Test
+    public void testAssignmentFor2Topologies() throws Exception {
+        Thread.sleep(2000);
+        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+        final String clustersString = "cluster2, cluster3";
+        testAssignment(taskSetup, clustersString);
+
+    }
+
+    private void testAssignment(TaskSetup taskSetup, final String clustersString) throws Exception,
+            InterruptedException {
+        final Set<String> clusterNames = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(clustersString));
+        taskSetup.clean("s4");
+        for (String clusterName : clusterNames) {
+            taskSetup.setup(clusterName, 10, 1300);
+        }
+
+        final ClustersFromZK clusterFromZK = new ClustersFromZK(null, CommTestUtils.ZK_STRING, 30000, 30000);
+
+        final CountDownLatch signalAllClustersComplete = new CountDownLatch(clusterNames.size());
+        for (final String clusterName : clusterNames) {
+            ClusterChangeListener listener = new ClusterChangeListener() {
+
+                @Override
+                public void onChange() {
+                    if (clusterFromZK.getCluster(clusterName).getPhysicalCluster().getNodes().size() == 10) {
+                        signalAllClustersComplete.countDown();
+                    }
+
+                }
+            };
+            clusterFromZK.getCluster(clusterName).addListener(listener);
+        }
+
+        final CountDownLatch latch = new CountDownLatch(10 * clusterNames.size());
+        for (int i = 0; i < 10; i++) {
+            Runnable runnable = new Runnable() {
+
+                @SuppressWarnings("unused")
+                @Override
+                public void run() {
+                    AssignmentFromZK assignmentFromZK;
+                    try {
+                        for (String clusterName : clusterNames) {
+                            assignmentFromZK = new AssignmentFromZK(clusterName, CommTestUtils.ZK_STRING, 30000, 30000);
+                            ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+                            latch.countDown();
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            Thread t = new Thread(runnable);
+            t.start();
+        }
+
+        boolean await = latch.await(20, TimeUnit.SECONDS);
+        assertEquals(true, await);
+        boolean success = false;
+        success = signalAllClustersComplete.await(20, TimeUnit.SECONDS);
+        assertEquals(true, success);
+        for (String clusterName : clusterNames) {
+            if (!(10 == clusterFromZK.getCluster(clusterName).getPhysicalCluster().getNodes().size())) {
+                // pending zookeeper updates are not yet reflected
+                Thread.sleep(2000);
+            }
+            assertEquals(10, clusterFromZK.getCluster(clusterName).getPhysicalCluster().getNodes().size());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
deleted file mode 100644
index 65eee28..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.s4.comm.tools.TaskSetup;
-import org.junit.Test;
-
-public class TopologyFromZKTest extends ZKBaseTest {
-
-    @Test
-    public void testAssignment() throws Exception {
-        TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
-        final String clusterName = "test-s4-cluster";
-        taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 10, 1300);
-
-        final TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
-        final Lock lock = new ReentrantLock();
-        final Condition signal = lock.newCondition();
-        TopologyChangeListener listener = new TopologyChangeListener() {
-
-            @Override
-            public void onChange() {
-                System.out.println("TopologyFromZKTest.testAssignment().new TopologyChangeListener() {...}.onChange()");
-                if (topologyFromZK.getTopology().getNodes().size() == 10) {
-                    lock.lock();
-                    try {
-                        signal.signalAll();
-                    } finally {
-                        lock.unlock();
-                    }
-
-                }
-
-            }
-        };
-        topologyFromZK.addListener(listener);
-        final CountDownLatch latch = new CountDownLatch(10);
-        for (int i = 0; i < 10; i++) {
-            Runnable runnable = new Runnable() {
-
-                @SuppressWarnings("unused")
-                @Override
-                public void run() {
-                    AssignmentFromZK assignmentFromZK;
-                    try {
-                        assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
-                        ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
-                        latch.countDown();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            };
-            Thread t = new Thread(runnable);
-            t.start();
-        }
-
-        boolean await = latch.await(3, TimeUnit.SECONDS);
-        assertEquals(true, await);
-        boolean success = false;
-        lock.lock();
-        try {
-            success = signal.await(3, TimeUnit.SECONDS);
-        } finally {
-            lock.unlock();
-        }
-        assertEquals(true, success);
-        assertEquals(10, topologyFromZK.getTopology().getNodes().size());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
index 54ce6a9..8107cde 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
@@ -1,43 +1,26 @@
 package org.apache.s4.comm.topology;
 
-import java.io.File;
+import java.io.IOException;
 
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
 import org.junit.Before;
 
 public class ZKBaseTest {
-    protected ZkServer zkServer = null;
-    protected ZkClient zkClient;
-    protected String zookeeperAddress;
 
-    @Before
-    public void setUp() {
-        String dataDir = System.getProperty("user.dir") + File.separator + "tmp" + File.separator + "zookeeper"
-                + File.separator + "data";
-        String logDir = System.getProperty("user.dir") + File.separator + "tmp" + File.separator + "zookeeper"
-                + File.separator + "logs";
-        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
-
-            @Override
-            public void createDefaultNameSpace(ZkClient zkClient) {
+    private Factory zkFactory;
 
-            }
-        };
-        int port = 3029;
-        zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
-        zkServer.start();
-        zkClient = zkServer.getZkClient();
-        zookeeperAddress = "localhost:" + port;
+    @Before
+    public void setUp() throws IOException, InterruptedException, KeeperException {
+        CommTestUtils.cleanupTmpDirs();
+        zkFactory = CommTestUtils.startZookeeperServer();
 
     }
 
     @After
     public void tearDown() throws Exception {
-        if (zkServer != null) {
-            zkServer.shutdown();
-        }
+        CommTestUtils.stopZookeeperServer(zkFactory);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index 808a357..e224b99 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -25,7 +25,7 @@ public class UDPCommTest extends ZkBasedTest {
 
     class UDPCommTestModule extends ZkBasedClusterManagementTestModule {
         UDPCommTestModule() {
-            super(UDPEmitter.class, UDPListener.class);
+            super(UDPEmitter.class, null, UDPListener.class);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index 7d7913d..f7cf147 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -13,6 +13,7 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
@@ -39,6 +40,7 @@ public class CommTestUtils {
     private static final Logger logger = LoggerFactory.getLogger(CommTestUtils.class);
 
     public static final int ZK_PORT = 2181;
+    public static final String ZK_STRING = "localhost:" + ZK_PORT;
     public static final int INITIAL_BOOKIE_PORT = 5000;
     public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
     public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
@@ -62,7 +64,7 @@ public class CommTestUtils {
             cmdList.add(arg);
         }
 
-        // System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+        System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
         ProcessBuilder pb = new ProcessBuilder(cmdList);
 
         pb.directory(new File(System.getProperty("user.dir")));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
deleted file mode 100644
index ebcf388..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public abstract class FileBasedClusterManagementTestModule<T> extends AbstractModule {
-
-    protected PropertiesConfiguration config = null;
-    private final Class<?> appClass;
-
-    protected FileBasedClusterManagementTestModule() {
-        // infer actual app class through "super type tokens" (this simple code
-        // assumes actual module class is a direct subclass from this one)
-        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
-        Type[] fieldArgTypes = pt.getActualTypeArguments();
-        this.appClass = (Class<?>) fieldArgTypes[0];
-    }
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-            config.setProperty("cluster.lock_dir",
-                    config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        bind(appClass);
-        bind(Cluster.class);
-        bind(Hasher.class).to(DefaultHasher.class);
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-        bind(Assignment.class).to(AssignmentFromFile.class);
-        bind(Topology.class).to(TopologyFromFile.class);
-        bind(Emitter.class).to(UDPEmitter.class);
-        bind(Listener.class).to(UDPListener.class);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
index a368db8..a1e7089 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -4,24 +4,29 @@ import java.io.InputStream;
 
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.RemoteEmitterFactory;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.tcp.TCPRemoteEmitter;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.ClustersFromZK;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
 public class ZkBasedClusterManagementTestModule extends AbstractModule {
@@ -29,14 +34,16 @@ public class ZkBasedClusterManagementTestModule extends AbstractModule {
     protected PropertiesConfiguration config = null;
 
     private Class<? extends Emitter> emitterClass = null;
+    private Class<? extends RemoteEmitter> remoteEmitterClass = null;
     private Class<? extends Listener> listenerClass = null;
 
     protected ZkBasedClusterManagementTestModule() {
     }
 
     protected ZkBasedClusterManagementTestModule(Class<? extends Emitter> emitterClass,
-            Class<? extends Listener> listenerClass) {
+            Class<? extends RemoteEmitter> remoteEmitterClass, Class<? extends Listener> listenerClass) {
         this.emitterClass = emitterClass;
+        this.remoteEmitterClass = remoteEmitterClass;
         this.listenerClass = listenerClass;
     }
 
@@ -46,11 +53,6 @@ public class ZkBasedClusterManagementTestModule extends AbstractModule {
             InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
             config = new PropertiesConfiguration();
             config.load(is);
-            config.setProperty(
-                    "cluster.zk_address",
-                    config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+",
-                            "localhost:" + CommTestUtils.ZK_PORT));
-            System.out.println(ConfigurationUtils.toString(config));
             // TODO - validate properties.
 
             /* Make all properties injectable. Do we need this? */
@@ -66,11 +68,23 @@ public class ZkBasedClusterManagementTestModule extends AbstractModule {
         if (config == null) {
             loadProperties(binder());
         }
-        bind(Cluster.class);
         bind(Hasher.class).to(DefaultHasher.class);
         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
         bind(Assignment.class).to(AssignmentFromZK.class);
-        bind(Topology.class).to(TopologyFromZK.class);
+        bind(Clusters.class).to(ClustersFromZK.class);
+        bind(Cluster.class).to(ClusterFromZK.class);
+
+        // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
+        if (this.remoteEmitterClass != null) {
+            install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
+                    RemoteEmitterFactory.class));
+        } else {
+            install(new FactoryModuleBuilder().implement(RemoteEmitter.class, TCPRemoteEmitter.class).build(
+                    RemoteEmitterFactory.class));
+        }
+
+        bind(RemoteEmitters.class);
 
         if (this.emitterClass != null) {
             bind(Emitter.class).to(this.emitterClass);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index bda3bd8..2f336e0 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -1,11 +1,10 @@
 package org.apache.s4.fixtures;
 
-import java.io.File;
+import java.io.IOException;
 
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
 import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -13,46 +12,22 @@ import org.slf4j.LoggerFactory;
 
 public abstract class ZkBasedTest {
     private static final Logger logger = LoggerFactory.getLogger(ZkBasedTest.class);
-    private ZkServer zkServer;
+    private Factory zkFactory;
 
     @Before
-    public void prepare() {
-        String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
-        String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+    public void prepare() throws IOException, InterruptedException, KeeperException {
         CommTestUtils.cleanupTmpDirs();
 
-        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+        zkFactory = CommTestUtils.startZookeeperServer();
 
-            @Override
-            public void createDefaultNameSpace(ZkClient zkClient) {
-
-            }
-        };
-
-        logger.info("Starting Zookeeper Server");
-        zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
-        zkServer.start();
-
-        logger.info("Starting Zookeeper Client 1");
-        String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
-        @SuppressWarnings("unused")
-        ZkClient zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
-
-        ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
-        zkClient2.getCreationTime("/");
-
-        TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
-        final String clusterName = "s4-test-cluster";
-        taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 1, 1300);
+        TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+        taskSetup.clean("s4");
+        taskSetup.setup("cluster1", 1, 1300);
     }
 
     @After
-    public void cleanupZkBasedTest() {
-        if (zkServer != null) {
-            zkServer.shutdown();
-            zkServer = null;
-        }
+    public void cleanupZkBasedTest() throws IOException, InterruptedException {
+        CommTestUtils.stopZookeeperServer(zkFactory);
         CommTestUtils.cleanupTmpDirs();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/default.s4.properties b/subprojects/s4-comm/src/test/resources/default.s4.properties
index 0e31dfa..6624ff4 100644
--- a/subprojects/s4-comm/src/test/resources/default.s4.properties
+++ b/subprojects/s4-comm/src/test/resources/default.s4.properties
@@ -1,9 +1,6 @@
 comm.queue_emmiter_size = 8000
 comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = {user.dir}/tmp
-cluster.name = s4-test-cluster
+cluster.name = cluster1
 cluster.zk_address = localhost:2181
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/resources/s4-comm-test.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/s4-comm-test.properties b/subprojects/s4-comm/src/test/resources/s4-comm-test.properties
deleted file mode 100644
index 340ee5b..0000000
--- a/subprojects/s4-comm/src/test/resources/s4-comm-test.properties
+++ /dev/null
@@ -1,10 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = /tmp
-cluster.isCluster = true
-emitter.send.interval = 100
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index afa85c9..46d580d 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -15,15 +15,19 @@
  */
 
 description = 'The S4 core platform.'
- 
+
 dependencies {
     compile project(":s4-base")
     compile project(":s4-comm")
     compile project(path: ':s4-comm', configuration: 'tests')
     compile libraries.jcommander
-    testCompile project(path: ':s4-comm', configuration: 'tests')
+    testCompile project(path: ':s4-comm', configuration: 'tests')
+    testCompile libraries.gradle_base_services
+    testCompile libraries.gradle_core
+    testCompile libraries.gradle_tooling_api
+    testCompile libraries.gradle_wrapper
 }
 
 test {
-    forkEvery=1
+    forkEvery=1;
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index d33e718..c6bffd3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -21,9 +21,11 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.RemoteStreams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +33,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.name.Named;
 
 /*
  * Container base class to hold all processing elements. We will implement administrative methods here.
@@ -50,11 +53,25 @@ public abstract class App {
 
     private ClockType clockType = ClockType.WALL_CLOCK;
     private int id = -1;
+
     @Inject
     private Sender sender;
     @Inject
     private Receiver receiver;
 
+    @Inject
+    RemoteSenders remoteSenders;
+
+    @Inject
+    Hasher hasher;
+
+    @Inject
+    RemoteStreams remoteStreams;
+
+    @Inject
+    @Named("cluster.name")
+    String clusterName;
+
     // serialization uses the application class loader
     private SerializerDeserializer serDeser = new KryoSerDeser(getClass().getClassLoader());
 
@@ -254,9 +271,9 @@ public abstract class App {
      *            - receives events from the communication layer.
      */
     public void setCommLayer(Sender sender, Receiver receiver) {
-        this.sender = sender;
-        this.receiver = receiver;
-        sender.setPartition(receiver.getPartition());
+        // this.sender = sender;
+        // this.receiver = receiver;
+        // sender.setPartition(receiver.getPartition());
     }
 
     /**
@@ -298,6 +315,27 @@ public abstract class App {
         return new Stream<T>(this, name, processingElements);
     }
 
+    protected <T extends Event> RemoteStream createOutputStream(String name) {
+        return createOutputStream(name, null);
+    }
+
+    protected <T extends Event> RemoteStream createOutputStream(String name, KeyFinder<Event> finder) {
+        return new RemoteStream(this, name, finder, remoteSenders, hasher, remoteStreams, clusterName);
+    }
+
+    protected <T extends Event> Stream<T> createInputStream(String streamName, KeyFinder<T> finder,
+            ProcessingElement... processingElements) {
+        remoteStreams.addInputStream(getId(), clusterName, streamName);
+        return createStream(streamName, finder, processingElements);
+
+    }
+
+    protected <T extends Event> Stream<T> createInputStream(String streamName, ProcessingElement... processingElements) {
+        remoteStreams.addInputStream(getId(), clusterName, streamName);
+        return createStream(streamName, processingElements);
+
+    }
+
     /**
      * Creates a {@link ProcessingElement} prototype.
      * 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
deleted file mode 100644
index 17529d3..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.s4.core;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/**
- * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
- * until we have a better way to customize node configuration
- * 
- */
-public class CustomModule extends AbstractModule {
-
-    InputStream configFileInputStream;
-    private PropertiesConfiguration config;
-
-    public CustomModule(InputStream configFileInputStream) {
-        this.configFileInputStream = configFileInputStream;
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        if (configFileInputStream != null) {
-            try {
-                configFileInputStream.close();
-            } catch (IOException ignored) {
-            }
-        }
-
-        int numHosts = config.getList("cluster.hosts").size();
-        boolean isCluster = numHosts > 1 ? true : false;
-        bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
-
-        bind(Cluster.class);
-
-        bind(Assignment.class).to(AssignmentFromZK.class);
-
-        bind(Topology.class).to(TopologyFromZK.class);
-
-        bind(Emitter.class).to(TCPEmitter.class);
-        bind(Listener.class).to(TCPListener.class);
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-
-        /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
-        bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
-    }
-
-    private void loadProperties(Binder binder) {
-        try {
-            config = new PropertiesConfiguration();
-            config.load(configFileInputStream);
-
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
new file mode 100644
index 0000000..9003f26
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
@@ -0,0 +1,99 @@
+package org.apache.s4.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.RemoteEmitterFactory;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.tcp.TCPRemoteEmitter;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.ClustersFromZK;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
+
+/**
+ * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
+ * until we have a better way to customize node configuration
+ * 
+ */
+public class DefaultModule extends AbstractModule {
+
+    InputStream configFileInputStream;
+    private PropertiesConfiguration config;
+
+    public DefaultModule(InputStream configFileInputStream) {
+        this.configFileInputStream = configFileInputStream;
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        if (configFileInputStream != null) {
+            try {
+                configFileInputStream.close();
+            } catch (IOException ignored) {
+            }
+        }
+
+        bind(Emitter.class).to(TCPEmitter.class);
+
+        bind(Listener.class).to(TCPListener.class);
+
+        bind(Assignment.class).to(AssignmentFromZK.class);
+
+        bind(Clusters.class).to(ClustersFromZK.class);
+        bind(Cluster.class).to(ClusterFromZK.class);
+
+        // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
+        install(new FactoryModuleBuilder().implement(RemoteEmitter.class, TCPRemoteEmitter.class).build(
+                RemoteEmitterFactory.class));
+        bind(RemoteEmitters.class);
+
+        /* The hashing function to map keys top partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+        bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+    }
+
+    private void loadProperties(Binder binder) {
+        try {
+            config = new PropertiesConfiguration();
+            config.load(configFileInputStream);
+
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index c18603c..09a3778 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -48,7 +48,8 @@ public class Main {
     private static void startCustomS4Node(String s4PropertiesFilePath) throws FileNotFoundException {
         // TODO that's quite inconvenient anyway: we still need to specify the comm module in the config
         // file passed as a parameter...
-        Injector injector = Guice.createInjector(new CustomModule(new FileInputStream(new File(s4PropertiesFilePath))));
+        Injector injector = Guice
+                .createInjector(new DefaultModule(new FileInputStream(new File(s4PropertiesFilePath))));
         startServer(logger, injector);
     }
 
@@ -104,7 +105,7 @@ public class Main {
             } else {
                 URL defaultS4Config = null;
                 try {
-                    defaultS4Config = Resources.getResource("/default.s4.properties");
+                    defaultS4Config = Resources.getResource("default.s4.properties");
                 } catch (IllegalArgumentException e) {
                     logger.error(
                             "Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 34309d6..2571e20 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -20,10 +20,12 @@ import com.google.inject.Singleton;
  * <p>
  * A Listener implementation receives data from the network and passes an event as a byte array to the {@link Receiver}.
  * The byte array is de-serialized and converted into an {@link Event}. Finally the event is passed to the matching
- * streams. There is a single {@link Receiver} instance per node.
+ * streams.
+ * </p>
+ * There is a single {@link Receiver} instance per node.
  * 
  * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
- * from the application developer.
+ * from the application developer. </p>
  */
 @Singleton
 public class Receiver implements Runnable {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 48b980b..cdb0b01 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -1,18 +1,28 @@
 package org.apache.s4.core;
 
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Hasher;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
 
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
+public class RemoteSender {
 
-@Singleton
-public class RemoteSender extends Sender {
+    final private Emitter emitter;
+    final private Hasher hasher;
 
-    @Inject
-    public RemoteSender(RemoteEmitter emitter, SerializerDeserializer serDeser, Hasher hasher) {
-        super(emitter, serDeser, hasher);
+    public RemoteSender(Emitter emitter, Hasher hasher) {
+        super();
+        this.emitter = emitter;
+        this.hasher = hasher;
     }
 
+    public void send(String hashKey, EventMessage eventMessage) {
+        if (hashKey == null) {
+            for (int i = 0; i < emitter.getPartitionCount(); i++) {
+                emitter.send(i, eventMessage);
+            }
+        } else {
+            int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+            emitter.send(partition, eventMessage);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
new file mode 100644
index 0000000..d44d270
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -0,0 +1,61 @@
+package org.apache.s4.core;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.StreamConsumer;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.Clusters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+public class RemoteSenders {
+
+    Logger logger = LoggerFactory.getLogger(RemoteSenders.class);
+
+    @Inject
+    RemoteEmitters emitters;
+
+    @Inject
+    RemoteStreams streams;
+
+    @Inject
+    Clusters topologies;
+
+    @Inject
+    SerializerDeserializer serDeser;
+
+    @Inject
+    Hasher hasher;
+
+    Map<String, RemoteSender> sendersByTopology = new HashMap<String, RemoteSender>();
+
+    public void send(String hashKey, Event event) {
+
+        Set<StreamConsumer> consumers = streams.getConsumers(event.getStreamName());
+        for (StreamConsumer consumer : consumers) {
+            RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
+            if (sender == null) {
+                sender = new RemoteSender(emitters.getEmitter(topologies.getCluster(consumer.getClusterName())),
+                        hasher);
+                // TODO cleanup when remote topologies die
+                sendersByTopology.put(consumer.getClusterName(), sender);
+            }
+            // we must set the app id of the consumer app for correct dispatch within the consumer node
+            // NOTE: this implies multiple serializations, there might be an optimization
+            event.setAppId(consumer.getAppId());
+            EventMessage eventMessage = new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(),
+                    serDeser.serialize(event));
+            sender.send(hashKey, eventMessage);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
new file mode 100644
index 0000000..0833a07
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
@@ -0,0 +1,77 @@
+package org.apache.s4.core;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream that dispatches events to a remote cluster
+ * 
+ */
+public class RemoteStream implements Streamable<Event> {
+
+    final private String name;
+    final protected Key<Event> key;
+    final static private String DEFAULT_SEPARATOR = "^";
+    // final private int id;
+
+    RemoteSenders remoteSenders;
+
+    Hasher hasher;
+
+    int id;
+    private App app;
+    private static Logger logger = LoggerFactory.getLogger(RemoteStream.class);
+
+    private static AtomicInteger remoteStreamCounter = new AtomicInteger();
+
+    public RemoteStream(App app, String name, KeyFinder<Event> finder, RemoteSenders remoteSenders, Hasher hasher,
+            RemoteStreams remoteStreams, String clusterName) {
+        this.app = app;
+        this.name = name;
+        this.remoteSenders = remoteSenders;
+        this.hasher = hasher;
+        if (finder == null) {
+            this.key = null;
+        } else {
+            this.key = new Key<Event>(finder, DEFAULT_SEPARATOR);
+        }
+        remoteStreams.addOutputStream(String.valueOf(app.getId()), clusterName, name);
+
+    }
+
+    @Override
+    public void put(Event event) {
+        event.setStreamId(getName());
+        event.setAppId(app.getId());
+
+        if (key != null) {
+            remoteSenders.send(key.get(event), event);
+        } else {
+            remoteSenders.send(null, event);
+        }
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void start() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void close() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 39e5a98..1104716 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -7,7 +7,6 @@ import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
 
 import com.google.inject.Inject;
-import com.google.inject.Singleton;
 
 /**
  * The {@link Sender} and its counterpart {@link Receiver} are the top level classes of the communication layer.
@@ -17,7 +16,6 @@ import com.google.inject.Singleton;
  * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
  * from the application developer.
  */
-@Singleton
 public class Sender {
     final private Emitter emitter;
     final private SerializerDeserializer serDeser;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index d148868..fdbd800 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -2,14 +2,12 @@ package org.apache.s4.core;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.jar.Attributes.Name;
 import java.util.jar.JarFile;
 
 import org.I0Itec.zkclient.ZkClient;
-import org.apache.s4.base.Event;
 import org.apache.s4.base.util.S4RLoader;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.deploy.DeploymentManager;
@@ -19,7 +17,6 @@ import org.slf4j.LoggerFactory;
 import ch.qos.logback.classic.Level;
 
 import com.google.common.collect.Maps;
-import com.google.common.io.PatternFilenameFilter;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -56,11 +53,11 @@ public class Server {
      *
      */
     @Inject
-    public Server(@Named("comm.module") String commModuleName, @Named("s4.logger_level") String logLevel,
-            @Named("appsDir") String appsDir, @Named("cluster.name") String clusterName,
-            @Named("cluster.zk_address") String zookeeperAddress,
+    public Server(String commModuleName, @Named("s4.logger_level") String logLevel, @Named("appsDir") String appsDir,
+            @Named("cluster.name") String clusterName, @Named("cluster.zk_address") String zookeeperAddress,
             @Named("cluster.zk_session_timeout") int sessionTimeout,
             @Named("cluster.zk_connection_timeout") int connectionTimeout) {
+        // TODO do we need to separate the comm module?
         this.commModuleName = commModuleName;
         this.logLevel = logLevel;
         this.appsDir = appsDir;
@@ -96,10 +93,13 @@ public class Server {
                 logger.error("Cannot create apps directory [{}]", appsDir);
             }
         }
-        File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
-        for (File s4rFile : s4rFiles) {
-            loadApp(s4rFile);
-        }
+
+        // disabled app loading from local files
+
+        // File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
+        // for (File s4rFile : s4rFiles) {
+        // loadApp(s4rFile);
+        // }
 
         /* Now init + start apps. TODO: implement dynamic loading/unloading using ZK. */
         for (Map.Entry<String, App> appEntry : apps.entrySet()) {
@@ -108,11 +108,6 @@ public class Server {
         }
 
         for (Map.Entry<String, App> appEntry : apps.entrySet()) {
-            logger.info("Initializing app streams " + appEntry.getValue().getClass().getName());
-            updateStreams(appEntry.getValue(), appEntry.getKey());
-        }
-
-        for (Map.Entry<String, App> appEntry : apps.entrySet()) {
             logger.info("Starting app " + appEntry.getKey() + "/" + appEntry.getValue().getClass().getName());
             appEntry.getValue().start();
         }
@@ -138,46 +133,6 @@ public class Server {
         return loadApp(s4r, s4r.getName().substring(0, s4r.getName().indexOf(".s4r")));
     }
 
-    public void updateStreams(App app, String appName) {
-        // register streams
-        List<Streamable<Event>> appStreams = app.getStreams();
-        for (Streamable<Event> streamable : appStreams) {
-            if (streams.containsKey(streamable.getName())) {
-                logger.error("Application {} defines the stream {} but there is already a stream with that name",
-                        new String[] { appName, streamable.getName() });
-            } else {
-                // zkClient.createEphemeral("/" + clusterName + "/streams/producers/" + appName);
-                logger.debug("Adding stream {} for app {}");
-                streams.put(streamable.getName(), streamable);
-                if (eventSources.containsKey(streamable.getName())) {
-                    logger.debug("Connecting matching event source for stream {} for app {}", streamable.getName(),
-                            appName);
-                    eventSources.get(streamable.getName()).subscribeStream(streams.get(streamable.getName()));
-                }
-            }
-        }
-
-        List<EventSource> appEventSources = app.getEventSources();
-        for (EventSource eventSource : appEventSources) {
-            if (eventSources.containsKey(eventSource.getName())) {
-                logger.error(
-                        "Application {} defines the event source {} but there is already an event source with that name, from app {}",
-                        new String[] { appName, eventSource.getName(),
-                                String.valueOf(streams.get(eventSource.getName())) });
-            } else {
-                // zkClient.createEphemeral("/" + clusterName + "/streams/consumers/" + appName);
-                logger.debug("adding event source {} from app {}", eventSource.getName(), appName);
-                eventSources.put(eventSource.getName(), eventSource);
-            }
-            if (streams.containsKey(eventSource.getName())) {
-                logger.debug("Connecting matching stream from app {} to event source {}", appName,
-                        eventSource.getName());
-                eventSource.subscribeStream(streams.get(eventSource.getName()));
-            }
-        }
-
-    }
-
     public App loadApp(File s4r, String appName) {
 
         // TODO handle application upgrade
@@ -222,8 +177,6 @@ public class Server {
 
         app.init();
 
-        updateStreams(app, appName);
-
         app.start();
     }
 }