You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/01/24 17:53:14 UTC

[2/2] git commit: S4-108 Share Zookeeper connection within a node - build the connection through a provider, use singleton scope - updated some javadoc for S4-114

Updated Branches:
  refs/heads/S4-114 [created] d6decd05f


S4-108 Share Zookeeper connection within a node
- build the connection through a provider, use singleton scope
- updated some javadoc for S4-114


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/d6decd05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/d6decd05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/d6decd05

Branch: refs/heads/S4-114
Commit: d6decd05f323d0e495958c23c5b856c8dc120766
Parents: eb85153
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Jan 24 18:10:50 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Jan 24 18:47:35 2013 +0100

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/DefaultCommModule.java |    1 -
 .../apache/s4/comm/tcp/DefaultRemoteEmitters.java  |    8 +--
 .../org/apache/s4/comm/tcp/RemoteEmitters.java     |    4 ++
 .../apache/s4/comm/topology/AssignmentFromZK.java  |   14 ++---
 .../org/apache/s4/comm/topology/ClusterFromZK.java |   13 ++---
 .../apache/s4/comm/topology/ClustersFromZK.java    |   15 ++---
 .../org/apache/s4/comm/topology/RemoteStreams.java |   13 +++++
 .../apache/s4/comm/topology/ZkRemoteStreams.java   |   39 ++++++---------
 .../s4/comm/topology/AssignmentsFromZKTest1.java   |    4 +-
 .../s4/comm/topology/ClustersFromZKTest.java       |   10 +++-
 .../org/apache/s4/fixtures/TestCommModule.java     |    7 ++-
 .../main/java/org/apache/s4/core/BaseModule.java   |    5 ++
 .../org/apache/s4/core/DefaultRemoteSenders.java   |    9 +--
 .../java/org/apache/s4/core/RemoteSenders.java     |    5 ++
 .../java/org/apache/s4/core/ZkClientProvider.java  |   37 ++++++++++++++
 .../s4/fixtures/AssignmentFromZKNoFailFast.java    |    7 +--
 .../s4/fixtures/ClusterFromZKNoFailFast.java       |    7 +--
 .../s4/fixtures/ClustersFromZKNoFailFast.java      |    7 +--
 18 files changed, 125 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index fb0fce3..c557413 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -115,7 +115,6 @@ public class DefaultCommModule extends AbstractModule {
         }
     }
 
