You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/05/24 18:19:26 UTC
[4/5] inter-app communications + refactorings
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
new file mode 100644
index 0000000..6b5d094
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -0,0 +1,211 @@
+package org.apache.s4.comm.topology;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Monitors streams available in the S4 cluster.
+ * </p>
+ * <p>
+ * Maintains a data structure reflecting the currently published streams with their consumers and publishers.
+ * </p>
+ * <p>
+ * Provides methods to publish producers and consumers of streams
+ * </p>
+ *
+ */
+@Singleton
+public class RemoteStreams implements IZkStateListener, IZkChildListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
+ private KeeperState state;
+ private final ZkClient zkClient;
+ private final Lock lock;
+ private final static String STREAMS_PATH = "/s4/streams";
+ // by stream name, then "producer"|"consumer" then
+ private Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
+
+ public enum StreamType {
+ PRODUCER, CONSUMER;
+
+ public String getPath(String streamName) {
+ switch (this) {
+ case PRODUCER:
+ return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
+ case CONSUMER:
+ return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
+ default:
+ throw new RuntimeException("Invalid path in enum StreamType");
+ }
+ }
+
+ public String getCollectionName() {
+ switch (this) {
+ case PRODUCER:
+ return "producers";
+ case CONSUMER:
+ return "consumers";
+ default:
+ throw new RuntimeException("Invalid path in enum StreamType");
+ }
+ }
+ }
+
+ @Inject
+ public RemoteStreams(@Named("cluster.zk_address") String zookeeperAddress,
+ @Named("cluster.zk_session_timeout") int sessionTimeout,
+ @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+
+ lock = new ReentrantLock();
+ zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+ ZkSerializer serializer = new ZNRecordSerializer();
+ zkClient.setZkSerializer(serializer);
+ zkClient.subscribeStateChanges(this);
+ zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
+ // bug in zkClient, it does not invoke handleNewSession the first time
+ // it connects
+ this.handleStateChanged(KeeperState.SyncConnected);
+
+ this.handleNewSession();
+
+ }
+
+ public Set<StreamConsumer> getConsumers(String streamName) {
+ if (!streams.containsKey(streamName)) {
+ return Collections.emptySet();
+ } else {
+ return streams.get(streamName).get("consumers");
+ }
+ }
+
+ /**
+ * One method to do any processing if there is a change in ZK, all callbacks will be processed sequentially
+ */
+ private void doProcess() {
+ lock.lock();
+ try {
+ refreshStreams();
+ } catch (Exception e) {
+ logger.warn("Exception in tryToAcquireTask", e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ this.state = state;
+ }
+
+ @Override
+ public void handleNewSession() throws Exception {
+ logger.info("New session:" + zkClient.getSessionId());
+ zkClient.subscribeChildChanges(STREAMS_PATH, this);
+
+ doProcess();
+ }
+
+ @Override
+ public void handleChildChange(String paramString, List<String> paramList) throws Exception {
+ doProcess();
+ }
+
+ private void refreshStreams() {
+ List<String> children = zkClient.getChildren(STREAMS_PATH);
+ for (String streamName : children) {
+ if (!streams.containsKey(streamName)) {
+ logger.info("Detected new stream [{}]", streamName);
+ streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
+ zkClient.subscribeChildChanges(StreamType.PRODUCER.getPath(streamName), this);
+ zkClient.subscribeChildChanges(StreamType.CONSUMER.getPath(streamName), this);
+ streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
+ }
+
+ update(streamName, StreamType.PRODUCER);
+ update(streamName, StreamType.CONSUMER);
+ }
+ }
+
+ private void update(String streamName, StreamType type) {
+ List<String> elements = zkClient.getChildren(type.getPath(streamName));
+ Set<StreamConsumer> consumers = new HashSet<StreamConsumer>();
+ for (String element : elements) {
+ ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element, true);
+ if (producerData != null) {
+ StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
+ producerData.getSimpleField("clusterName"));
+ consumers.add(consumer);
+ }
+ }
+ streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
+ }
+
+ public void addOutputStream(String appId, String clusterName, String streamName) {
+ lock.lock();
+ try {
+ logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
+ clusterName });
+ createStreamPaths(streamName);
+ ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+ producer.putSimpleField("appId", appId);
+ producer.putSimpleField("clusterName", clusterName);
+ try {
+ zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
+ } catch (Throwable e) {
+ logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
+ new String[] { streamName, appId, clusterName, e.getMessage() });
+ }
+ refreshStreams();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Creates (it they don't exist yet) persistent znodes for producers and consumers of a stream.
+ */
+ private void createStreamPaths(String streamName) {
+ zkClient.createPersistent(StreamType.PRODUCER.getPath(streamName), true);
+ zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
+ }
+
+ public void addInputStream(int appId, String clusterName, String streamName) {
+ lock.lock();
+ try {
+ logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
+ new String[] { streamName, String.valueOf(appId), clusterName });
+ createStreamPaths(streamName);
+ ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+ consumer.putSimpleField("appId", String.valueOf(appId));
+ consumer.putSimpleField("clusterName", clusterName);
+ try {
+ zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
+ } catch (Throwable e) {
+ logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
+ new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
+ }
+ refreshStreams();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
deleted file mode 100644
index 8f7cc1a..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.s4.comm.topology;
-
-public interface RemoteTopology extends Topology {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
deleted file mode 100644
index 39ede90..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-public class RemoteTopologyFromZK extends TopologyFromZK implements RemoteTopology {
-
- @Inject
- public RemoteTopologyFromZK(@Named("cluster.remote.name") String remoteClusterName,
- @Named("cluster.zk_address") String zookeeperAddress,
- @Named("cluster.zk_session_timeout") int sessionTimeout,
- @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
- super(remoteClusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
new file mode 100644
index 0000000..f21213a
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
@@ -0,0 +1,27 @@
+package org.apache.s4.comm.topology;
+
/**
 * A subscriber to a published stream. Identified through its cluster name (for dispatching to the remote cluster) and
 * application ID (for dispatching within a node).
 * <p>
 * Value semantics: {@code equals}/{@code hashCode} are defined on (appId, clusterName) so that instances behave
 * correctly in hash-based collections — RemoteStreams stores them in {@code HashSet}s, where the previous
 * identity-based equality would have allowed logical duplicates.
 */
public class StreamConsumer {

    // immutable value object: both fields are set once in the constructor
    final int appId;
    final String clusterName;

    public StreamConsumer(int appId, String clusterName) {
        this.appId = appId;
        this.clusterName = clusterName;
    }

    public int getAppId() {
        return appId;
    }

    public String getClusterName() {
        return clusterName;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + appId;
        result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof StreamConsumer)) {
            return false;
        }
        StreamConsumer other = (StreamConsumer) obj;
        if (appId != other.appId) {
            return false;
        }
        if (clusterName == null) {
            return other.clusterName == null;
        }
        return clusterName.equals(other.clusterName);
    }

    @Override
    public String toString() {
        return "StreamConsumer [appId=" + appId + ", clusterName=" + clusterName + "]";
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Topology.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Topology.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Topology.java
deleted file mode 100644
index 584f3da..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Topology.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import org.apache.s4.comm.topology.Cluster;
-
-public interface Topology {
- public Cluster getTopology();
- public void addListener(TopologyChangeListener listener);
- public void removeListener(TopologyChangeListener listener);
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyChangeListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyChangeListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyChangeListener.java
deleted file mode 100644
index d1ac45b..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyChangeListener.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.s4.comm.topology;
-
-public interface TopologyChangeListener {
- public void onChange();
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromFile.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromFile.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromFile.java
deleted file mode 100644
index b28a170..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromFile.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import com.google.inject.Inject;
-
-public class TopologyFromFile implements Topology {
-
- private Cluster cluster;
-
- @Inject
- public TopologyFromFile(Cluster cluster) {
- super();
- this.cluster = cluster;
-
- }
-
- @Override
- public Cluster getTopology() {
- return cluster;
- }
-
- @Override
- public void addListener(TopologyChangeListener listener) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void removeListener(TopologyChangeListener listener) {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
deleted file mode 100644
index 0be6686..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/TopologyFromZK.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-public class TopologyFromZK implements Topology, IZkChildListener, IZkStateListener, IZkDataListener {
- private static final Logger logger = LoggerFactory.getLogger(TopologyFromZK.class);
- private final String clusterName;
- private final AtomicReference<Cluster> clusterRef;
- private final List<TopologyChangeListener> listeners;
- private KeeperState state;
- private final ZkClient zkClient;
- private final String taskPath;
- private final String processPath;
- private final Lock lock;
- private AtomicBoolean currentlyOwningTask;
- private String machineId;
-
- @Inject
- public TopologyFromZK(@Named("cluster.name") String clusterName,
- @Named("cluster.zk_address") String zookeeperAddress,
- @Named("cluster.zk_session_timeout") int sessionTimeout,
- @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
- this.clusterName = clusterName;
- taskPath = "/" + clusterName + "/" + "tasks";
- processPath = "/" + clusterName + "/" + "process";
- lock = new ReentrantLock();
- clusterRef = new AtomicReference<Cluster>();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
- listeners = new ArrayList<TopologyChangeListener>();
- zkClient.subscribeStateChanges(this);
- zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
- try {
- machineId = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException e) {
- logger.warn("Unable to get hostname", e);
- machineId = "UNKNOWN";
- }
- // bug in zkClient, it does not invoke handleNewSession the first time
- // it connects
- this.handleStateChanged(KeeperState.SyncConnected);
- this.handleNewSession();
- }
-
- @Override
- public Cluster getTopology() {
- return clusterRef.get();
- }
-
- @Override
- public void addListener(TopologyChangeListener listener) {
- logger.info("Adding topology change listener:" + listener);
- listeners.add(listener);
- }
-
- @Override
- public void removeListener(TopologyChangeListener listener) {
- logger.info("Removing topology change listener:" + listener);
- listeners.remove(listener);
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- doProcess();
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- doProcess();
- }
-
- @Override
- public void handleStateChanged(KeeperState state) throws Exception {
- this.state = state;
- }
-
- @Override
- public void handleNewSession() throws Exception {
- logger.info("New session:" + zkClient.getSessionId());
- zkClient.subscribeChildChanges(taskPath, this);
- zkClient.subscribeChildChanges(processPath, this);
- doProcess();
- }
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
- doProcess();
- }
-
- private void doProcess() {
- lock.lock();
- try {
- refreshTopology();
- } catch (Exception e) {
- logger.error("", e);
- } finally {
- lock.unlock();
- }
- }
-
- private void refreshTopology() throws Exception {
- List<String> processes = zkClient.getChildren(processPath);
- List<String> tasks = zkClient.getChildren(taskPath);
- Cluster cluster = new Cluster(tasks.size());
- for (int i = 0; i < processes.size(); i++) {
- ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
- if (process != null) {
- int partition = Integer.parseInt(process.getSimpleField("partition"));
- String host = process.getSimpleField("host");
- int port = Integer.parseInt(process.getSimpleField("port"));
- String taskId = process.getSimpleField("taskId");
- ClusterNode node = new ClusterNode(partition, port, host, taskId);
- cluster.addNode(node);
- }
- }
- logger.info("Changing cluster topology to " + cluster + " from " + clusterRef.get());
- clusterRef.set(cluster);
- // Notify all changeListeners about the topology change
- for (TopologyChangeListener listener : listeners) {
- listener.onChange();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
index decce25..4aa7639 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
@@ -4,6 +4,10 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+/**
+ * ZooKeeper's custom znode data structure. Allows for easily information addition and retrieval.
+ *
+ */
public class ZNRecord {
String id;
@@ -16,7 +20,7 @@ public class ZNRecord {
Map<String, List<String>> listFields;
Map<String, Map<String, String>> mapFields;
- public ZNRecord() {
+ private ZNRecord() {
}
@@ -59,6 +63,17 @@ public class ZNRecord {
}
@Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((id == null) ? 0 : id.hashCode());
+ result = prime * result + ((listFields == null) ? 0 : listFields.hashCode());
+ result = prime * result + ((mapFields == null) ? 0 : mapFields.hashCode());
+ result = prime * result + ((simpleFields == null) ? 0 : simpleFields.hashCode());
+ return result;
+ }
+
+ @Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 9882241..8fab1a6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -12,18 +12,18 @@ import org.apache.s4.base.Emitter;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterChangeListener;
import com.google.common.collect.HashBiMap;
import com.google.inject.Inject;
-public class UDPEmitter implements Emitter, TopologyChangeListener {
+public class UDPEmitter implements Emitter, ClusterChangeListener {
private DatagramSocket socket;
private final HashBiMap<Integer, ClusterNode> nodes;
private final Map<Integer, InetAddress> inetCache = new HashMap<Integer, InetAddress>();
private final long messageDropInQueueCount = 0;
- private final Topology topology;
+ private final Cluster topology;
@Inject
SerializerDeserializer serDeser;
@@ -33,11 +33,11 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
}
@Inject
- public UDPEmitter(Topology topology) {
+ public UDPEmitter(Cluster topology) {
this.topology = topology;
topology.addListener(this);
- nodes = HashBiMap.create(topology.getTopology().getNodes().size());
- for (ClusterNode node : topology.getTopology().getNodes()) {
+ nodes = HashBiMap.create(topology.getPhysicalCluster().getNodes().size());
+ for (ClusterNode node : topology.getPhysicalCluster().getNodes()) {
nodes.forcePut(node.getPartition(), node);
}
@@ -74,14 +74,14 @@ public class UDPEmitter implements Emitter, TopologyChangeListener {
@Override
public int getPartitionCount() {
- return topology.getTopology().getPartitionCount();
+ return topology.getPhysicalCluster().getPartitionCount();
}
@Override
public void onChange() {
// topology changes when processes pick tasks
synchronized (nodes) {
- for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+ for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
Integer partition = clusterNode.getPartition();
nodes.put(partition, clusterNode);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
index e759f25..abbc31b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java
@@ -64,32 +64,29 @@ public class DeliveryTestUtil {
/*
* TimerThread - interrupts the passed thread, after specified time-interval.
+ *
+ * FIXME we should use number of events rather than time-based interval
*/
class TimerThread extends Thread {
private final Thread watchThread;
- private Integer sleepCounter;
+ private volatile int sleepCounter;
TimerThread(Thread watchThread) {
this.watchThread = watchThread;
- this.sleepCounter = new Integer(sleepCount);
+ this.sleepCounter = sleepCount;
}
public void resetSleepCounter() {
- synchronized (this.sleepCounter) {
- this.sleepCounter = sleepCount;
- }
+ this.sleepCounter = sleepCount;
}
public void clearSleepCounter() {
- synchronized (this.sleepCounter) {
- this.sleepCounter = 0;
- }
+ this.sleepCounter = 0;
}
private int getCounter() {
- synchronized (this.sleepCounter) {
- return this.sleepCounter--;
- }
+ return sleepCounter--;
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index f02bf64..cb39c1b 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -1,9 +1,8 @@
package org.apache.s4.comm.tcp;
import java.io.IOException;
+
import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
import org.apache.s4.fixtures.ZkBasedClusterManagementTestModule;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.KeeperException;
@@ -26,7 +25,7 @@ public class TCPCommTest extends ZkBasedTest {
class TCPCommTestModule extends ZkBasedClusterManagementTestModule {
TCPCommTestModule() {
- super(TCPEmitter.class, TCPListener.class);
+ super(TCPEmitter.class, TCPRemoteEmitter.class, TCPListener.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
deleted file mode 100644
index e709d68..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.comm.tools.TaskSetup;
-import org.junit.Test;
-
-public class AssignmentFromZKTest extends ZKBaseTest {
-
- @Test
- public void testAssignment() throws Exception {
- TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
- final String clusterName = "test-s4-cluster";
- taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 10, 1300);
- final CountDownLatch latch = new CountDownLatch(10);
- for (int i = 0; i < 10; i++) {
- Runnable runnable = new Runnable() {
-
- @SuppressWarnings("unused")
- @Override
- public void run() {
- AssignmentFromZK assignmentFromZK;
- try {
- assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
- ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
- latch.countDown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- Thread t = new Thread(runnable);
- t.start();
- }
-
- boolean await = latch.await(3, TimeUnit.SECONDS);
- assertEquals(true, await);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
new file mode 100644
index 0000000..71a3489
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
@@ -0,0 +1,69 @@
+package org.apache.s4.comm.topology;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+
+public class AssignmentsFromZKTest extends ZKBaseTest {
+
+ @Test
+ @Ignore
+ public void testAssignmentFor1Cluster() throws Exception {
+ TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+ final String topologyNames = "cluster1";
+ testAssignment(taskSetup, topologyNames);
+ }
+
+ @Test
+ public void testAssignmentFor2Clusters() throws Exception {
+ Thread.sleep(2000);
+ TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+ final String topologyNames = "cluster2, cluster3";
+ testAssignment(taskSetup, topologyNames);
+ }
+
+ private void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
+ final Set<String> names = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(topologyNames));
+ taskSetup.clean("s4");
+ for (String topologyName : names) {
+ taskSetup.setup(topologyName, 10, 1300);
+ }
+
+ final CountDownLatch latch = new CountDownLatch(10 * names.size());
+ for (int i = 0; i < 10; i++) {
+ Runnable runnable = new Runnable() {
+
+ @SuppressWarnings("unused")
+ @Override
+ public void run() {
+ AssignmentFromZK assignmentFromZK;
+ try {
+
+ for (String topologyName : names) {
+ assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+ ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+ latch.countDown();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ Thread t = new Thread(runnable);
+ t.start();
+ }
+
+ boolean await = latch.await(30, TimeUnit.SECONDS);
+ assertEquals(true, await);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
new file mode 100644
index 0000000..38d7621
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
@@ -0,0 +1,97 @@
+package org.apache.s4.comm.topology;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+
+// Tests ClustersFromZK: verifies that partition assignments made through
+// AssignmentFromZK are observed by ClusterChangeListener callbacks, for one
+// and for two logical clusters sharing the same ZooKeeper ensemble.
+public class ClustersFromZKTest extends ZKBaseTest {
+
+ @Test
+ @Ignore
+ // NOTE(review): "throws InterruptedException, Exception" is redundant —
+ // Exception already covers InterruptedException.
+ public void testAssignmentFor1Topology() throws InterruptedException, Exception {
+ TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+ final String clustersString = "cluster1";
+ testAssignment(taskSetup, clustersString);
+ }
+
+ @Test
+ public void testAssignmentFor2Topologies() throws Exception {
+ // NOTE(review): presumably gives ZooKeeper time to settle between tests —
+ // TODO confirm; prefer an explicit readiness check over a fixed sleep.
+ Thread.sleep(2000);
+ TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+ final String clustersString = "cluster2, cluster3";
+ testAssignment(taskSetup, clustersString);
+
+ }
+
+ // Provisions each named cluster with 10 partitions, registers a change listener
+ // per cluster, then has 10 threads claim one partition in every cluster and
+ // asserts (a) all claims complete and (b) listeners eventually see 10 nodes.
+ private void testAssignment(TaskSetup taskSetup, final String clustersString) throws Exception,
+ InterruptedException {
+ final Set<String> clusterNames = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(clustersString));
+ taskSetup.clean("s4");
+ for (String clusterName : clusterNames) {
+ taskSetup.setup(clusterName, 10, 1300);
+ }
+
+ final ClustersFromZK clusterFromZK = new ClustersFromZK(null, CommTestUtils.ZK_STRING, 30000, 30000);
+
+ // Released once each cluster's listener has observed all 10 nodes. A listener
+ // may count down more than once if onChange fires repeatedly at size 10, but
+ // that only makes the await below complete sooner.
+ final CountDownLatch signalAllClustersComplete = new CountDownLatch(clusterNames.size());
+ for (final String clusterName : clusterNames) {
+ ClusterChangeListener listener = new ClusterChangeListener() {
+
+ @Override
+ public void onChange() {
+ if (clusterFromZK.getCluster(clusterName).getPhysicalCluster().getNodes().size() == 10) {
+ signalAllClustersComplete.countDown();
+ }
+
+ }
+ };
+ clusterFromZK.getCluster(clusterName).addListener(listener);
+ }
+
+ // One count per expected assignment: 10 threads x one claim per cluster.
+ final CountDownLatch latch = new CountDownLatch(10 * clusterNames.size());
+ for (int i = 0; i < 10; i++) {
+ Runnable runnable = new Runnable() {
+
+ @SuppressWarnings("unused")
+ @Override
+ public void run() {
+ AssignmentFromZK assignmentFromZK;
+ try {
+ for (String clusterName : clusterNames) {
+ assignmentFromZK = new AssignmentFromZK(clusterName, CommTestUtils.ZK_STRING, 30000, 30000);
+ // assignClusterNode is deliberately unused — presumably the call
+ // itself performs the assignment (hence @SuppressWarnings above).
+ ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+ latch.countDown();
+ }
+ } catch (Exception e) {
+ // NOTE(review): failures are only printed; the latch timeout below
+ // is what actually fails the test in that case.
+ e.printStackTrace();
+ }
+ }
+ };
+ Thread t = new Thread(runnable);
+ t.start();
+ }
+
+ boolean await = latch.await(20, TimeUnit.SECONDS);
+ assertEquals(true, await);
+ boolean success = false;
+ success = signalAllClustersComplete.await(20, TimeUnit.SECONDS);
+ assertEquals(true, success);
+ for (String clusterName : clusterNames) {
+ if (!(10 == clusterFromZK.getCluster(clusterName).getPhysicalCluster().getNodes().size())) {
+ // pending zookeeper updates are not yet reflected
+ Thread.sleep(2000);
+ }
+ assertEquals(10, clusterFromZK.getCluster(clusterName).getPhysicalCluster().getNodes().size());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
deleted file mode 100644
index 65eee28..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.s4.comm.tools.TaskSetup;
-import org.junit.Test;
-
-public class TopologyFromZKTest extends ZKBaseTest {
-
- @Test
- public void testAssignment() throws Exception {
- TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
- final String clusterName = "test-s4-cluster";
- taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 10, 1300);
-
- final TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
- final Lock lock = new ReentrantLock();
- final Condition signal = lock.newCondition();
- TopologyChangeListener listener = new TopologyChangeListener() {
-
- @Override
- public void onChange() {
- System.out.println("TopologyFromZKTest.testAssignment().new TopologyChangeListener() {...}.onChange()");
- if (topologyFromZK.getTopology().getNodes().size() == 10) {
- lock.lock();
- try {
- signal.signalAll();
- } finally {
- lock.unlock();
- }
-
- }
-
- }
- };
- topologyFromZK.addListener(listener);
- final CountDownLatch latch = new CountDownLatch(10);
- for (int i = 0; i < 10; i++) {
- Runnable runnable = new Runnable() {
-
- @SuppressWarnings("unused")
- @Override
- public void run() {
- AssignmentFromZK assignmentFromZK;
- try {
- assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
- ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
- latch.countDown();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- };
- Thread t = new Thread(runnable);
- t.start();
- }
-
- boolean await = latch.await(3, TimeUnit.SECONDS);
- assertEquals(true, await);
- boolean success = false;
- lock.lock();
- try {
- success = signal.await(3, TimeUnit.SECONDS);
- } finally {
- lock.unlock();
- }
- assertEquals(true, success);
- assertEquals(10, topologyFromZK.getTopology().getNodes().size());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
index 54ce6a9..8107cde 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java
@@ -1,43 +1,26 @@
package org.apache.s4.comm.topology;
-import java.io.File;
+import java.io.IOException;
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
import org.junit.Before;
public class ZKBaseTest {
- protected ZkServer zkServer = null;
- protected ZkClient zkClient;
- protected String zookeeperAddress;
- @Before
- public void setUp() {
- String dataDir = System.getProperty("user.dir") + File.separator + "tmp" + File.separator + "zookeeper"
- + File.separator + "data";
- String logDir = System.getProperty("user.dir") + File.separator + "tmp" + File.separator + "zookeeper"
- + File.separator + "logs";
- IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
-
- @Override
- public void createDefaultNameSpace(ZkClient zkClient) {
+ private Factory zkFactory;
- }
- };
- int port = 3029;
- zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
- zkServer.start();
- zkClient = zkServer.getZkClient();
- zookeeperAddress = "localhost:" + port;
+ @Before
+ public void setUp() throws IOException, InterruptedException, KeeperException {
+ CommTestUtils.cleanupTmpDirs();
+ zkFactory = CommTestUtils.startZookeeperServer();
}
@After
public void tearDown() throws Exception {
- if (zkServer != null) {
- zkServer.shutdown();
- }
+ CommTestUtils.stopZookeeperServer(zkFactory);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index 808a357..e224b99 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -25,7 +25,7 @@ public class UDPCommTest extends ZkBasedTest {
class UDPCommTestModule extends ZkBasedClusterManagementTestModule {
UDPCommTestModule() {
- super(UDPEmitter.class, UDPListener.class);
+ super(UDPEmitter.class, null, UDPListener.class);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index 7d7913d..f7cf147 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -13,6 +13,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -39,6 +40,7 @@ public class CommTestUtils {
private static final Logger logger = LoggerFactory.getLogger(CommTestUtils.class);
public static final int ZK_PORT = 2181;
+ public static final String ZK_STRING = "localhost:" + ZK_PORT;
public static final int INITIAL_BOOKIE_PORT = 5000;
public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
@@ -62,7 +64,7 @@ public class CommTestUtils {
cmdList.add(arg);
}
- // System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+ System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
ProcessBuilder pb = new ProcessBuilder(cmdList);
pb.directory(new File(System.getProperty("user.dir")));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
deleted file mode 100644
index ebcf388..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public abstract class FileBasedClusterManagementTestModule<T> extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
- private final Class<?> appClass;
-
- protected FileBasedClusterManagementTestModule() {
- // infer actual app class through "super type tokens" (this simple code
- // assumes actual module class is a direct subclass from this one)
- ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
- Type[] fieldArgTypes = pt.getActualTypeArguments();
- this.appClass = (Class<?>) fieldArgTypes[0];
- }
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
- config = new PropertiesConfiguration();
- config.load(is);
- config.setProperty("cluster.lock_dir",
- config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
- System.out.println(ConfigurationUtils.toString(config));
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- bind(appClass);
- bind(Cluster.class);
- bind(Hasher.class).to(DefaultHasher.class);
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- bind(Assignment.class).to(AssignmentFromFile.class);
- bind(Topology.class).to(TopologyFromFile.class);
- bind(Emitter.class).to(UDPEmitter.class);
- bind(Listener.class).to(UDPListener.class);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
index a368db8..a1e7089 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -4,24 +4,29 @@ import java.io.InputStream;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Hasher;
import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.RemoteEmitterFactory;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.RemoteEmitters;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.tcp.TCPRemoteEmitter;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.ClustersFromZK;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
public class ZkBasedClusterManagementTestModule extends AbstractModule {
@@ -29,14 +34,16 @@ public class ZkBasedClusterManagementTestModule extends AbstractModule {
protected PropertiesConfiguration config = null;
private Class<? extends Emitter> emitterClass = null;
+ private Class<? extends RemoteEmitter> remoteEmitterClass = null;
private Class<? extends Listener> listenerClass = null;
protected ZkBasedClusterManagementTestModule() {
}
protected ZkBasedClusterManagementTestModule(Class<? extends Emitter> emitterClass,
- Class<? extends Listener> listenerClass) {
+ Class<? extends RemoteEmitter> remoteEmitterClass, Class<? extends Listener> listenerClass) {
this.emitterClass = emitterClass;
+ this.remoteEmitterClass = remoteEmitterClass;
this.listenerClass = listenerClass;
}
@@ -46,11 +53,6 @@ public class ZkBasedClusterManagementTestModule extends AbstractModule {
InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
config = new PropertiesConfiguration();
config.load(is);
- config.setProperty(
- "cluster.zk_address",
- config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+",
- "localhost:" + CommTestUtils.ZK_PORT));
- System.out.println(ConfigurationUtils.toString(config));
// TODO - validate properties.
/* Make all properties injectable. Do we need this? */
@@ -66,11 +68,23 @@ public class ZkBasedClusterManagementTestModule extends AbstractModule {
if (config == null) {
loadProperties(binder());
}
- bind(Cluster.class);
bind(Hasher.class).to(DefaultHasher.class);
bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
bind(Assignment.class).to(AssignmentFromZK.class);
- bind(Topology.class).to(TopologyFromZK.class);
+ bind(Clusters.class).to(ClustersFromZK.class);
+ bind(Cluster.class).to(ClusterFromZK.class);
+
+ // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
+ if (this.remoteEmitterClass != null) {
+ install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
+ RemoteEmitterFactory.class));
+ } else {
+ install(new FactoryModuleBuilder().implement(RemoteEmitter.class, TCPRemoteEmitter.class).build(
+ RemoteEmitterFactory.class));
+ }
+
+ bind(RemoteEmitters.class);
if (this.emitterClass != null) {
bind(Emitter.class).to(this.emitterClass);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index bda3bd8..2f336e0 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -1,11 +1,10 @@
package org.apache.s4.fixtures;
-import java.io.File;
+import java.io.IOException;
-import org.I0Itec.zkclient.IDefaultNameSpace;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkServer;
import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
@@ -13,46 +12,22 @@ import org.slf4j.LoggerFactory;
public abstract class ZkBasedTest {
private static final Logger logger = LoggerFactory.getLogger(ZkBasedTest.class);
- private ZkServer zkServer;
+ private Factory zkFactory;
@Before
- public void prepare() {
- String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
- String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+ public void prepare() throws IOException, InterruptedException, KeeperException {
CommTestUtils.cleanupTmpDirs();
- IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+ zkFactory = CommTestUtils.startZookeeperServer();
- @Override
- public void createDefaultNameSpace(ZkClient zkClient) {
-
- }
- };
-
- logger.info("Starting Zookeeper Server");
- zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
- zkServer.start();
-
- logger.info("Starting Zookeeper Client 1");
- String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
- @SuppressWarnings("unused")
- ZkClient zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
-
- ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
- zkClient2.getCreationTime("/");
-
- TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
- final String clusterName = "s4-test-cluster";
- taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 1, 1300);
+ TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+ taskSetup.clean("s4");
+ taskSetup.setup("cluster1", 1, 1300);
}
@After
- public void cleanupZkBasedTest() {
- if (zkServer != null) {
- zkServer.shutdown();
- zkServer = null;
- }
+ public void cleanupZkBasedTest() throws IOException, InterruptedException {
+ CommTestUtils.stopZookeeperServer(zkFactory);
CommTestUtils.cleanupTmpDirs();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/default.s4.properties b/subprojects/s4-comm/src/test/resources/default.s4.properties
index 0e31dfa..6624ff4 100644
--- a/subprojects/s4-comm/src/test/resources/default.s4.properties
+++ b/subprojects/s4-comm/src/test/resources/default.s4.properties
@@ -1,9 +1,6 @@
comm.queue_emmiter_size = 8000
comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = {user.dir}/tmp
-cluster.name = s4-test-cluster
+cluster.name = cluster1
cluster.zk_address = localhost:2181
cluster.zk_session_timeout = 10000
cluster.zk_connection_timeout = 10000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/test/resources/s4-comm-test.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/s4-comm-test.properties b/subprojects/s4-comm/src/test/resources/s4-comm-test.properties
deleted file mode 100644
index 340ee5b..0000000
--- a/subprojects/s4-comm/src/test/resources/s4-comm-test.properties
+++ /dev/null
@@ -1,10 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = /tmp
-cluster.isCluster = true
-emitter.send.interval = 100
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index afa85c9..46d580d 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -15,15 +15,19 @@
*/
description = 'The S4 core platform.'
-
+
dependencies {
compile project(":s4-base")
compile project(":s4-comm")
compile project(path: ':s4-comm', configuration: 'tests')
compile libraries.jcommander
- testCompile project(path: ':s4-comm', configuration: 'tests')
+ testCompile project(path: ':s4-comm', configuration: 'tests')
+ testCompile libraries.gradle_base_services
+ testCompile libraries.gradle_core
+ testCompile libraries.gradle_tooling_api
+ testCompile libraries.gradle_wrapper
}
test {
- forkEvery=1
+ forkEvery=1;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index d33e718..c6bffd3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -21,9 +21,11 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
import org.apache.s4.base.KeyFinder;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.RemoteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +33,7 @@ import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
+import com.google.inject.name.Named;
/*
* Container base class to hold all processing elements. We will implement administrative methods here.
@@ -50,11 +53,25 @@ public abstract class App {
private ClockType clockType = ClockType.WALL_CLOCK;
private int id = -1;
+
@Inject
private Sender sender;
@Inject
private Receiver receiver;
+ @Inject
+ RemoteSenders remoteSenders;
+
+ @Inject
+ Hasher hasher;
+
+ @Inject
+ RemoteStreams remoteStreams;
+
+ @Inject
+ @Named("cluster.name")
+ String clusterName;
+
// serialization uses the application class loader
private SerializerDeserializer serDeser = new KryoSerDeser(getClass().getClassLoader());
@@ -254,9 +271,9 @@ public abstract class App {
* - receives events from the communication layer.
*/
public void setCommLayer(Sender sender, Receiver receiver) {
- this.sender = sender;
- this.receiver = receiver;
- sender.setPartition(receiver.getPartition());
+ // this.sender = sender;
+ // this.receiver = receiver;
+ // sender.setPartition(receiver.getPartition());
}
/**
@@ -298,6 +315,27 @@ public abstract class App {
return new Stream<T>(this, name, processingElements);
}
+ protected <T extends Event> RemoteStream createOutputStream(String name) {
+ return createOutputStream(name, null);
+ }
+
+ protected <T extends Event> RemoteStream createOutputStream(String name, KeyFinder<Event> finder) {
+ return new RemoteStream(this, name, finder, remoteSenders, hasher, remoteStreams, clusterName);
+ }
+
+ protected <T extends Event> Stream<T> createInputStream(String streamName, KeyFinder<T> finder,
+ ProcessingElement... processingElements) {
+ remoteStreams.addInputStream(getId(), clusterName, streamName);
+ return createStream(streamName, finder, processingElements);
+
+ }
+
+ protected <T extends Event> Stream<T> createInputStream(String streamName, ProcessingElement... processingElements) {
+ remoteStreams.addInputStream(getId(), clusterName, streamName);
+ return createStream(streamName, processingElements);
+
+ }
+
/**
* Creates a {@link ProcessingElement} prototype.
*
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
deleted file mode 100644
index 17529d3..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.s4.core;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/**
- * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
- * until we have a better way to customize node configuration
- *
- */
-public class CustomModule extends AbstractModule {
-
- InputStream configFileInputStream;
- private PropertiesConfiguration config;
-
- public CustomModule(InputStream configFileInputStream) {
- this.configFileInputStream = configFileInputStream;
- }
-
- @Override
- protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- if (configFileInputStream != null) {
- try {
- configFileInputStream.close();
- } catch (IOException ignored) {
- }
- }
-
- int numHosts = config.getList("cluster.hosts").size();
- boolean isCluster = numHosts > 1 ? true : false;
- bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
-
- bind(Cluster.class);
-
- bind(Assignment.class).to(AssignmentFromZK.class);
-
- bind(Topology.class).to(TopologyFromZK.class);
-
- bind(Emitter.class).to(TCPEmitter.class);
- bind(Listener.class).to(TCPListener.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
- bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
- }
-
- private void loadProperties(Binder binder) {
- try {
- config = new PropertiesConfiguration();
- config.load(configFileInputStream);
-
- System.out.println(ConfigurationUtils.toString(config));
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
new file mode 100644
index 0000000..9003f26
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultModule.java
@@ -0,0 +1,99 @@
+package org.apache.s4.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.RemoteEmitterFactory;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.tcp.TCPRemoteEmitter;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.ClustersFromZK;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
+
+/**
+ * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
+ * until we have a better way to customize node configuration
+ *
+ */
+public class DefaultModule extends AbstractModule {
+
+ InputStream configFileInputStream;
+ private PropertiesConfiguration config;
+
+ public DefaultModule(InputStream configFileInputStream) {
+ this.configFileInputStream = configFileInputStream;
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ if (configFileInputStream != null) {
+ try {
+ configFileInputStream.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ bind(Emitter.class).to(TCPEmitter.class);
+
+ bind(Listener.class).to(TCPListener.class);
+
+ bind(Assignment.class).to(AssignmentFromZK.class);
+
+ bind(Clusters.class).to(ClustersFromZK.class);
+ bind(Cluster.class).to(ClusterFromZK.class);
+
+ // RemoteEmitter instances are created through a factory, depending on the topology. We inject the factory
+ install(new FactoryModuleBuilder().implement(RemoteEmitter.class, TCPRemoteEmitter.class).build(
+ RemoteEmitterFactory.class));
+ bind(RemoteEmitters.class);
+
+ /* The hashing function to map keys to partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+ bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+ }
+
+ private void loadProperties(Binder binder) {
+ try {
+ config = new PropertiesConfiguration();
+ config.load(configFileInputStream);
+
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index c18603c..09a3778 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -48,7 +48,8 @@ public class Main {
private static void startCustomS4Node(String s4PropertiesFilePath) throws FileNotFoundException {
// TODO that's quite inconvenient anyway: we still need to specify the comm module in the config
// file passed as a parameter...
- Injector injector = Guice.createInjector(new CustomModule(new FileInputStream(new File(s4PropertiesFilePath))));
+ Injector injector = Guice
+ .createInjector(new DefaultModule(new FileInputStream(new File(s4PropertiesFilePath))));
startServer(logger, injector);
}
@@ -104,7 +105,7 @@ public class Main {
} else {
URL defaultS4Config = null;
try {
- defaultS4Config = Resources.getResource("/default.s4.properties");
+ defaultS4Config = Resources.getResource("default.s4.properties");
} catch (IllegalArgumentException e) {
logger.error(
"Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 34309d6..2571e20 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -20,10 +20,12 @@ import com.google.inject.Singleton;
* <p>
* A Listener implementation receives data from the network and passes an event as a byte array to the {@link Receiver}.
* The byte array is de-serialized and converted into an {@link Event}. Finally the event is passed to the matching
- * streams. There is a single {@link Receiver} instance per node.
+ * streams.
+ * </p>
+ * There is a single {@link Receiver} instance per node.
*
* Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
- * from the application developer.
+ * from the application developer.
*/
@Singleton
public class Receiver implements Runnable {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 48b980b..cdb0b01 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -1,18 +1,28 @@
package org.apache.s4.core;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.EventMessage;
import org.apache.s4.base.Hasher;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
+public class RemoteSender {
-@Singleton
-public class RemoteSender extends Sender {
+ final private Emitter emitter;
+ final private Hasher hasher;
- @Inject
- public RemoteSender(RemoteEmitter emitter, SerializerDeserializer serDeser, Hasher hasher) {
- super(emitter, serDeser, hasher);
+ public RemoteSender(Emitter emitter, Hasher hasher) {
+ super();
+ this.emitter = emitter;
+ this.hasher = hasher;
}
+ public void send(String hashKey, EventMessage eventMessage) {
+ if (hashKey == null) {
+ for (int i = 0; i < emitter.getPartitionCount(); i++) {
+ emitter.send(i, eventMessage);
+ }
+ } else {
+ int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+ emitter.send(partition, eventMessage);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
new file mode 100644
index 0000000..d44d270
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -0,0 +1,61 @@
+package org.apache.s4.core;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.StreamConsumer;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.Clusters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+public class RemoteSenders {
+
+ Logger logger = LoggerFactory.getLogger(RemoteSenders.class);
+
+ @Inject
+ RemoteEmitters emitters;
+
+ @Inject
+ RemoteStreams streams;
+
+ @Inject
+ Clusters topologies;
+
+ @Inject
+ SerializerDeserializer serDeser;
+
+ @Inject
+ Hasher hasher;
+
+ Map<String, RemoteSender> sendersByTopology = new HashMap<String, RemoteSender>();
+
+ public void send(String hashKey, Event event) {
+
+ Set<StreamConsumer> consumers = streams.getConsumers(event.getStreamName());
+ for (StreamConsumer consumer : consumers) {
+ RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
+ if (sender == null) {
+ sender = new RemoteSender(emitters.getEmitter(topologies.getCluster(consumer.getClusterName())),
+ hasher);
+ // TODO cleanup when remote topologies die
+ sendersByTopology.put(consumer.getClusterName(), sender);
+ }
+ // we must set the app id of the consumer app for correct dispatch within the consumer node
+ // NOTE: this implies multiple serializations, there might be an optimization
+ event.setAppId(consumer.getAppId());
+ EventMessage eventMessage = new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(),
+ serDeser.serialize(event));
+ sender.send(hashKey, eventMessage);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
new file mode 100644
index 0000000..0833a07
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
@@ -0,0 +1,77 @@
+package org.apache.s4.core;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream that dispatches events to a remote cluster
+ *
+ */
+public class RemoteStream implements Streamable<Event> {
+
+ final private String name;
+ final protected Key<Event> key;
+ final static private String DEFAULT_SEPARATOR = "^";
+ // final private int id;
+
+ RemoteSenders remoteSenders;
+
+ Hasher hasher;
+
+ int id;
+ private App app;
+ private static Logger logger = LoggerFactory.getLogger(RemoteStream.class);
+
+ private static AtomicInteger remoteStreamCounter = new AtomicInteger();
+
+ public RemoteStream(App app, String name, KeyFinder<Event> finder, RemoteSenders remoteSenders, Hasher hasher,
+ RemoteStreams remoteStreams, String clusterName) {
+ this.app = app;
+ this.name = name;
+ this.remoteSenders = remoteSenders;
+ this.hasher = hasher;
+ if (finder == null) {
+ this.key = null;
+ } else {
+ this.key = new Key<Event>(finder, DEFAULT_SEPARATOR);
+ }
+ remoteStreams.addOutputStream(String.valueOf(app.getId()), clusterName, name);
+
+ }
+
+ @Override
+ public void put(Event event) {
+ event.setStreamId(getName());
+ event.setAppId(app.getId());
+
+ if (key != null) {
+ remoteSenders.send(key.get(event), event);
+ } else {
+ remoteSenders.send(null, event);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public void start() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 39e5a98..1104716 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -7,7 +7,6 @@ import org.apache.s4.base.Hasher;
import org.apache.s4.base.SerializerDeserializer;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
/**
* The {@link Sender} and its counterpart {@link Receiver} are the top level classes of the communication layer.
@@ -17,7 +16,6 @@ import com.google.inject.Singleton;
* Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
* from the application developer.
*/
-@Singleton
public class Sender {
final private Emitter emitter;
final private SerializerDeserializer serDeser;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index d148868..fdbd800 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -2,14 +2,12 @@ package org.apache.s4.core;
import java.io.File;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.jar.Attributes.Name;
import java.util.jar.JarFile;
import org.I0Itec.zkclient.ZkClient;
-import org.apache.s4.base.Event;
import org.apache.s4.base.util.S4RLoader;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.deploy.DeploymentManager;
@@ -19,7 +17,6 @@ import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
import com.google.common.collect.Maps;
-import com.google.common.io.PatternFilenameFilter;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Injector;
@@ -56,11 +53,11 @@ public class Server {
*
*/
@Inject
- public Server(@Named("comm.module") String commModuleName, @Named("s4.logger_level") String logLevel,
- @Named("appsDir") String appsDir, @Named("cluster.name") String clusterName,
- @Named("cluster.zk_address") String zookeeperAddress,
+ public Server(String commModuleName, @Named("s4.logger_level") String logLevel, @Named("appsDir") String appsDir,
+ @Named("cluster.name") String clusterName, @Named("cluster.zk_address") String zookeeperAddress,
@Named("cluster.zk_session_timeout") int sessionTimeout,
@Named("cluster.zk_connection_timeout") int connectionTimeout) {
+ // TODO do we need to separate the comm module?
this.commModuleName = commModuleName;
this.logLevel = logLevel;
this.appsDir = appsDir;
@@ -96,10 +93,13 @@ public class Server {
logger.error("Cannot create apps directory [{}]", appsDir);
}
}
- File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
- for (File s4rFile : s4rFiles) {
- loadApp(s4rFile);
- }
+
+ // disabled app loading from local files
+
+ // File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
+ // for (File s4rFile : s4rFiles) {
+ // loadApp(s4rFile);
+ // }
/* Now init + start apps. TODO: implement dynamic loading/unloading using ZK. */
for (Map.Entry<String, App> appEntry : apps.entrySet()) {
@@ -108,11 +108,6 @@ public class Server {
}
for (Map.Entry<String, App> appEntry : apps.entrySet()) {
- logger.info("Initializing app streams " + appEntry.getValue().getClass().getName());
- updateStreams(appEntry.getValue(), appEntry.getKey());
- }
-
- for (Map.Entry<String, App> appEntry : apps.entrySet()) {
logger.info("Starting app " + appEntry.getKey() + "/" + appEntry.getValue().getClass().getName());
appEntry.getValue().start();
}
@@ -138,46 +133,6 @@ public class Server {
return loadApp(s4r, s4r.getName().substring(0, s4r.getName().indexOf(".s4r")));
}
- public void updateStreams(App app, String appName) {
- // register streams
- List<Streamable<Event>> appStreams = app.getStreams();
- for (Streamable<Event> streamable : appStreams) {
- if (streams.containsKey(streamable.getName())) {
- logger.error("Application {} defines the stream {} but there is already a stream with that name",
- new String[] { appName, streamable.getName() });
- } else {
- // zkClient.createEphemeral("/" + clusterName + "/streams/producers/" + appName);
- logger.debug("Adding stream {} for app {}");
- streams.put(streamable.getName(), streamable);
- if (eventSources.containsKey(streamable.getName())) {
- logger.debug("Connecting matching event source for stream {} for app {}", streamable.getName(),
- appName);
- eventSources.get(streamable.getName()).subscribeStream(streams.get(streamable.getName()));
- }
- }
- }
-
- List<EventSource> appEventSources = app.getEventSources();
- for (EventSource eventSource : appEventSources) {
- if (eventSources.containsKey(eventSource.getName())) {
- logger.error(
- "Application {} defines the event source {} but there is already an event source with that name, from app {}",
- new String[] { appName, eventSource.getName(),
- String.valueOf(streams.get(eventSource.getName())) });
- } else {
- // zkClient.createEphemeral("/" + clusterName + "/streams/consumers/" + appName);
- logger.debug("adding event source {} from app {}", eventSource.getName(), appName);
- eventSources.put(eventSource.getName(), eventSource);
- }
- if (streams.containsKey(eventSource.getName())) {
- logger.debug("Connecting matching stream from app {} to event source {}", appName,
- eventSource.getName());
- eventSource.subscribeStream(streams.get(eventSource.getName()));
- }
- }
-
- }
-
public App loadApp(File s4r, String appName) {
// TODO handle application upgrade
@@ -222,8 +177,6 @@ public class Server {
app.init();
- updateStreams(app, appName);
-
app.start();
}
}