You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/03/11 16:31:30 UTC
[1/2] git commit: S4-91 RemoteSenders to exclusive PEs, improve tests
S4-91 RemoteSenders to exclusive PEs, improve tests
Applied patch by Aimee Cheng
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/6aaa5536
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/6aaa5536
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/6aaa5536
Branch: refs/heads/S4-91
Commit: 6aaa5536dffdc9e8c93f12465d165881b96983d3
Parents: 16d50da
Author: Daniel Gómez Ferro <df...@apache.org>
Authored: Mon Mar 11 15:57:35 2013 +0100
Committer: Daniel Gómez Ferro <df...@apache.org>
Committed: Mon Mar 11 15:57:35 2013 +0100
----------------------------------------------------------------------
.../apache/s4/comm/topology/AssignmentFromZK.java | 1 -
.../org/apache/s4/comm/topology/PartitionData.java | 103 +++++++++++++
.../apache/s4/comm/topology/StreamConsumer.java | 19 +++
.../java/org/apache/s4/comm/topology/ZNRecord.java | 4 +
.../apache/s4/comm/topology/ZkRemoteStreams.java | 35 +++++
.../src/main/java/org/apache/s4/core/App.java | 75 ++++++++-
.../org/apache/s4/core/DefaultRemoteSenders.java | 1 +
.../java/org/apache/s4/core/ProcessingElement.java | 39 +++--
.../main/java/org/apache/s4/core/RemoteSender.java | 34 ++++-
.../main/java/org/apache/s4/core/S4Bootstrap.java | 2 +
.../src/main/java/org/apache/s4/core/Stream.java | 4 +
.../java/org/apache/s4/deploy/DeploymentUtils.java | 2 +-
.../org/apache/s4/core/ri/RemoteAdapterApp.java | 50 ++++++
.../org/apache/s4/core/ri/RemoteStreamRITest.java | 119 +++++++++++++++
.../apache/s4/core/ri/RuntimeIsolationTest.java | 106 ++++++++-----
.../s4/wordcount/IsolationWordCounterPE.java | 91 +++++++++++
16 files changed, 612 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
Review note: this hunk only drops the ZkSerializer import (presumably unused in this file - the hunk touches nothing else); no behavior change.
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 78d4f69..33eb920 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -32,7 +32,6 @@ import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
new file mode 100644
index 0000000..691cb34
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PartitionData.java
@@ -0,0 +1,122 @@
+package org.apache.s4.comm.topology;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Partitioning layout of one PE prototype (or of a group of non-exclusive PEs): whether it is exclusive, its number
+ * of partitions, the mapping from its local partition ids to global partition ids, and the streams it consumes.
+ */
+public class PartitionData {
+ int numPartitions;
+ boolean isExclusive = false;
+ /* local partition id -> global partition id */
+ Map<Integer, Integer> globalPartitionMap = Maps.newHashMap();
+ List<String> streams = new ArrayList<String>();
+
+ public PartitionData() {
+
+ }
+
+ public PartitionData(boolean isExclusive, int numPartitions) {
+ this.numPartitions = numPartitions;
+ this.isExclusive = isExclusive;
+ }
+
+ public boolean isExclusive() {
+ return this.isExclusive;
+ }
+
+ public int getPartitionCount() {
+ return this.numPartitions;
+ }
+
+ public void addPartitionMappingInfo(int partitionId, int nodeId) {
+ globalPartitionMap.put(partitionId, nodeId);
+ }
+
+ public List<String> getStreams() {
+ return streams;
+ }
+
+ /**
+ * Adds {@code stream} to the consumed streams, ignoring duplicates.
+ */
+ public void addStream(String stream) {
+ if (!streams.contains(stream)) {
+ streams.add(stream);
+ }
+ }
+
+ public void setStreams(List<String> streams) {
+ this.streams = streams;
+ }
+
+ public int getNumPartitions() {
+ return numPartitions;
+ }
+
+ public void setPartitionCount(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ public void setExclusive(boolean isExclusive) {
+ this.isExclusive = isExclusive;
+ }
+
+ /**
+ * Returns the global partition id mapped to the local partition {@code partitionId}.
+ *
+ * @throws IllegalArgumentException
+ * if no mapping has been registered for {@code partitionId}
+ */
+ public int getGlobalePartitionId(int partitionId) {
+ Integer globalPartitionId = globalPartitionMap.get(partitionId);
+ if (globalPartitionId == null) {
+ throw new IllegalArgumentException("No global partition mapped for partition " + partitionId + " in "
+ + this);
+ }
+ return globalPartitionId;
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionCount: " + numPartitions + ", isExclusive: " + isExclusive + ", partitionMap: "
+ + globalPartitionMap;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((globalPartitionMap == null) ? 0 : globalPartitionMap.hashCode());
+ result = prime * result + (isExclusive ? 1231 : 1237);
+ result = prime * result + numPartitions;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PartitionData other = (PartitionData) obj;
+ if (globalPartitionMap == null) {
+ if (other.globalPartitionMap != null)
+ return false;
+ } else if (!globalPartitionMap.equals(other.globalPartitionMap))
+ return false;
+ if (isExclusive != other.isExclusive)
+ return false;
+ if (numPartitions != other.numPartitions)
+ return false;
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
Review note: this patch does not add pePartitionInfo to hashCode()/equals(), so consumers that differ only in partition info compare equal (ZkRemoteStreams collects consumers in a Set). The new toString() lacks @Override.
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
index 52fa2a8..a4cdc81 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
@@ -18,6 +18,8 @@
package org.apache.s4.comm.topology;
+import java.util.Map;
+
/**
* A subscriber to a published stream. Identified through its cluster name (for dispatching to the remote cluster) and
* application ID (for dispatching within a node (NOTE: this parameter is ignored)).
@@ -28,6 +30,11 @@ public class StreamConsumer {
int appId;
String clusterName;
+ /**
+ * The keys are PE prototype ids.
+ */
+ Map<String, PartitionData> pePartitionInfo;
+
public StreamConsumer(int appId, String clusterName) {
super();
this.appId = appId;
@@ -42,6 +49,14 @@ public class StreamConsumer {
return clusterName;
}
+ public Map<String, PartitionData> getPePartitionInfo() {
+ return pePartitionInfo;
+ }
+
+ public void setPePartitionInfo(Map<String, PartitionData> pePartitionInfo) {
+ this.pePartitionInfo = pePartitionInfo;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -70,4 +85,8 @@ public class StreamConsumer {
return true;
}
+ public String toString() {
+ return "appId: " + appId + ", clusterName: " + clusterName + ", pePartitionInfo: " + pePartitionInfo;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
index aee7f6f..3c9cc7b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecord.java
@@ -103,4 +103,9 @@ public class ZNRecord {
}
return false;
}
+
+ @Override
+ public String toString() {
+ return "id: " + id + ", simpleFields: " + simpleFields + ", mapFields: " + mapFields;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
index 0855fc0..a92e9cc 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -34,6 +34,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
@@ -166,12 +167,56 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
if (producerData != null) {
StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
producerData.getSimpleField("clusterName"));
+ consumer.setPePartitionInfo(readPePartitionData(producerData.getSimpleField("clusterName"), streamName));
consumers.add(consumer);
}
}
streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
}
+ /**
+ * Reads the partition layout that the consumer cluster published in ZooKeeper for each PE prototype consuming
+ * {@code streamName}. Remote senders use it to route events to the right partitions.
+ *
+ * @param clusterName
+ * name of the consumer cluster
+ * @param streamName
+ * name of the consumed stream
+ * @return partition data keyed by PE prototype id; empty if the consumer cluster has not published any
+ */
+ private Map<String, PartitionData> readPePartitionData(String clusterName, String streamName) {
+ String path = "/s4/clusters/" + clusterName + "/app/s4App";
+ Map<String, PartitionData> results = Maps.newHashMap();
+ if (!zkClient.exists(path)) {
+ // consumer app not deployed yet: nothing to read
+ return results;
+ }
+ for (String prototypeSeq : zkClient.getChildren(path)) {
+ ZNRecord data = zkClient.readData(path + "/" + prototypeSeq, true);
+ if (data == null) {
+ continue; // ephemeral node vanished between getChildren and readData
+ }
+ List<String> peStreams = data.getListField("streams");
+ String prototypeId = data.getSimpleField("prototypeId");
+ // de-duplicate on the map key (prototypeId), not on the sequential znode name
+ if (peStreams == null || !peStreams.contains(streamName) || results.containsKey(prototypeId)) {
+ continue;
+ }
+ PartitionData partitionData = new PartitionData(Boolean.parseBoolean(data.getSimpleField("isExclusive")),
+ Integer.parseInt(data.getSimpleField("partitionCount")));
+ Map<String, String> globalPartitionMap = data.getMapField("globalPartitionMap");
+ if (globalPartitionMap != null) {
+ for (Map.Entry<String, String> entry : globalPartitionMap.entrySet()) {
+ partitionData.addPartitionMappingInfo(Integer.parseInt(entry.getKey()),
+ Integer.parseInt(entry.getValue()));
+ }
+ }
+ partitionData.setStreams(peStreams);
+ results.put(prototypeId, partitionData);
+ }
+ return results;
+ }
+
/*
* (non-Javadoc)
*
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index c7e02ea..c724ff6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -31,6 +31,8 @@ import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.core.ft.CheckpointingFramework;
import org.apache.s4.core.staging.StreamExecutorServiceFactory;
import org.apache.s4.core.util.S4Metrics;
@@ -77,9 +79,16 @@ public abstract class App {
@Inject
private Hasher hasher;
+ private ZkClient zkClient;
+
@Inject
private RemoteStreams remoteStreams;
+ /** Sets the ZooKeeper client that {@link #schedule()} uses to publish the partition information of PEs. */
+ public void setZkClient(ZkClient zkClient) {
+ this.zkClient = zkClient;
+ }
+
@Inject
private Cluster topology;
@@ -141,7 +150,6 @@ public abstract class App {
}
public ProcessingElement getPE(String name) {
-
return peByName.get(name);
}
@@ -194,7 +202,6 @@ public abstract class App {
onInit();
- schedule();
}
/**
@@ -235,6 +242,34 @@ public abstract class App {
public void schedule() {
schedule(topology);
+ writeToZK();
+ }
+
+ /**
+ * Publishes the partition layout of this app's PE prototypes to ZooKeeper, where remote senders read it to route
+ * events. Each exclusive PE gets its own node; all non-exclusive PEs share one node listing all their input
+ * streams (with the partition layout of the last non-exclusive PE). Does nothing if no ZkClient was set.
+ */
+ private void writeToZK() {
+ if (zkClient == null) {
+ logger.warn("No ZooKeeper client set, partition information of app PEs will not be published");
+ return;
+ }
+ List<String> streamsOfNEPE = new ArrayList<String>();
+ ProcessingElement NEPeInstance = null;
+ for (int i = 0; i < getPePrototypes().size(); i++) {
+ ProcessingElement pe = getPePrototypes().get(i);
+ if (pe.isExclusive()) {
+ createZKNodeForPartition("prototype-" + i, pe, null);
+ } else {
+ streamsOfNEPE.addAll(pe.getInputStreams());
+ NEPeInstance = pe;
+ }
+ }
+
+ if (streamsOfNEPE.size() != 0) {
+ createZKNodeForPartition("Non-exclusivePEs", NEPeInstance, streamsOfNEPE);
+ }
}
/**
@@ -272,26 +307,54 @@ public abstract class App {
// assign partition to exclusive PE
for (ProcessingElement pe : exclusivePEList) {
for (int i = 0; i < pe.getPartitionCount(); i++) {
- pe.setGlobalPartitionId(i, partition++);
+ pe.addGlobalPartitionId(i, partition++);
}
}
logger.info("Finished schedule !");
showPartitionInfo();
+
};
+ /**
+ * Writes the partition information of {@code pe} as an ephemeral sequential node under the app's ZooKeeper path.
+ *
+ * @param id prototype id stored in the record ("prototype-<index>" or "Non-exclusivePEs")
+ * @param pe PE whose exclusivity, partition count and global partition mapping are written
+ * @param streams streams to record; if null, the input streams of {@code pe} are used
+ */
+ private void createZKNodeForPartition(String id, ProcessingElement pe, List<String> streams) {
+ String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+ ZNRecord record = new ZNRecord(id);
+ record.putSimpleField("prototypeId", id);
+ record.putSimpleField("isExclusive", String.valueOf(pe.isExclusive()));
+ record.putSimpleField("partitionCount", String.valueOf(pe.getPartitionCount()));
+ if (streams == null) {
+ record.putListField("streams", pe.getInputStreams());
+ } else {
+ record.putListField("streams", streams);
+ }
+ Map<String, String> map = Maps.newHashMap();
+ for (int j = 0; j < pe.getPartitionCount(); j++) {
+ map.put(String.valueOf(j), String.valueOf(pe.getGlobalPartitionId(j)));
+ }
+ record.putMapField("globalPartitionMap", map);
+ zkClient.createEphemeralSequential(appPath + "/prototype_", record);
+ logger.debug("write partition info to zk: {}", record);
+ }
+
private void showPartitionInfo() {
StringBuilder sb = new StringBuilder("\n");
- String line = "---------------------------------------------------------------------------\n";
+ String line = "------------------------------------------------------------------------------------------\n";
sb.append(line);
sb.append("Partition Information\n");
sb.append(line);
- sb.append(String.format("%-20s%-20s%-15s%-10s%-20s%n", "PE Class", "PE Name", "Exclusive", "Count",
- "Partition"));
+ sb.append(String
+ .format("%-25s%-25s%-15s%-10s%-25s%n", "PE Class", "PE Name", "Exclusive", "Count", "Partition"));
for (ProcessingElement pe : getPePrototypes()) {
- sb.append(String.format("%-20s%-20s%-15s%-10d", pe.getClass().getSimpleName(), pe.getName(),
+ sb.append(String.format("%-25s%-25s%-15s%-10d", pe.getClass().getSimpleName(), pe.getName(),
pe.isExclusive(), pe.getPartitionCount()));
StringBuilder partition = new StringBuilder();
if (pe.isExclusive()) {
@@ -300,7 +363,7 @@ public abstract class App {
} else {
partition.append("[0 ~ " + (pe.getPartitionCount() - 1) + "]");
}
- sb.append(String.format("%-20s%n", partition.toString()));
+ sb.append(String.format("%-25s%n", partition.toString()));
}
sb.append(line);
logger.debug(sb.toString());
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
Review note: senders are cached per cluster name (sendersByTopology.putIfAbsent), and partition info is only set on the newly created sender. ZkRemoteStreams reads that info per stream, so other streams sent to the same cluster may reuse another stream's PE layout, and later updates are never applied. TODO confirm; consider keying senders by cluster and stream.
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
index 6aaa8f1..e1cd90e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -85,6 +85,7 @@ public class DefaultRemoteSenders implements RemoteSenders {
if (sender == null) {
RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
.getClusterName())), hasher, consumer.getClusterName());
+ newSender.setPartitionDatas(consumer.getPePartitionInfo());
// TODO cleanup when remote topologies die
sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
if (sender == null) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index e08f468..09ddd0d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import net.jcip.annotations.ThreadSafe;
import org.apache.s4.base.Event;
+import org.apache.s4.comm.topology.PartitionData;
import org.apache.s4.core.ft.CheckpointId;
import org.apache.s4.core.ft.CheckpointingConfig;
import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
@@ -147,10 +149,7 @@ public abstract class ProcessingElement implements Cloneable {
transient private Timer processingTimer;
- transient private boolean isExclusive = false;
- transient private int partitionCount = -1;
- /* This map holds the mapping of PE partition id and glocal partition id */
- transient private Map<Integer, Integer> globalPartitionMap;
+ transient private PartitionData partitionData;
transient private CheckpointingConfig checkpointingConfig = new CheckpointingConfig.Builder(CheckpointingMode.NONE)
.build();
@@ -171,7 +170,8 @@ public abstract class ProcessingElement implements Cloneable {
});
triggers = new MapMaker().makeMap();
- globalPartitionMap = new MapMaker().makeMap();
+ // same defaults as the fields this replaces: not exclusive, partitionCount -1
+ partitionData = new PartitionData(false, -1);
/*
* Only the PE Prototype uses the constructor. The PEPrototype field will be cloned by the instances and point
* to the prototype.
@@ -180,12 +180,20 @@ public abstract class ProcessingElement implements Cloneable {
}
- public void setGlobalPartitionId(int partitionId, int nodeId) {
- globalPartitionMap.put(partitionId, nodeId);
+ public void addGlobalPartitionId(int partitionId, int nodeId) {
+ partitionData.addPartitionMappingInfo(partitionId, nodeId);
}
public int getGlobalPartitionId(int partitionId) {
- return globalPartitionMap.get(partitionId);
+ return partitionData.getGlobalePartitionId(partitionId);
+ }
+
+ public void addInputStream(String stream) {
+ partitionData.addStream(stream);
+ }
+
+ public List<String> getInputStreams() {
+ return partitionData.getStreams();
}
/**
@@ -503,7 +511,7 @@ public abstract class ProcessingElement implements Cloneable {
}
public boolean isExclusive() {
- return isExclusive;
+ return partitionData.isExclusive();
}
/**
@@ -513,16 +521,20 @@ public abstract class ProcessingElement implements Cloneable {
* @param partitionCount
*/
public void setExclusive(int partitionCount) {
- this.isExclusive = true;
- this.partitionCount = partitionCount;
+ this.partitionData.setExclusive(true);
+ this.partitionData.setPartitionCount(partitionCount);
}
public void setPartitionCount(int partitionCount) {
- this.partitionCount = partitionCount;
+ this.partitionData.setPartitionCount(partitionCount);
+ // identity mapping: local partition i -> global partition i
+ for (int i = 0; i < partitionCount; i++) {
+ this.partitionData.addPartitionMappingInfo(i, i);
+ }
}
public int getPartitionCount() {
- return partitionCount;
+ return partitionData.getPartitionCount();
}
private boolean isTrigger(Event event) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index dba8b6c..6cf98e8 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -19,10 +19,16 @@
package org.apache.s4.core;
import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Hasher;
+import org.apache.s4.comm.topology.PartitionData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Sends events to a remote cluster.
@@ -30,27 +36,57 @@ import org.apache.s4.base.Hasher;
*/
public class RemoteSender {
+ private static final Logger logger = LoggerFactory.getLogger(RemoteSender.class);
+
final private Emitter emitter;
final private Hasher hasher;
AtomicInteger targetPartition = new AtomicInteger();
final private String remoteClusterName;
+ /* PE prototype id -> partition layout of that PE in the remote cluster */
+ private Map<String, PartitionData> partitionDatas;
public RemoteSender(Emitter emitter, Hasher hasher, String clusterName) {
super();
this.emitter = emitter;
this.hasher = hasher;
this.remoteClusterName = clusterName;
+ }
+
+ /**
+ * Sets the partition layout of the remote PEs consuming the stream sent through this sender. When unset or
+ * empty, events are partitioned over all partitions of the remote emitter.
+ */
+ public void setPartitionDatas(Map<String, PartitionData> partitionDatas) {
+ this.partitionDatas = partitionDatas;
}
+ /**
+ * Sends {@code message} to the remote cluster: round robin when {@code hashKey} is null, by key hash otherwise.
+ * For each remote PE prototype the event goes to the global partition owning the selected local partition; a
+ * global partition shared by several prototypes receives the message only once.
+ */
public void send(String hashKey, ByteBuffer message) throws InterruptedException {
- int partition;
- if (hashKey == null) {
- // round robin by default
- partition = Math.abs(targetPartition.incrementAndGet() % emitter.getPartitionCount());
- } else {
- partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+ // kept as a long: truncating the key hash to an int could make it (and the modulo below) negative
+ long hashValue = (hashKey == null) ? targetPartition.incrementAndGet() : hasher.hash(hashKey);
+
+ if (partitionDatas == null || partitionDatas.isEmpty()) {
+ // no PE partition info from the remote cluster: partition over the emitter rather than dropping the event
+ int partition = (int) Math.abs(hashValue % emitter.getPartitionCount());
+ logger.debug("Remote sending to partition: {}", partition);
+ emitter.send(partition, message);
+ return;
+ }
+
+ Set<Integer> partitions = new HashSet<Integer>();
+ for (PartitionData data : partitionDatas.values()) {
+ int localPartition = (int) Math.abs(hashValue % data.getPartitionCount());
+ partitions.add(data.getGlobalePartitionId(localPartition));
+ }
+
+ for (Integer partition : partitions) {
+ logger.debug("Remote sending to partition: {}", partition);
+ emitter.send(partition, message);
}
- emitter.send(partition, message);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
Review note: with this patch App.init() no longer calls schedule(), so schedule() must be called explicitly after init(), as done below. schedule() now also publishes partition info through the ZkClient set here. Any other code path that only calls init() would skip scheduling - TODO confirm none exist.
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
index 757ce4f..64bc031 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -134,7 +134,9 @@ public class S4Bootstrap {
// use correct classLoader for running the app initialization
Thread.currentThread().setContextClassLoader(app.getClass().getClassLoader());
+ app.setZkClient(zkClient);
app.init();
+ app.schedule();
app.start();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 2cf55f7..c2f0805 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -154,6 +154,10 @@ public class Stream<T extends Event> implements Streamable {
*/
public Stream<T> setPEs(ProcessingElement... pes) {
this.targetPEs = pes;
+ // record this stream as an input of each target PE (published to ZooKeeper with the PE partition info)
+ for (ProcessingElement pe : pes) {
+ pe.addInputStream(name);
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
Review note: the s4App node now gets ephemeral sequential children holding PE partition info (App.createZKNodeForPartition). A plain delete() fails on a non-empty ZooKeeper node, hence deleteRecursive().
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
index cc404af..0f91b0c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
@@ -20,7 +20,7 @@ public class DeploymentUtils {
if (zk.exists("/s4/clusters/" + clusterName + "/app/s4App")) {
if (deleteIfExists) {
- zk.delete("/s4/clusters/" + clusterName + "/app/s4App");
+ zk.deleteRecursive("/s4/clusters/" + clusterName + "/app/s4App");
}
}
try {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
new file mode 100644
index 0000000..91f0a85
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteAdapterApp.java
@@ -0,0 +1,54 @@
+package org.apache.s4.core.ri;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.RemoteStream;
+import org.apache.s4.wordcount.SentenceKeyFinder;
+import org.apache.s4.wordcount.WordCountTest;
+
+/**
+ * Test app that sends the word count test sentences through the remote stream "inputStream" when it starts
+ * (used by RemoteStreamRITest to feed the word count app deployed on another cluster).
+ */
+public class RemoteAdapterApp extends App {
+
+ private static final String STREAM_NAME = "inputStream";
+
+ private RemoteStream remoteStream;
+
+ // NOTE(review): not used by onInit(), which always creates the stream with a SentenceKeyFinder
+ protected KeyFinder<Event> remoteStreamKeyFinder;
+
+ @Override
+ protected void onInit() {
+ remoteStream = createOutputStream(STREAM_NAME, new SentenceKeyFinder());
+ }
+
+ protected void setKeyFinder(KeyFinder<Event> keyFinder) {
+ this.remoteStreamKeyFinder = keyFinder;
+ }
+
+ @Override
+ protected void onStart() {
+ injectSentence(WordCountTest.SENTENCE_1);
+ injectSentence(WordCountTest.SENTENCE_2);
+ injectSentence(WordCountTest.SENTENCE_3);
+ }
+
+ public void injectSentence(String sentence) {
+ Event event = new Event();
+ event.setStreamId(STREAM_NAME);
+ event.put("sentence", String.class, sentence);
+ getRemoteStream().put(event);
+ }
+
+ @Override
+ protected void onClose() {
+ // nothing to release
+ }
+
+ public RemoteStream getRemoteStream() {
+ return remoteStream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
new file mode 100644
index 0000000..0757f5e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RemoteStreamRITest.java
@@ -0,0 +1,117 @@
+package org.apache.s4.core.ri;
+
+import static org.apache.s4.core.ri.RuntimeIsolationTest.counterNumber;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.deploy.DeploymentUtils;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.wordcount.WordCountModule;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Runs the isolation word count app on cluster1 and feeds it from {@link RemoteAdapterApp} on cluster2 through a
+ * remote stream, then checks the word counts and how many counter PE instances received events.
+ */
+public class RemoteStreamRITest extends RuntimeIsolationTest {
+
+ private static final String ZK_CONNECT_STRING = "localhost:" + CommTestUtils.ZK_PORT;
+
+ @Override
+ public void injectData() throws InterruptedException, IOException {
+ // nothing to inject here: RemoteAdapterApp sends the sentences through a remote stream
+ }
+
+ @Override
+ public void startNodes() throws IOException, InterruptedException {
+ final ZooKeeper zk = CommTestUtils.createZkClient();
+ try {
+ TaskSetup taskSetup = new TaskSetup(ZK_CONNECT_STRING);
+ taskSetup.setup("cluster2", 1, 1500);
+
+ List<Process> nodes = new ArrayList<Process>();
+
+ DeploymentUtils.initAppConfig(new AppConfig.Builder().appClassName(IsolationWordCountApp.class.getName())
+ .customModulesNames(ImmutableList.of(WordCountModule.class.getName())).build(), "cluster1", false,
+ ZK_CONNECT_STRING);
+ nodes.addAll(Arrays.asList(CoreTestUtils.forkS4Nodes(new String[] { "-c", "cluster1" }, new ZkClient(
+ ZK_CONNECT_STRING), 10, "cluster1", numberTasks)));
+
+ DeploymentUtils.initAppConfig(new AppConfig.Builder().appClassName(RemoteAdapterApp.class.getName())
+ .customModulesNames(ImmutableList.of(WordCountModule.class.getName())).build(), "cluster2", false,
+ ZK_CONNECT_STRING);
+ nodes.addAll(Arrays.asList(CoreTestUtils.forkS4Nodes(new String[] { "-c", "cluster2" }, new ZkClient(
+ ZK_CONNECT_STRING), 10, "cluster2", 1)));
+
+ s4nodes = nodes.toArray(new Process[] {});
+
+ CountDownLatch signalTextProcessed = new CountDownLatch(1);
+ CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
+ // add authorizations for processing
+ for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+
+ Assert.assertTrue(signalTextProcessed.await(30, TimeUnit.SECONDS));
+ String results = new String(zk.getData("/results", false, null));
+ Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
+
+ List<String> counterInstances = zk.getChildren("/counters", false);
+
+ int totalCount = 0;
+ int activeInstances = 0;
+ for (String instance : counterInstances) {
+ int count = Integer.parseInt(new String(zk.getData("/counters/" + instance, false, null)));
+ if (count != 0) {
+ activeInstances++;
+ }
+ totalCount += count;
+ }
+ Assert.assertEquals(numberTasks, counterInstances.size());
+ Assert.assertEquals(counterNumber, activeInstances);
+
+ Assert.assertEquals(13, totalCount);
+ } catch (KeeperException e) {
+ throw new IOException("ZooKeeper operation failed", e);
+ } finally {
+ zk.close();
+ }
+ }
+
+ @Override
+ @Test
+ public void testSimple() {
+ try {
+ ZooKeeper zk = CommTestUtils.createZkClient();
+ try {
+ zk.create("/counters", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } finally {
+ zk.close();
+ }
+ startNodes();
+ } catch (IOException e) {
+ throw new RuntimeException("Remote stream test failed", e);
+ } catch (KeeperException e) {
+ throw new RuntimeException("Remote stream test failed", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Remote stream test failed", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
index 6a2549d..0b9b064 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ri/RuntimeIsolationTest.java
@@ -19,29 +19,25 @@
package org.apache.s4.core.ri;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import junit.framework.Assert;
-
import org.apache.s4.base.Event;
import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.core.App;
-import org.apache.s4.core.S4Node;
import org.apache.s4.core.Stream;
-import org.apache.s4.core.ft.FTWordCountApp;
-import org.apache.s4.core.ft.FileSystemBackendCheckpointingModule;
import org.apache.s4.core.util.AppConfig;
import org.apache.s4.deploy.DeploymentUtils;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.wordcount.IsolationWordCounterPE;
import org.apache.s4.wordcount.SentenceKeyFinder;
import org.apache.s4.wordcount.WordClassifierPE;
import org.apache.s4.wordcount.WordCountEvent;
import org.apache.s4.wordcount.WordCountKeyFinder;
import org.apache.s4.wordcount.WordCountModule;
import org.apache.s4.wordcount.WordCountTest;
-import org.apache.s4.wordcount.WordCounterPE;
import org.apache.s4.wordcount.WordSeenEvent;
import org.apache.s4.wordcount.WordSeenKeyFinder;
import org.apache.s4.wordcount.WordSplitterPE;
@@ -49,21 +45,26 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
public class RuntimeIsolationTest extends WordCountTest {
+ private static Logger logger = LoggerFactory.getLogger(RuntimeIsolationTest.class);
final static int numberTasks = 5;
- private Process[] s4nodes;
+ protected Process[] s4nodes;
+
+ protected static int counterNumber = 2;
public RuntimeIsolationTest() {
super(numberTasks);
}
-
+
@After
public void cleanup() throws IOException, InterruptedException {
if (s4nodes == null) {
@@ -75,46 +76,66 @@ public class RuntimeIsolationTest extends WordCountTest {
}
}
- /**
- * reuse {@link WordCountTest}. Start 3 nodes.
- *
- */
@Test
- @Override
- public void testSimple() throws Exception {
- final ZooKeeper zk = CommTestUtils.createZkClient();
+ public void testSimple() {
+ try {
+ final ZooKeeper zk = CommTestUtils.createZkClient();
+ // parent znode for the per-prototype counters written by IsolationWordCounterPE
+ zk.create("/counters", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ // start nodes
+ startNodes();
+
+ // we create the emitter now, it will share zk node assignment with the S4 node
+ createEmitter();
+
+ CountDownLatch signalTextProcessed = new CountDownLatch(1);
+ CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
- // start 3 nodes
- startNodes("cluster1", IsolationWordCountApp.class.getName(), numberTasks);
+ // add authorizations for processing
+ for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
- // we create the emitter now, it will share zk node assignment with the S4 node
- createEmitter();
+ injectData();
- CountDownLatch signalTextProcessed = new CountDownLatch(1);
- CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
+ Assert.assertTrue(signalTextProcessed.await(30, TimeUnit.SECONDS));
+ String results = new String(zk.getData("/results", false, null));
+ Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
- // add authorizations for processing
- for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
- zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ List<String> counterInstances = zk.getChildren("/counters", false);
+
+ int totalCount = 0;
+ int activeInstances = 0;
+ for (String instance : counterInstances) {
+ int count = Integer.parseInt(new String(zk.getData("/counters/" + instance, false, null)));
+ if (count != 0) {
+ activeInstances++;
+ }
+ totalCount += count;
+ }
+ Assert.assertEquals(numberTasks, counterInstances.size());
+ Assert.assertEquals(counterNumber, activeInstances);
+ // 13 = 2+2+5+1+1+2, the sum of the expected per-word counts checked above
+ Assert.assertEquals(13, totalCount);
+ } catch (Exception e) {
+ throw new RuntimeException("testSimple failed", e); // fail the test instead of only logging the error
}
+ }
+ /** Injects SENTENCE_1, SENTENCE_2 and SENTENCE_3, in that order, via {@code injectSentence}. */
+ public void injectData() throws IOException, InterruptedException {
injectSentence(SENTENCE_1);
injectSentence(SENTENCE_2);
injectSentence(SENTENCE_3);
- Assert.assertTrue(signalTextProcessed.await(30, TimeUnit.SECONDS));
- String results = new String(zk.getData("/results", false, null));
- Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
-
}
- public void startNodes(String clusterName, String appClass, int number) throws InterruptedException, IOException {
- s4nodes = new Process[number];
- DeploymentUtils.initAppConfig(
- new AppConfig.Builder()
- .appClassName(appClass)
- .customModulesNames(ImmutableList.of(WordCountModule.class.getName()))
- .build(), "cluster1", false, "localhost:2181");
+ public void startNodes() throws InterruptedException, IOException {
+ // deploys IsolationWordCountApp to "cluster1", then forks numberTasks S4 nodes; s4nodes is only set once forked
+ DeploymentUtils.initAppConfig(new AppConfig.Builder().appClassName(IsolationWordCountApp.class.getName())
+ .customModulesNames(ImmutableList.of(WordCountModule.class.getName())).build(), "cluster1", false,
+ "localhost:2181");
s4nodes = CoreTestUtils.forkS4Nodes(new String[] { "-c", "cluster1" }, new ZkClient("localhost:2181"), 10,
- "cluster1", number);
+ "cluster1", numberTasks);
}
static class IsolationWordCountApp extends App {
@@ -135,12 +156,12 @@ public class RuntimeIsolationTest extends WordCountTest {
WordClassifierPE wordClassifierPrototype = createPE(WordClassifierPE.class, "classifierPE");
// set WordClassifierPE has 2 exclusive partitions
- wordClassifierPrototype.setExclusive(2);
+// wordClassifierPrototype.setExclusive(2);
Stream<WordCountEvent> wordCountStream = createStream("words counts stream", new WordCountKeyFinder(),
wordClassifierPrototype);
- WordCounterPE wordCounterPrototype = createPE(WordCounterPE.class, "counterPE");
- wordCounterPrototype.setExclusive(2);
+ IsolationWordCounterPE wordCounterPrototype = createPE(IsolationWordCounterPE.class, "counterPE");
+ wordCounterPrototype.setExclusive(counterNumber);
wordCounterPrototype.setWordClassifierStream(wordCountStream);
Stream<WordSeenEvent> wordSeenStream = createStream("words seen stream", new WordSeenKeyFinder(),
@@ -149,9 +170,7 @@ public class RuntimeIsolationTest extends WordCountTest {
wordSplitterPrototype.setWordSeenStream(wordSeenStream);
Stream<Event> sentenceStream = createInputStream("inputStream", new SentenceKeyFinder(),
wordSplitterPrototype);
-
- //TestPE
- WordSplitterPE wordSplitterTestPE = createPE(WordSplitterPE.class,"TestPE");
+
}
@Override
@@ -160,4 +179,5 @@ public class RuntimeIsolationTest extends WordCountTest {
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6aaa5536/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
new file mode 100644
index 0000000..fc8a891
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/IsolationWordCounterPE.java
@@ -0,0 +1,91 @@
+package org.apache.s4.wordcount;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IsolationWordCounterPE extends ProcessingElement implements Watcher {
+ private static Logger logger = LoggerFactory.getLogger(IsolationWordCounterPE.class);
+
+ transient private ZooKeeper zk;
+ private String zkPath;
+ static int count = 0;
+ int wordCounter;
+ transient Stream<WordCountEvent> wordClassifierStream;
+ public static AtomicInteger prototypeId = new AtomicInteger();
+
+ private IsolationWordCounterPE() {
+
+ }
+
+ public IsolationWordCounterPE(App app) {
+ super(app);
+ if (zk == null) {
+ try {
+ zk = new ZooKeeper("localhost:2181", 4000, this);
+ synchronized (prototypeId) {
+ zkPath = "/counters/counter_prototype_" + prototypeId.incrementAndGet() + "_"
+ + System.currentTimeMillis();
+ zk.create(zkPath, "0".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void onEvent(WordSeenEvent event) {
+
+ wordCounter++;
+ System.out.println("seen word " + event.getWord());
+
+ // NOTE: it seems the id is the key for now...
+ wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
+ // Update the zookeeper
+ synchronized (this) {
+ count++;
+ try {
+ zk.setData(zkPath, String.valueOf(count).getBytes(), -1);
+ logger.info("set " + zkPath + " " + count);
+ } catch (KeeperException e) {
+ logger.error(zkPath + " " + count, e);
+ } catch (InterruptedException e) {
+ logger.error(zkPath + " " + count, e);
+ }
+ }
+ }
+
+ public void setWordClassifierStream(Stream<WordCountEvent> stream) {
+ this.wordClassifierStream = stream;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+}