-    @SuppressWarnings("serial")
     private void loadProperties(Binder binder) {
         try {
             config = new PropertiesConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
index edd4fab..151ad85 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
@@ -28,10 +28,6 @@ import org.apache.s4.comm.topology.Cluster;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
-/**
- * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
- * 
- */
 @Singleton
 public class DefaultRemoteEmitters implements RemoteEmitters {
 
@@ -40,7 +36,9 @@ public class DefaultRemoteEmitters implements RemoteEmitters {
     @Inject
     RemoteEmitterFactory emitterFactory;
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.comm.tcp.RemoteEmitters#getEmitter(org.apache.s4.comm.topology.Cluster)
      */
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
index f298805..5cfd3d7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
@@ -3,6 +3,10 @@ package org.apache.s4.comm.tcp;
 import org.apache.s4.base.RemoteEmitter;
 import org.apache.s4.comm.topology.Cluster;
 
+/**
+ * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
+ * 
+ */
 public interface RemoteEmitters {
 
     public abstract RemoteEmitter getEmitter(Cluster topology);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 0b81854..78d4f69 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -44,7 +44,7 @@ import com.google.inject.name.Named;
 
 /**
  * Handles partition assignment through Zookeeper.
- *
+ * 
  */
 @Singleton
 public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
@@ -85,15 +85,13 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
      * Holds the reference to ClusterNode which points to the current partition owned
      */
     AtomicReference<ClusterNode> clusterNodeRef;
-    private int connectionTimeout;
-    private String clusterName;
+    private final int connectionTimeout;
+    private final String clusterName;
 
     // TODO we currently have a single assignment per node (i.e. a node can only belong to 1 topology)
     @Inject
     public AssignmentFromZK(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
         this.clusterName = clusterName;
         this.connectionTimeout = connectionTimeout;
         taskPath = "/s4/clusters/" + clusterName + "/tasks";
@@ -110,9 +108,7 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
             machineId = "UNKNOWN";
         }
 
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
+        this.zkClient = zkClient;
     }
 
     @Inject

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 6bda6b4..45e0ae4 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@ import com.google.inject.name.Named;
 /**
  * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
  * configuration.
- *
+ * 
  */
 public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
 
@@ -53,23 +52,19 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
     private final String taskPath;
     private final String processPath;
     private final Lock lock;
-    private String clusterName;
+    private final String clusterName;
 
     /**
      * only the local topology
      */
     @Inject
     public ClusterFromZK(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
         this.clusterName = clusterName;
         this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
         this.processPath = "/s4/clusters/" + clusterName + "/process";
         lock = new ReentrantLock();
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
+        this.zkClient = zkClient;
         zkClient.subscribeStateChanges(this);
         if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
             throw new Exception("cannot connect to zookeeper");

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index 86f6bb5..b9fde5e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -46,21 +46,17 @@ public class ClustersFromZK implements Clusters, IZkStateListener {
     private final ZkClient zkClient;
     private final Lock lock;
     private String machineId;
-    private Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
-    private int connectionTimeout;
-    private String clusterName;
+    private final Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
+    private final int connectionTimeout;
+    private final String clusterName;
 
     @Inject
     public ClustersFromZK(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
         this.clusterName = clusterName;
         this.connectionTimeout = connectionTimeout;
         lock = new ReentrantLock();
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
+        this.zkClient = zkClient;
         zkClient.subscribeStateChanges(this);
         zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
         try {
@@ -115,6 +111,7 @@ public class ClustersFromZK implements Clusters, IZkStateListener {
         doProcess();
     }
 
+    @Override
     public Cluster getCluster(String clusterName) {
         return clusters.get(clusterName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index f9cdcd5..9b9be90 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -2,6 +2,19 @@ package org.apache.s4.comm.topology;
 
 import java.util.Set;
 
+/**
+ * <p>
+ * Monitors streams available in the S4 cluster.
+ * </p>
+ * <p>
+ * Maintains a data structure reflecting the currently published streams with their consumers and publishers.
+ * </p>
+ * <p>
+ * Provides methods to publish producers and consumers of streams
+ * </p>
+ * 
+ */
+
 public interface RemoteStreams {
 
     public abstract Set<StreamConsumer> getConsumers(String streamName);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
index f4a9623..0855fc0 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,18 +38,6 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 
-/**
- * <p>
- * Monitors streams available in the S4 cluster.
- * </p>
- * <p>
- * Maintains a data structure reflecting the currently published streams with their consumers and publishers.
- * </p>
- * <p>
- * Provides methods to publish producers and consumers of streams
- * </p>
- * 
- */
 @Singleton
 public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, RemoteStreams {
 
@@ -60,7 +47,7 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
     private final Lock lock;
     private final static String STREAMS_PATH = "/s4/streams";
     // by stream name, then "producer"|"consumer" then
-    private Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
+    private final Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
 
     public enum StreamType {
         PRODUCER, CONSUMER;
@@ -89,14 +76,11 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
     }
 
     @Inject
-    public ZkRemoteStreams(@Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+    public ZkRemoteStreams(@Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient)
+            throws Exception {
 
         lock = new ReentrantLock();
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
+        this.zkClient = zkClient;
         zkClient.subscribeStateChanges(this);
         zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
         // bug in zkClient, it does not invoke handleNewSession the first time
@@ -107,7 +91,9 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
 
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.comm.topology.RemoteStreams#getConsumers(java.lang.String)
      */
     @Override
@@ -186,8 +172,11 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
         streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.s4.comm.topology.RemoteStreams#addOutputStream(java.lang.String, java.lang.String, java.lang.String)
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.comm.topology.RemoteStreams#addOutputStream(java.lang.String, java.lang.String,
+     * java.lang.String)
      */
     @Override
     public void addOutputStream(String appId, String clusterName, String streamName) {
@@ -219,7 +208,9 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
         zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.comm.topology.RemoteStreams#addInputStream(int, java.lang.String, java.lang.String)
      */
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
index 8b1ff9a..472e912 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
@@ -64,7 +64,9 @@ public class AssignmentsFromZKTest1 extends ZkBasedTest {
                     try {
 
                         for (String topologyName : names) {
-                            assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+                            ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+                            zkClient.setZkSerializer(new ZNRecordSerializer());
+                            assignmentFromZK = new AssignmentFromZK(topologyName, 30000, zkClient);
                             assignmentFromZK.init();
                             ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
                             latch.countDown();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
index f17bf26..8f692bc 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
@@ -56,11 +56,13 @@ public class ClustersFromZKTest extends ZkBasedTest {
             InterruptedException {
         final Set<String> clusterNames = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(clustersString));
         taskSetup.clean("s4");
+
         for (String clusterName : clusterNames) {
             taskSetup.setup(clusterName, 10, 1300);
         }
-
-        final ClustersFromZK clusterFromZK = new ClustersFromZK(null, CommTestUtils.ZK_STRING, 30000, 30000);
+        ZkClient zkClient1 = new ZkClient(CommTestUtils.ZK_STRING);
+        zkClient1.setZkSerializer(new ZNRecordSerializer());
+        final ClustersFromZK clusterFromZK = new ClustersFromZK(null, 30000, zkClient1);
 
         final CountDownLatch signalAllClustersComplete = new CountDownLatch(clusterNames.size());
         for (final String clusterName : clusterNames) {
@@ -87,7 +89,9 @@ public class ClustersFromZKTest extends ZkBasedTest {
                     AssignmentFromZK assignmentFromZK;
                     try {
                         for (String clusterName : clusterNames) {
-                            assignmentFromZK = new AssignmentFromZK(clusterName, CommTestUtils.ZK_STRING, 30000, 30000);
+                            ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+                            zkClient.setZkSerializer(new ZNRecordSerializer());
+                            assignmentFromZK = new AssignmentFromZK(clusterName, 30000, zkClient);
                             assignmentFromZK.init();
                             ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
                             latch.countDown();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
index c1fb253..827ec42 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
@@ -5,6 +5,8 @@ import java.io.InputStream;
 import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.comm.util.RemoteFileFetcher;
 
@@ -31,8 +33,11 @@ public class TestCommModule extends DefaultCommModule {
         bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_session_timeout")).toInstance(10000);
         bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_connection_timeout")).toInstance(10000);
         bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
-        // bind(Cluster.class).to(ClusterFromZK.class);
 
         bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+
+        ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        bind(ZkClient.class).toInstance(zkClient);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index df2d8f1..582f5d0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -8,6 +8,7 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.comm.util.RemoteFileFetcher;
 import org.slf4j.Logger;
@@ -15,6 +16,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.Scopes;
 import com.google.inject.name.Names;
 
 public class BaseModule extends AbstractModule {
@@ -45,6 +47,9 @@ public class BaseModule extends AbstractModule {
         bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
         bind(S4Bootstrap.class);
 
+        // share the Zookeeper connection
+        bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+
     }
 
     @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
index b9c6fd7..6aaa8f1 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -37,11 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 
-/**
- * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
- * the event.
- * 
- */
 public class DefaultRemoteSenders implements RemoteSenders {
 
     Logger logger = LoggerFactory.getLogger(DefaultRemoteSenders.class);
@@ -73,7 +68,9 @@ public class DefaultRemoteSenders implements RemoteSenders {
         serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.core.RemoteSenders#send(java.lang.String, org.apache.s4.base.Event)
      */
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 62d62f3..9222450 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -2,6 +2,11 @@ package org.apache.s4.core;
 
 import org.apache.s4.base.Event;
 
+/**
+ * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
+ * the event.
+ * 
+ */
 public interface RemoteSenders {
 
     public abstract void send(String hashKey, Event event);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
new file mode 100644
index 0000000..19c4ecc
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
@@ -0,0 +1,37 @@
+package org.apache.s4.core;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.name.Named;
+
+/**
+ * 
+ * Provides a connection to ZooKeeper through the {@link ZkClient} class.
+ * <p>
+ * This connection can easily be shared by specifying singleton scope at binding time (i.e. when binding the ZkClient
+ * class, see {@link BaseModule}).
+ * 
+ * 
+ */
+public class ZkClientProvider implements Provider<ZkClient> {
+
+    private final ZkClient zkClient;
+
+    @Inject
+    public ZkClientProvider(@Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) {
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        ZkSerializer serializer = new ZNRecordSerializer();
+        zkClient.setZkSerializer(serializer);
+    }
+
+    @Override
+    public ZkClient get() {
+        return zkClient;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
index e8b7e87..bcf88c4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
@@ -19,6 +19,7 @@
 package org.apache.s4.fixtures;
 
 import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
 import com.google.inject.Inject;
@@ -28,10 +29,8 @@ public class AssignmentFromZKNoFailFast extends AssignmentFromZK {
 
     @Inject
     public AssignmentFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-        super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+        super(clusterName, connectionTimeout, zkClient);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
index 025fb75..0b48bbf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
@@ -19,6 +19,7 @@
 package org.apache.s4.fixtures;
 
 import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
 import com.google.inject.Inject;
@@ -28,10 +29,8 @@ public class ClusterFromZKNoFailFast extends ClusterFromZK {
 
     @Inject
     public ClusterFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-        super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+        super(clusterName, connectionTimeout, zkClient);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
index 4ea5644..0038b21 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
@@ -19,6 +19,7 @@
 package org.apache.s4.fixtures;
 
 import org.apache.s4.comm.topology.ClustersFromZK;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
 import com.google.inject.Inject;
@@ -28,10 +29,8 @@ public class ClustersFromZKNoFailFast extends ClustersFromZK {
 
     @Inject
     public ClustersFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-        super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+        super(clusterName, connectionTimeout, zkClient);
     }
 
     @Override