You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/01/24 17:53:14 UTC
[2/2] git commit: S4-108 Share Zookeeper connection within a node -
build the connection through a provider,
use singleton scope - updated some javadoc for S4-114
Updated Branches:
refs/heads/S4-114 [created] d6decd05f
S4-108 Share Zookeeper connection within a node
- build the connection through a provider, use singleton scope
- updated some javadoc for S4-114
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/d6decd05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/d6decd05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/d6decd05
Branch: refs/heads/S4-114
Commit: d6decd05f323d0e495958c23c5b856c8dc120766
Parents: eb85153
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Jan 24 18:10:50 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Jan 24 18:47:35 2013 +0100
----------------------------------------------------------------------
.../java/org/apache/s4/comm/DefaultCommModule.java | 1 -
.../apache/s4/comm/tcp/DefaultRemoteEmitters.java | 8 +--
.../org/apache/s4/comm/tcp/RemoteEmitters.java | 4 ++
.../apache/s4/comm/topology/AssignmentFromZK.java | 14 ++---
.../org/apache/s4/comm/topology/ClusterFromZK.java | 13 ++---
.../apache/s4/comm/topology/ClustersFromZK.java | 15 ++---
.../org/apache/s4/comm/topology/RemoteStreams.java | 13 +++++
.../apache/s4/comm/topology/ZkRemoteStreams.java | 39 ++++++---------
.../s4/comm/topology/AssignmentsFromZKTest1.java | 4 +-
.../s4/comm/topology/ClustersFromZKTest.java | 10 +++-
.../org/apache/s4/fixtures/TestCommModule.java | 7 ++-
.../main/java/org/apache/s4/core/BaseModule.java | 5 ++
.../org/apache/s4/core/DefaultRemoteSenders.java | 9 +--
.../java/org/apache/s4/core/RemoteSenders.java | 5 ++
.../java/org/apache/s4/core/ZkClientProvider.java | 37 ++++++++++++++
.../s4/fixtures/AssignmentFromZKNoFailFast.java | 7 +--
.../s4/fixtures/ClusterFromZKNoFailFast.java | 7 +--
.../s4/fixtures/ClustersFromZKNoFailFast.java | 7 +--
18 files changed, 125 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index fb0fce3..c557413 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -115,7 +115,6 @@ public class DefaultCommModule extends AbstractModule {
}
}
- @SuppressWarnings("serial")
private void loadProperties(Binder binder) {
try {
config = new PropertiesConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
index edd4fab..151ad85 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
@@ -28,10 +28,6 @@ import org.apache.s4.comm.topology.Cluster;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-/**
- * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
- *
- */
@Singleton
public class DefaultRemoteEmitters implements RemoteEmitters {
@@ -40,7 +36,9 @@ public class DefaultRemoteEmitters implements RemoteEmitters {
@Inject
RemoteEmitterFactory emitterFactory;
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.comm.tcp.RemoteEmitters#getEmitter(org.apache.s4.comm.topology.Cluster)
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
index f298805..5cfd3d7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
@@ -3,6 +3,10 @@ package org.apache.s4.comm.tcp;
import org.apache.s4.base.RemoteEmitter;
import org.apache.s4.comm.topology.Cluster;
+/**
+ * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
+ *
+ */
public interface RemoteEmitters {
public abstract RemoteEmitter getEmitter(Cluster topology);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 0b81854..78d4f69 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -44,7 +44,7 @@ import com.google.inject.name.Named;
/**
* Handles partition assignment through Zookeeper.
- *
+ *
*/
@Singleton
public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
@@ -85,15 +85,13 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
* Holds the reference to ClusterNode which points to the current partition owned
*/
AtomicReference<ClusterNode> clusterNodeRef;
- private int connectionTimeout;
- private String clusterName;
+ private final int connectionTimeout;
+ private final String clusterName;
// TODO we currently have a single assignment per node (i.e. a node can only belong to 1 topology)
@Inject
public AssignmentFromZK(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
this.clusterName = clusterName;
this.connectionTimeout = connectionTimeout;
taskPath = "/s4/clusters/" + clusterName + "/tasks";
@@ -110,9 +108,7 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
machineId = "UNKNOWN";
}
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
}
@Inject
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 6bda6b4..45e0ae4 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@ import com.google.inject.name.Named;
/**
* Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
* configuration.
- *
+ *
*/
public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
@@ -53,23 +52,19 @@ public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener
private final String taskPath;
private final String processPath;
private final Lock lock;
- private String clusterName;
+ private final String clusterName;
/**
* only the local topology
*/
@Inject
public ClusterFromZK(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
this.clusterName = clusterName;
this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
this.processPath = "/s4/clusters/" + clusterName + "/process";
lock = new ReentrantLock();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
zkClient.subscribeStateChanges(this);
if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
throw new Exception("cannot connect to zookeeper");
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index 86f6bb5..b9fde5e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -46,21 +46,17 @@ public class ClustersFromZK implements Clusters, IZkStateListener {
private final ZkClient zkClient;
private final Lock lock;
private String machineId;
- private Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
- private int connectionTimeout;
- private String clusterName;
+ private final Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
+ private final int connectionTimeout;
+ private final String clusterName;
@Inject
public ClustersFromZK(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
this.clusterName = clusterName;
this.connectionTimeout = connectionTimeout;
lock = new ReentrantLock();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
zkClient.subscribeStateChanges(this);
zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
try {
@@ -115,6 +111,7 @@ public class ClustersFromZK implements Clusters, IZkStateListener {
doProcess();
}
+ @Override
public Cluster getCluster(String clusterName) {
return clusters.get(clusterName);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index f9cdcd5..9b9be90 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -2,6 +2,19 @@ package org.apache.s4.comm.topology;
import java.util.Set;
+/**
+ * <p>
+ * Monitors streams available in the S4 cluster.
+ * </p>
+ * <p>
+ * Maintains a data structure reflecting the currently published streams with their consumers and publishers.
+ * </p>
+ * <p>
+ * Provides methods to publish producers and consumers of streams
+ * </p>
+ *
+ */
+
public interface RemoteStreams {
public abstract Set<StreamConsumer> getConsumers(String streamName);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
index f4a9623..0855fc0 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,18 +38,6 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
-/**
- * <p>
- * Monitors streams available in the S4 cluster.
- * </p>
- * <p>
- * Maintains a data structure reflecting the currently published streams with their consumers and publishers.
- * </p>
- * <p>
- * Provides methods to publish producers and consumers of streams
- * </p>
- *
- */
@Singleton
public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, RemoteStreams {
@@ -60,7 +47,7 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
private final Lock lock;
private final static String STREAMS_PATH = "/s4/streams";
// by stream name, then "producer"|"consumer" then
- private Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
+ private final Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
public enum StreamType {
PRODUCER, CONSUMER;
@@ -89,14 +76,11 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
}
@Inject
- public ZkRemoteStreams(@Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ public ZkRemoteStreams(@Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient)
+ throws Exception {
lock = new ReentrantLock();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
zkClient.subscribeStateChanges(this);
zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
// bug in zkClient, it does not invoke handleNewSession the first time
@@ -107,7 +91,9 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.comm.topology.RemoteStreams#getConsumers(java.lang.String)
*/
@Override
@@ -186,8 +172,11 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
}
- /* (non-Javadoc)
- * @see org.apache.s4.comm.topology.RemoteStreams#addOutputStream(java.lang.String, java.lang.String, java.lang.String)
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.comm.topology.RemoteStreams#addOutputStream(java.lang.String, java.lang.String,
+ * java.lang.String)
*/
@Override
public void addOutputStream(String appId, String clusterName, String streamName) {
@@ -219,7 +208,9 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.comm.topology.RemoteStreams#addInputStream(int, java.lang.String, java.lang.String)
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
index 8b1ff9a..472e912 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
@@ -64,7 +64,9 @@ public class AssignmentsFromZKTest1 extends ZkBasedTest {
try {
for (String topologyName : names) {
- assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+ ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ assignmentFromZK = new AssignmentFromZK(topologyName, 30000, zkClient);
assignmentFromZK.init();
ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
index f17bf26..8f692bc 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
@@ -56,11 +56,13 @@ public class ClustersFromZKTest extends ZkBasedTest {
InterruptedException {
final Set<String> clusterNames = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(clustersString));
taskSetup.clean("s4");
+
for (String clusterName : clusterNames) {
taskSetup.setup(clusterName, 10, 1300);
}
-
- final ClustersFromZK clusterFromZK = new ClustersFromZK(null, CommTestUtils.ZK_STRING, 30000, 30000);
+ ZkClient zkClient1 = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient1.setZkSerializer(new ZNRecordSerializer());
+ final ClustersFromZK clusterFromZK = new ClustersFromZK(null, 30000, zkClient1);
final CountDownLatch signalAllClustersComplete = new CountDownLatch(clusterNames.size());
for (final String clusterName : clusterNames) {
@@ -87,7 +89,9 @@ public class ClustersFromZKTest extends ZkBasedTest {
AssignmentFromZK assignmentFromZK;
try {
for (String clusterName : clusterNames) {
- assignmentFromZK = new AssignmentFromZK(clusterName, CommTestUtils.ZK_STRING, 30000, 30000);
+ ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ assignmentFromZK = new AssignmentFromZK(clusterName, 30000, zkClient);
assignmentFromZK.init();
ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
index c1fb253..827ec42 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
@@ -5,6 +5,8 @@ import java.io.InputStream;
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
@@ -31,8 +33,11 @@ public class TestCommModule extends DefaultCommModule {
bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_session_timeout")).toInstance(10000);
bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_connection_timeout")).toInstance(10000);
bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
- // bind(Cluster.class).to(ClusterFromZK.class);
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+
+ ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ bind(ZkClient.class).toInstance(zkClient);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index df2d8f1..582f5d0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -8,6 +8,7 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
import org.slf4j.Logger;
@@ -15,6 +16,7 @@ import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
+import com.google.inject.Scopes;
import com.google.inject.name.Names;
public class BaseModule extends AbstractModule {
@@ -45,6 +47,9 @@ public class BaseModule extends AbstractModule {
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(S4Bootstrap.class);
+ // share the Zookeeper connection
+ bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+
}
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
index b9c6fd7..6aaa8f1 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -37,11 +37,6 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-/**
- * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
- * the event.
- *
- */
public class DefaultRemoteSenders implements RemoteSenders {
Logger logger = LoggerFactory.getLogger(DefaultRemoteSenders.class);
@@ -73,7 +68,9 @@ public class DefaultRemoteSenders implements RemoteSenders {
serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.core.RemoteSenders#send(java.lang.String, org.apache.s4.base.Event)
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 62d62f3..9222450 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -2,6 +2,11 @@ package org.apache.s4.core;
import org.apache.s4.base.Event;
+/**
+ * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
+ * the event.
+ *
+ */
public interface RemoteSenders {
public abstract void send(String hashKey, Event event);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
new file mode 100644
index 0000000..19c4ecc
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
@@ -0,0 +1,37 @@
+package org.apache.s4.core;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.name.Named;
+
+/**
+ *
+ * Provides a connection to ZooKeeper through the {@link ZkClient} class.
+ * <p>
+ * This connection can easily be shared by specifying singleton scope at binding time (i.e. when binding the ZkClient
+ * class, see {@link BaseModule}).
+ *
+ *
+ */
+public class ZkClientProvider implements Provider<ZkClient> {
+
+ private final ZkClient zkClient;
+
+ @Inject
+ public ZkClientProvider(@Named("s4.cluster.zk_address") String zookeeperAddress,
+ @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) {
+ zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+ ZkSerializer serializer = new ZNRecordSerializer();
+ zkClient.setZkSerializer(serializer);
+ }
+
+ @Override
+ public ZkClient get() {
+ return zkClient;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
index e8b7e87..bcf88c4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
@@ -19,6 +19,7 @@
package org.apache.s4.fixtures;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import com.google.inject.Inject;
@@ -28,10 +29,8 @@ public class AssignmentFromZKNoFailFast extends AssignmentFromZK {
@Inject
public AssignmentFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
- super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+ super(clusterName, connectionTimeout, zkClient);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
index 025fb75..0b48bbf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
@@ -19,6 +19,7 @@
package org.apache.s4.fixtures;
import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import com.google.inject.Inject;
@@ -28,10 +29,8 @@ public class ClusterFromZKNoFailFast extends ClusterFromZK {
@Inject
public ClusterFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
- super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+ super(clusterName, connectionTimeout, zkClient);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d6decd05/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
index 4ea5644..0038b21 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
@@ -19,6 +19,7 @@
package org.apache.s4.fixtures;
import org.apache.s4.comm.topology.ClustersFromZK;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import com.google.inject.Inject;
@@ -28,10 +29,8 @@ public class ClustersFromZKNoFailFast extends ClustersFromZK {
@Inject
public ClustersFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
- super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+ super(clusterName, connectionTimeout, zkClient);
}
@Override