You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/07 21:26:08 UTC

git commit: Fixed some issues introduced when moving cluster knowledge away from emitters. - also fixed tests so they compile correctly - key issues are: - housekeeping for connections in emitters. In particular, how to detect cluster changes and knowin

Updated Branches:
  refs/heads/S4-110-new a65684e9b -> 423b7e873


Fixed some issues introduced when moving cluster knowledge away from emitters.
- also fixed tests so they compile correctly
- key issues are:
	- housekeeping for connections in emitters.
In particular, how to detect cluster changes and knowing that a given destination may
have to be reconnected to (if it failed, restarted, and reused same connection info)
	- locality resolution in sender
	- inter app communication


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/423b7e87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/423b7e87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/423b7e87

Branch: refs/heads/S4-110-new
Commit: 423b7e873ce8bf436af46cfd2ee45c70ce14e02a
Parents: a65684e
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Feb 7 21:07:44 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Feb 7 21:07:44 2013 +0100

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/DefaultCommModule.java |    3 +-
 .../org/apache/s4/comm/tcp/TCPDestination.java     |    9 +-
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |   74 +++++++--------
 .../org/apache/s4/comm/topology/ClusterFromZK.java |   19 ++++-
 .../org/apache/s4/comm/topology/ClusterNode.java   |   49 +++++++++-
 .../apache/s4/comm/topology/PhysicalCluster.java   |   18 ++--
 .../org/apache/s4/comm/udp/UDPDestination.java     |    9 +-
 .../java/org/apache/s4/comm/tcp/TCPBasicTest.java  |    4 +-
 .../java/org/apache/s4/comm/udp/UDPBasicTest.java  |    4 +-
 .../org/apache/s4/fixtures/TestCommModule.java     |    6 +
 .../main/java/org/apache/s4/core/AppModule.java    |    7 +-
 .../main/java/org/apache/s4/core/BaseModule.java   |   28 +++---
 .../main/java/org/apache/s4/core/SenderImpl.java   |    6 +-
 .../org/apache/s4/core/ft/FTWordCountTest.java     |    3 +-
 .../java/org/apache/s4/core/ft/RecoveryTest.java   |   13 ++-
 .../core/moduleloader/ModuleLoaderTestUtils.java   |    9 +-
 .../apache/s4/deploy/TestAutomaticDeployment.java  |    3 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |   10 +-
 .../src/main/java/s4app/ProducerApp.java           |    1 -
 19 files changed, 174 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index a1d6319..8936f96 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -1,5 +1,5 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
+ok * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -104,7 +104,6 @@ public class DefaultCommModule extends AbstractModule {
                     .getString("s4.comm.emitter.remote.class"));
             install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
                     RemoteEmitterFactory.class));
-            bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
 
         } catch (ClassNotFoundException e) {
             logger.error("Cannot find class implementation ", e);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
index 0ddffb8..7523422 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPDestination.java
@@ -3,10 +3,13 @@ package org.apache.s4.comm.tcp;
 import org.apache.s4.base.Destination;
 import org.apache.s4.comm.topology.ClusterNode;
 
-public class TCPDestination extends ClusterNode  implements Destination {
+public class TCPDestination extends ClusterNode implements Destination {
 
-    public TCPDestination(int partition, int port, String machineName,
-            String taskId) {
+    public TCPDestination(ClusterNode clusterNode) {
+        this(clusterNode.getPartition(), clusterNode.getPort(), clusterNode.getMachineName(), clusterNode.getTaskId());
+    }
+
+    public TCPDestination(int partition, int port, String machineName, String taskId) {
         super(partition, port, machineName, taskId);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 266d15f..8133316 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -34,7 +34,6 @@ import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterChangeListener;
-import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.util.EmitterMetrics;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -87,7 +86,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     /*
      * Channel used to send messages to each destination
      */
-    private final BiMap<Destination, Channel> partitionChannelMap;
+    private final BiMap<Destination, Channel> destinationChannelMap;
 
     // lock for synchronizing between cluster updates callbacks and other code
     private final Lock lock;
@@ -95,7 +94,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     @Inject
     SerializerDeserializerFactory serDeserFactory;
     SerializerDeserializer serDeser;
-    Map<Integer, Semaphore> writePermits = Maps.newHashMap();
+    Map<Destination, Semaphore> writePermits = Maps.newHashMap();
 
     EmitterMetrics metrics;
 
@@ -122,7 +121,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
         // Initialize data structures
         int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
-        partitionChannelMap = HashBiMap.create(clusterSize);
+        destinationChannelMap = HashBiMap.create(clusterSize);
 
         // Initialize netty related structures
         ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
@@ -147,7 +146,6 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     @Inject
     private void init() {
-        refreshCluster();
         this.topology.addListener(this);
         serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
         metrics = new EmitterMetrics(topology);
@@ -166,7 +164,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
             connectFuture.await();
             if (connectFuture.isSuccess()) {
                 channels.add(connectFuture.getChannel());
-                partitionChannelMap.forcePut(destination, connectFuture.getChannel());
+                destinationChannelMap.forcePut(destination, connectFuture.getChannel());
+                writePermits.put(destination, new Semaphore(maxPendingWrites));
                 return true;
             }
         } catch (InterruptedException ie) {
@@ -176,12 +175,12 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         }
         return false;
     }
-    
+
     @Override
     public boolean send(Destination destination, ByteBuffer message) throws InterruptedException {
         ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(message);
 
-        if (!partitionChannelMap.containsKey(destination)) {
+        if (!destinationChannelMap.containsKey(destination)) {
             if (!connectTo((TCPDestination) destination)) {
                 logger.warn("Could not connect to partition {}, discarding message", destination);
                 // Couldn't connect, discard message
@@ -191,7 +190,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
         writePermits.get(destination).acquire();
 
-        Channel c = partitionChannelMap.get(destination);
+        Channel c = destinationChannelMap.get(destination);
         if (c == null) {
             logger.warn("Could not find channel for destination {}", destination);
             return false;
@@ -201,10 +200,8 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         return true;
     }
 
-    
-
-    protected void removeChannel(int partition) {
-        Channel c = partitionChannelMap.remove(partition);
+    protected void removeChannel(Destination destination) {
+        Channel c = destinationChannelMap.remove(destination);
         if (c == null) {
             return;
         }
@@ -239,34 +236,35 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
     private void refreshCluster() {
         lock.lock();
         try {
-            /*
-            for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
-                Integer partition = clusterNode.getPartition();
-                if (partition == null) {
-                    logger.error("Illegal partition for clusterNode - " + clusterNode);
-                    return;
-                }
-
-                ClusterNode oldNode = partitionNodeMap.remove(partition);
-                if (oldNode != null && !oldNode.equals(clusterNode)) {
-                    removeChannel(partition);
-                }
-                partitionNodeMap.forcePut(partition, clusterNode);
-                if (!writePermits.containsKey(partition)) {
-                    writePermits.put(partition, new Semaphore(maxPendingWrites));
-                }
-                
-            }*/
+            // // dropped destinations are those in local map but not in updated cluster config
+            // SetView<Destination> droppedDestinations = Sets.difference(destinationChannelMap.keySet(), Sets
+            // .newHashSet(Collections2.transform(topology.getPhysicalCluster().getNodes(),
+            // new Function<ClusterNode, Destination>() {
+            //
+            // @Override
+            // public Destination apply(ClusterNode clusterNode) {
+            // return new TCPDestination(clusterNode);
+            // }
+            // })));
+            // for (Destination dropped : droppedDestinations) {
+            // destinationChannelMap.remove(dropped);
+            // writePermits.remove(dropped);
+            // removeChannel(dropped);
+            // }
+            //
+            // for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
+            // Destination destination = new TCPDestination(clusterNode);
+            // if (!destinationChannelMap.containsKey(destination)) {
+            // destinationChannelMap.put(new TCPDestination(clusterNode), null);
+            // writePermits.put(destination, new Semaphore(maxPendingWrites));
+            // }
+            //
+            // }
         } finally {
             lock.unlock();
         }
     }
 
-//    @Override
-//    public int getPartitionCount() {
-//        return topology.getPhysicalCluster().getPartitionCount();
-//    }
-    
     @Override
     public int getPartitionCount(String streamName) {
         return topology.getPhysicalCluster().getPartitionCount(streamName);
@@ -277,10 +275,10 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
             Throwable t = e.getCause();
             if (t instanceof ClosedChannelException) {
-                partitionChannelMap.inverse().remove(e.getChannel());
+                destinationChannelMap.inverse().remove(e.getChannel());
                 return;
             } else if (t instanceof ConnectException) {
-                partitionChannelMap.inverse().remove(e.getChannel());
+                destinationChannelMap.inverse().remove(e.getChannel());
                 return;
             } else {
                 logger.error("Unexpected exception", t);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 27bd354..5144831 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -30,8 +30,9 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.s4.base.Destination;
+import org.apache.s4.comm.tcp.TCPDestination;
+import org.apache.s4.comm.udp.UDPDestination;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -210,14 +211,28 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
         doProcess();
 
     }
+
     @Override
     public Destination getDestination(String streamName, int partitionId, String type) {
+        List<ClusterNode> nodes = clusterRef.get().getNodes();
+        for (ClusterNode node : nodes) {
+            if (node.getPartition() == partitionId) {
+                if ("tcp".equalsIgnoreCase(type)) {
+                    return new TCPDestination(node);
+                } else if ("udp".equalsIgnoreCase(type)) {
+                    return new UDPDestination(node);
+                } else {
+                    logger.error("Unsupported destination type {}", type);
+                    break;
+                }
+            }
+        }
         return null;
     }
 
     @Override
     public Integer getPartitionCount(String streamName) {
-        return null;
+        return clusterRef.get().getPartitionCount(streamName);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
index e622107..fd5af72 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
@@ -20,13 +20,13 @@ package org.apache.s4.comm.topology;
 
 /**
  * Represents an node.
- *
+ * 
  */
 public class ClusterNode {
-    private int partition;
-    private int port;
-    private String machineName;
-    private String taskId;
+    private final int partition;
+    private final int port;
+    private final String machineName;
+    private final String taskId;
 
     public ClusterNode(int partition, int port, String machineName, String taskId) {
         this.partition = partition;
@@ -51,10 +51,49 @@ public class ClusterNode {
         return taskId;
     }
 
+    @Override
     public String toString() {
         StringBuffer sb = new StringBuffer();
         sb.append("{").append("partition=").append(partition).append(",port=").append(port).append(",machineName=")
                 .append(machineName).append(",taskId=").append(taskId).append("}");
         return sb.toString();
     }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
+        result = prime * result + partition;
+        result = prime * result + port;
+        result = prime * result + ((taskId == null) ? 0 : taskId.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ClusterNode other = (ClusterNode) obj;
+        if (machineName == null) {
+            if (other.machineName != null)
+                return false;
+        } else if (!machineName.equals(other.machineName))
+            return false;
+        if (partition != other.partition)
+            return false;
+        if (port != other.port)
+            return false;
+        if (taskId == null) {
+            if (other.taskId != null)
+                return false;
+        } else if (!taskId.equals(other.taskId))
+            return false;
+        return true;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index 1d13cf4..c391e28 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -90,19 +90,20 @@ public class PhysicalCluster {
         }
     }
 
-//    /**
-//     * 
-//     * @return Number of partitions in the cluster.
-//     */
-//    public int getPartitionCount() {
-//        return numPartitions;
-//    }
-    
+    // /**
+    // *
+    // * @return Number of partitions in the cluster.
+    // */
+    // public int getPartitionCount() {
+    // return numPartitions;
+    // }
+
     /**
      * 
      * @return Number of partitions in the cluster per stream
      */
     public int getPartitionCount(String streamName) {
+        // TODO regardless of stream name ?
         return numPartitions;
     }
 
@@ -140,6 +141,7 @@ public class PhysicalCluster {
         this.name = name;
     }
 
+    @Override
     public String toString() {
         StringBuffer sb = new StringBuffer();
         sb.append("{ nbNodes=").append(nodes.size()).append(",name=").append(name).append(",mode=").append(mode)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
index b424f0c..c6fe097 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPDestination.java
@@ -3,10 +3,13 @@ package org.apache.s4.comm.udp;
 import org.apache.s4.base.Destination;
 import org.apache.s4.comm.topology.ClusterNode;
 
-public class UDPDestination extends ClusterNode implements Destination{
+public class UDPDestination extends ClusterNode implements Destination {
 
-    public UDPDestination(int partition, int port, String machineName,
-            String taskId) {
+    public UDPDestination(ClusterNode clusterNode) {
+        this(clusterNode.getPartition(), clusterNode.getPort(), clusterNode.getMachineName(), clusterNode.getTaskId());
+    }
+
+    public UDPDestination(int partition, int port, String machineName, String taskId) {
         super(partition, port, machineName, taskId);
         // TODO Auto-generated constructor stub
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
index e400813..600ff79 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
@@ -7,6 +7,7 @@ import junit.framework.Assert;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.MockReceiverModule;
 import org.apache.s4.fixtures.NoOpReceiverModule;
@@ -45,7 +46,8 @@ public class TCPBasicTest extends ZkBasedTest {
         injector2.getInstance(Listener.class);
 
         // send to the other node
-        emitter.send(1, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+        emitter.send(new TCPDestination(injector2.getInstance(Assignment.class).assignClusterNode()), injector1
+                .getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
 
         // check receiver got the message
         Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
index b06734b..6136a04 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
@@ -7,6 +7,7 @@ import junit.framework.Assert;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.MockReceiverModule;
 import org.apache.s4.fixtures.NoOpReceiverModule;
@@ -44,7 +45,8 @@ public class UDPBasicTest extends ZkBasedTest {
         injector2.getInstance(Listener.class);
 
         // send to the other partition (1)
-        emitter.send(1, injector1.getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
+        emitter.send(new UDPDestination(injector2.getInstance(Assignment.class).assignClusterNode()), injector1
+                .getInstance(SerializerDeserializer.class).serialize(CommTestUtils.MESSAGE));
 
         Assert.assertTrue(CommTestUtils.SIGNAL_MESSAGE_RECEIVED.await(5, TimeUnit.SECONDS));
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
index 827ec42..c4e6bb6 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
@@ -5,11 +5,14 @@ import java.io.InputStream;
 import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterFromZK;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.comm.util.RemoteFileFetcher;
 
+import com.google.inject.Scopes;
 import com.google.inject.name.Names;
 
 /**
@@ -39,5 +42,8 @@ public class TestCommModule extends DefaultCommModule {
         ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
         zkClient.setZkSerializer(new ZNRecordSerializer());
         bind(ZkClient.class).toInstance(zkClient);
+
+        bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
index 20556f8..28da720 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
@@ -9,6 +9,7 @@ import org.apache.s4.comm.tcp.TCPListener;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
+import com.google.inject.Scopes;
 
 public class AppModule extends AbstractModule {
 
@@ -27,11 +28,11 @@ public class AppModule extends AbstractModule {
     protected void configure() {
         // bind(S4Metrics.class);
 
-        bind(Receiver.class).to(ReceiverImpl.class);
-        bind(Sender.class).to(SenderImpl.class);
+        bind(Receiver.class).to(ReceiverImpl.class).in(Scopes.SINGLETON);
+        bind(Sender.class).to(SenderImpl.class).in(Scopes.SINGLETON);
 
         // TODO allow pluggable transport implementation
-        bind(Listener.class).to(TCPListener.class);
+        bind(Listener.class).to(TCPListener.class).in(Scopes.SINGLETON);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index 171c564..ccb478f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -9,8 +9,10 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.helix.TaskStateModelFactory;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromHelix;
+import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterFromHelix;
+import org.apache.s4.comm.topology.ClusterFromZK;
 import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.comm.util.RemoteFileFetcher;
@@ -45,32 +47,30 @@ public class BaseModule extends AbstractModule {
         if (config == null) {
             loadProperties(binder());
         }
-        if (useHelix) {
-            bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+        bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+        bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+        if (config.containsKey("s4.helix") && config.getBoolean("s4.helix")) {
             bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
             bind(Cluster.class).to(ClusterFromHelix.class);
             bind(TaskStateModelFactory.class);
             bind(AppStateModelFactory.class).in(Scopes.SINGLETON);
             // bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
 
-            bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
             bind(Bootstrap.class).to(S4HelixBootstrap.class);
 
             // share the Zookeeper connection
             return;
-        }
-        // a node holds a single partition assignment
-        // ==> Assignment is a singleton so it shared between base, comm and app
-        // layers.
-        // it is eager so that the node is able to join a cluster immediately
-        // bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
-        // bind(Cluster.class).to(ClusterFromZK.class);
+        } else {
+            // a node holds a single partition assignment
+            // ==> Assignment is a singleton so it shared between base, comm and app
+            // layers.
+            // it is eager so that the node is able to join a cluster immediately
+            bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
+            bind(Cluster.class).to(ClusterFromZK.class);
 
-        bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
-        bind(Bootstrap.class).to(S4Bootstrap.class);
+            bind(Bootstrap.class).to(S4Bootstrap.class);
 
-        // share the Zookeeper connection
-        bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+        }
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
index 5ade15d..5e4da13 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -61,7 +61,7 @@ public class SenderImpl implements Sender {
     @Inject
     S4Metrics metrics;
 
-    private Cluster cluster;
+    private final Cluster cluster;
 
     /**
      * 
@@ -137,7 +137,7 @@ public class SenderImpl implements Sender {
         public void run() {
             ByteBuffer serializedEvent = serDeser.serialize(event);
             try {
-                //TODO: where can we get the type ?
+                // TODO: where can we get the type ?
                 Destination destination = cluster.getDestination(event.getStreamName(), remotePartitionId, "tcp");
                 emitter.send(destination, serializedEvent);
             } catch (InterruptedException e) {
@@ -167,7 +167,7 @@ public class SenderImpl implements Sender {
                 /* Don't use the comm layer when we send to the same partition. */
                 if (localPartitionId != i) {
                     try {
-                        //TODO: where to get the mode from
+                        // TODO: where to get the mode from
                         Destination destination = cluster.getDestination(event.getStreamName(), i, "tcp");
                         emitter.send(destination, serializedEvent);
                         metrics.sentEvent(i);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index 50846b4..a1ec575 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -26,6 +26,7 @@ import junit.framework.Assert;
 
 import org.apache.s4.base.Event;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.core.util.AppConfig;
 import org.apache.s4.deploy.DeploymentUtils;
@@ -125,7 +126,7 @@ public class FTWordCountTest extends ZkBasedTest {
         event.setStreamId("inputStream");
         event.put("sentence", String.class, sentence);
         emitter.send(
-                0,
+                new TCPDestination(0, 1300, "localhost", "Task-0"),
                 injector.getInstance(SerializerDeserializerFactory.class)
                         .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index 4dc9484..1236e14 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -28,6 +28,7 @@ import junit.framework.Assert;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.apache.s4.base.Event;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.util.AppConfig;
@@ -90,9 +91,9 @@ public class RecoveryTest extends ZkBasedTest {
         event.setStreamId("inputStream");
         event.put("command", String.class, "checkpoint");
         emitter.send(
-                0,
-                injector.getInstance(SerializerDeserializerFactory.class)
-                        .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
+
+        new TCPDestination(0, 1300, "localhost", "Task-0"), injector.getInstance(SerializerDeserializerFactory.class)
+                .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
     }
 
     private void testCheckpointingConfiguration(Class<?> appClass, Class<?> backendModuleClass,
@@ -136,7 +137,7 @@ public class RecoveryTest extends ZkBasedTest {
         event.put("value", String.class, "message1");
         event.setStreamId("inputStream");
         emitter.send(
-                0,
+                new TCPDestination(0, 1300, "localhost", "Task-0"),
                 injector.getInstance(SerializerDeserializerFactory.class)
                         .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
 
@@ -172,11 +173,11 @@ public class RecoveryTest extends ZkBasedTest {
         event.put("command", String.class, "setValue2");
         event.put("value", String.class, "message2");
         emitter.send(
-                0,
+                new TCPDestination(new TCPDestination(0, 1300, "localhost", "Task-0")),
                 injector.getInstance(SerializerDeserializerFactory.class)
                         .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
 
-        Assert.assertTrue(signalValue2Set.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalValue2Set.await(20, TimeUnit.SECONDS));
 
         Assert.assertEquals(expectedFinalResult, new String(zk.getData("/data", false, null)));
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
index 1ca862c..aedac39 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@ -12,6 +12,7 @@ import org.apache.s4.base.Event;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.core.BaseModule;
@@ -68,8 +69,8 @@ public class ModuleLoaderTestUtils {
         }
 
         Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
-                .openStream(), "cluster1",null), new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
-                .openStream()));
+                .openStream(), "cluster1", null),
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()));
 
         Emitter emitter = injector.getInstance(TCPEmitter.class);
         List<Long> messages = Lists.newArrayList();
@@ -93,11 +94,11 @@ public class ModuleLoaderTestUtils {
             Event event = new Event();
             event.put("message", long.class, message);
             event.setStreamId("inputStream");
-            emitter.send(0, serDeser.serialize(event));
+            emitter.send(new TCPDestination(0, 1300, "localhost", "Task-0"), serDeser.serialize(event));
         }
 
         // check sequential nodes in zk with correct data
-        Assert.assertTrue(signalMessagesReceived.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalMessagesReceived.await(20, TimeUnit.SECONDS));
         List<String> children = zkClient.getChildren("/test");
         for (String child : children) {
             Long data = zkClient.readData("/test/" + child);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 8b5329b..f8e0096 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.s4.base.Event;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.core.util.AppConfig;
@@ -127,7 +128,7 @@ public class TestAutomaticDeployment extends ZkBasedTest {
         event.setStreamId("inputStream");
         event.put("line", String.class, time1);
         emitter.send(
-                0,
+                new TCPDestination(0, 1300, "localhost", "Task-0"),
                 injector.getInstance(SerializerDeserializerFactory.class)
                         .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index c25d756..8d2b58c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -27,6 +27,7 @@ import junit.framework.Assert;
 import org.apache.s4.base.Event;
 import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPDestination;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.core.BaseModule;
 import org.apache.s4.core.DefaultCoreModule;
@@ -59,9 +60,9 @@ public class WordCountTest extends ZkBasedTest {
 
     public void createEmitter() throws IOException {
         injector = Guice.createInjector(new BaseModule(
-                Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null), new DefaultCommModule(
-                Resources.getResource("default.s4.comm.properties").openStream()), new DefaultCoreModule(Resources
-                .getResource("default.s4.core.properties").openStream()));
+                Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null),
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
+                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
 
         emitter = injector.getInstance(TCPEmitter.class);
 
@@ -113,9 +114,8 @@ public class WordCountTest extends ZkBasedTest {
 
         // NOTE: we send to partition 0 since partition 1 hosts the emitter
         emitter.send(
-                0,
+                new TCPDestination(0, 1300, "localhost", "Task-0"),
                 injector.getInstance(SerializerDeserializerFactory.class)
                         .createSerializerDeserializer(Thread.currentThread().getContextClassLoader()).serialize(event));
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/423b7e87/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
----------------------------------------------------------------------
diff --git a/test-apps/producer-app/src/main/java/s4app/ProducerApp.java b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
index ac7f251..b276342 100644
--- a/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
+++ b/test-apps/producer-app/src/main/java/s4app/ProducerApp.java
@@ -34,7 +34,6 @@ public class ProducerApp extends App {
     @Override
     protected void onInit() {
         System.out.println("Initing CounterApp...");
-
         producerPE = createPE(ProducerPE.class, "producer");
         producerPE.setStreams(createOutputStream("tickStream"));