You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC
[1/22] git commit: Merge branch 'S4-22' into piper
Updated Branches:
refs/heads/piper f296947a8 -> 121f33066
Merge branch 'S4-22' into piper
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/121f3306
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/121f3306
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/121f3306
Branch: refs/heads/piper
Commit: 121f33066dc4bf02bd37c618312c97874c6c73c4
Parents: f296947 0338484
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Jun 15 17:52:49 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Jun 15 17:52:49 2012 +0200
----------------------------------------------------------------------
build.gradle | 63 ++---
gradle/wrapper/gradle-wrapper.properties | 2 +-
gradlew | 7 +-
gradlew.bat | 2 +-
lib/gradle-base-services-1.0-rc-3.jar | Bin 0 -> 31974 bytes
lib/gradle-core-1.0-rc-3.jar | Bin 0 -> 1643485 bytes
lib/gradle-tooling-api-1.0-rc-3.jar | Bin 0 -> 122911 bytes
lib/gradle-wrapper-1.0-rc-3.jar | Bin 0 -> 39860 bytes
lib/gradle-wrapper-1.0-rc-3.properties | 6 +
s4 | 24 ++
settings.gradle | 5 +
.../src/main/java/org/apache/s4/base/Emitter.java | 2 +-
.../src/main/java/org/apache/s4/base/Event.java | 22 +-
.../main/java/org/apache/s4/base/EventMessage.java | 52 +++
.../java/org/apache/s4/base/RemoteEmitter.java | 5 +
.../org/apache/s4/base/util/MultiClassLoader.java | 31 +-
subprojects/s4-comm/s4-comm.gradle | 4 +-
.../java/org/apache/s4/comm/DefaultCommModule.java | 129 ++++++++
.../src/main/java/org/apache/s4/comm/Module.java | 111 -------
.../java/org/apache/s4/comm/QueueingEmitter.java | 12 +-
.../org/apache/s4/comm/RemoteEmitterFactory.java | 10 +
.../apache/s4/comm/loopback/LoopBackEmitter.java | 12 +-
.../org/apache/s4/comm/serialize/KryoSerDeser.java | 49 ++--
.../org/apache/s4/comm/tcp/RemoteEmitters.java | 34 ++
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 48 +++-
.../java/org/apache/s4/comm/tcp/TCPListener.java | 12 +
.../org/apache/s4/comm/tcp/TCPRemoteEmitter.java | 27 ++
.../java/org/apache/s4/comm/tools/TaskSetup.java | 54 ++--
.../org/apache/s4/comm/topology/Assignment.java | 8 +-
.../s4/comm/topology/AssignmentFromFile.java | 127 --------
.../apache/s4/comm/topology/AssignmentFromZK.java | 45 ++-
.../java/org/apache/s4/comm/topology/Cluster.java | 139 +--------
.../s4/comm/topology/ClusterChangeListener.java | 10 +
.../org/apache/s4/comm/topology/ClusterFromZK.java | 198 ++++++++++++
.../java/org/apache/s4/comm/topology/Clusters.java | 12 +
.../apache/s4/comm/topology/ClustersFromZK.java | 103 ++++++
.../apache/s4/comm/topology/PhysicalCluster.java | 125 ++++++++
.../org/apache/s4/comm/topology/RemoteCluster.java | 10 +
.../org/apache/s4/comm/topology/RemoteStreams.java | 219 +++++++++++++
.../apache/s4/comm/topology/StreamConsumer.java | 55 ++++
.../java/org/apache/s4/comm/topology/Topology.java | 9 -
.../s4/comm/topology/TopologyChangeListener.java | 5 -
.../apache/s4/comm/topology/TopologyFromFile.java | 33 --
.../apache/s4/comm/topology/TopologyFromZK.java | 144 ---------
.../java/org/apache/s4/comm/topology/ZNRecord.java | 17 +-
.../java/org/apache/s4/comm/udp/UDPEmitter.java | 28 +-
.../org/apache/s4/comm/udp/UDPRemoteEmitter.java | 18 +
.../src/main/resources/default.s4.comm.properties | 12 +
.../s4-comm/src/main/resources/s4-comm.properties | 9 -
.../java/org/apache/s4/comm/DeliveryTestUtil.java | 141 +++++++++
.../s4/comm/tcp/MultiPartitionDeliveryTest.java | 10 +-
.../org/apache/s4/comm/tcp/NetworkGlitchTest.java | 11 +-
.../org/apache/s4/comm/tcp/SimpleDeliveryTest.java | 10 +-
.../java/org/apache/s4/comm/tcp/TCPBasedTest.java | 48 ---
.../java/org/apache/s4/comm/tcp/TCPCommTest.java | 64 ++++
.../s4/comm/topology/AssignmentFromZKTest.java | 42 ---
.../s4/comm/topology/AssignmentsFromZKTest.java | 69 ++++
.../s4/comm/topology/ClustersFromZKTest.java | 97 ++++++
.../s4/comm/topology/TopologyFromZKTest.java | 78 -----
.../org/apache/s4/comm/topology/ZKBaseTest.java | 37 +--
.../s4/comm/udp/MultiPartitionDeliveryTest.java | 6 +-
.../org/apache/s4/comm/udp/SimpleDeliveryTest.java | 6 +-
.../java/org/apache/s4/comm/udp/UDPBasedTest.java | 50 ---
.../java/org/apache/s4/comm/udp/UDPCommTest.java | 65 ++++
.../org/apache/s4/comm/util/PartitionInfo.java | 11 +-
.../org/apache/s4/comm/util/ProtocolTestUtil.java | 7 +-
.../java/org/apache/s4/fixtures/CommTestUtils.java | 6 +-
.../FileBasedClusterManagementTestModule.java | 81 -----
.../ZkBasedClusterManagementTestModule.java | 92 ------
.../java/org/apache/s4/fixtures/ZkBasedTest.java | 39 +--
.../src/test/resources/default.s4.properties | 13 -
.../src/test/resources/s4-comm-test.properties | 10 -
.../src/test/resources/udp.s4.comm.properties | 12 +
subprojects/s4-core/s4-core.gradle | 25 +-
.../src/main/java/org/apache/s4/core/App.java | 240 ++++++++------
.../main/java/org/apache/s4/core/CustomModule.java | 103 ------
.../java/org/apache/s4/core/DefaultCoreModule.java | 76 +++++
.../main/java/org/apache/s4/core/EventSource.java | 58 ++--
.../src/main/java/org/apache/s4/core/Key.java | 52 +++
.../src/main/java/org/apache/s4/core/Main.java | 194 +++++++-----
.../java/org/apache/s4/core/ProcessingElement.java | 129 ++++----
.../src/main/java/org/apache/s4/core/Receiver.java | 24 +-
.../main/java/org/apache/s4/core/RemoteSender.java | 28 ++
.../java/org/apache/s4/core/RemoteSenders.java | 62 ++++
.../main/java/org/apache/s4/core/RemoteStream.java | 77 +++++
.../src/main/java/org/apache/s4/core/Sender.java | 18 +-
.../src/main/java/org/apache/s4/core/Server.java | 84 +++--
.../src/main/java/org/apache/s4/core/Stream.java | 133 ++++-----
.../main/java/org/apache/s4/core/Streamable.java | 18 +-
.../main/java/org/apache/s4/core/WindowingPE.java | 23 +-
.../org/apache/s4/core/adapter/AdapterApp.java | 52 +++
.../s4/core/util/ParametersInjectionModule.java | 28 ++
.../s4/deploy/DistributedDeploymentManager.java | 6 +-
.../src/main/resources/apps/CounterExample.s4r | Bin 53071 -> 0 bytes
.../src/main/resources/default.s4.core.properties | 2 +
.../s4-core/src/main/resources/s4-core.properties | 5 -
.../test/java/org/apache/s4/core/TriggerTest.java | 20 +-
.../apache/s4/core/triggers/TriggeredModule.java | 6 -
.../apache/s4/deploy/TestAutomaticDeployment.java | 116 +++-----
.../test/java/org/apache/s4/deploy/TestModule.java | 5 +-
.../s4/deploy/prodcon/TestProducerConsumer.java | 165 ++++++++++
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 85 +++---
.../java/org/apache/s4/fixtures/SocketAdapter.java | 27 +-
.../org/apache/s4/fixtures/ZkBasedAppModule.java | 35 --
.../org/apache/s4/wordcount/WordClassifierPE.java | 19 +-
.../org/apache/s4/wordcount/WordCountModule.java | 10 +-
.../org/apache/s4/wordcount/WordCountTest.java | 10 +-
.../apache/s4/wordcount/zk/WordCountModuleZk.java | 7 -
.../apache/s4/wordcount/zk/WordCountTestZk.java | 46 ---
.../src/test/resources/default.s4.properties | 10 -
.../resources/org.apache.s4.deploy.s4.properties | 15 -
.../test/resources/s4-counter-example.properties | 7 -
subprojects/s4-edsl/s4-edsl.gradle | 2 +-
.../main/java/org/apache/s4/edsl/AppBuilder.java | 4 +-
.../src/test/java/org/apache/s4/edsl/Module.java | 105 -------
.../src/test/java/org/apache/s4/edsl/MyApp.java | 13 -
.../src/test/java/org/apache/s4/edsl/TestEDSL.java | 12 +-
.../java/org/apache/s4/example/counter/Module.java | 81 ++---
.../java/org/apache/s4/example/counter/MyApp.java | 46 +--
.../apache/s4/example/edsl/counter/CounterApp.java | 199 ++++++------
.../org/apache/s4/example/edsl/counter/Module.java | 105 -------
.../java/org/apache/s4/example/model/Main.java | 18 +-
.../java/org/apache/s4/example/model/MyApp.java | 12 +-
subprojects/s4-tools/s4-tools.gradle | 73 +++++
.../main/java/org/apache/s4/tools/CreateApp.java | 146 +++++++++
.../java/org/apache/s4/tools/DefineCluster.java | 48 +++
.../src/main/java/org/apache/s4/tools/Deploy.java | 151 +++++++++
.../org/apache/s4/tools/FileExistsValidator.java | 18 +
.../src/main/java/org/apache/s4/tools/Package.java | 44 +++
.../main/java/org/apache/s4/tools/S4ArgsBase.java | 33 ++
.../src/main/java/org/apache/s4/tools/Tools.java | 113 +++++++
.../main/java/org/apache/s4/tools/ZKServer.java | 66 ++++
.../src/main/resources/templates/HelloApp.java.txt | 34 ++
.../resources/templates/HelloInputAdapter.java.txt | 60 ++++
.../src/main/resources/templates/HelloPE.java.txt | 29 ++
.../src/main/resources/templates/build.gradle | 175 +++++++++++
.../s4-tools/src/main/resources/templates/gradlew | 2 +
.../src/main/resources/templates/newApp.README | 37 +++
.../s4-tools/src/main/resources/templates/s4 | 17 +
.../src/main/resources/templates/settings.gradle | 1 +
test-apps/s4-counter/build.gradle | 224 ++++++++++++++
.../s4-counter/src/main/java/s4app/ClockApp.java | 38 +++
.../s4-counter/src/main/java/s4app/ClockPE.java | 49 +++
test-apps/s4-showtime/build.gradle | 222 +++++++++++++
.../s4-showtime/src/main/java/s4app/ShowPE.java | 33 ++
.../src/main/java/s4app/ShowTimeApp.java | 29 ++
test-apps/simple-deployable-app-1/build.gradle | 33 +-
.../main/java/org/apache/s4/deploy/SimplePE.java | 2 +-
.../main/java/org/apache/s4/deploy/TestApp.java | 2 +-
test-apps/simple-deployable-app-2/build.gradle | 33 +-
.../main/java/org/apache/s4/deploy/TestApp.java | 2 +-
test-apps/twitter-adapter/README.txt | 1 +
test-apps/twitter-adapter/build.gradle | 209 +++++++++++++
.../s4/example/twitter/TwitterInputAdapter.java | 120 +++++++
.../src/main/resources/s4.properties | 18 +
test-apps/twitter-counter/README.txt | 33 ++
test-apps/twitter-counter/build.gradle | 201 ++++++++++++
.../org/apache/s4/example/twitter/TopNTopicPE.java | 96 ++++++
.../s4/example/twitter/TopicCountAndReportPE.java | 56 ++++
.../org/apache/s4/example/twitter/TopicEvent.java | 30 ++
.../s4/example/twitter/TopicExtractorPE.java | 59 ++++
.../s4/example/twitter/TwitterCounterApp.java | 72 +++++
.../src/main/resources/default.s4.properties | 19 ++
163 files changed, 5759 insertions(+), 2540 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/build.gradle
----------------------------------------------------------------------
diff --cc build.gradle
index 818b968,4639a8a..54c70ee
--- a/build.gradle
+++ b/build.gradle
@@@ -142,21 -154,21 +144,7 @@@ dependencies
platformLibs platformProjects
}
--//task xxx {
--// platformProjects.collect {proj ->
--// // Iterate over the source sets
--// println '\n' + proj.name + ' ' + proj.sourceSets.main.name
--// println proj.sourceSets.main.classesDir
--// println proj.sourceSets.main.runtimeClasspath.getFiles()
--// println proj.sourceSets.main.resources.getFiles()
--// println proj.sourceSets.main.allJava.getFiles()
--// println proj.configurations.runtime.getFiles()
--// println ' '
--// println proj.configurations.archives.allArtifactFiles.getFiles()
--// }
--//}
--
- binDistImage = copySpec {
+ project.ext["binDistImage"] = copySpec {
platformProjects.collect {proj ->
into ("platform/lib") {
from proj.sourceSets.main.resources
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/settings.gradle
----------------------------------------------------------------------
diff --cc settings.gradle
index 71b1207,33827ba..ec6cb7d
--- a/settings.gradle
+++ b/settings.gradle
@@@ -16,10 -16,14 +16,15 @@@
include 's4-base'
include 's4-core'
include 's4-comm'
+include 's4-edsl'
include 's4-example'
+ include 's4-tools'
+ //include 's4-example'
+ //include ':test-apps:simple-adapter-1'
include ':test-apps:simple-deployable-app-1'
include ':test-apps:simple-deployable-app-2'
+ include ':test-apps:s4-showtime'
+ include ':test-apps:s4-counter'
rootProject.name = 's4'
rootProject.children.each {project ->
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
index ad511c9,08a291d..54c08df
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java
@@@ -9,9 -9,7 +9,9 @@@ public interface Emitter
*
* @return - true - if message is sent across successfully - false - if send fails
*/
- boolean send(int partitionId, byte[] message);
+ boolean send(int partitionId, EventMessage message);
int getPartitionCount();
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/s4-comm.gradle
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
index 7218afe,7fd6773..3dd2955
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingEmitter.java
@@@ -94,10 -96,4 +95,9 @@@ public class QueueingEmitter implement
}
}
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/loopback/LoopBackEmitter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 171315a,7bff4af..5caafc9
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@@ -36,303 -32,119 +38,309 @@@ import org.slf4j.Logger
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashBiMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
+import com.google.inject.name.Named;
/**
+ * <p>
+ * TCPEmitter - Uses TCP to send messages across partitions. It
+ * <ul>
+ * <li>guarantees message delivery</li>
+ * <li>preserves pair-wise message ordering; might end up sending duplicates to ensure the order</li>
+ * <li>tolerates topology changes, partition re-mapping and network glitches</li>
+ * </ul>
+ * </p>
*
- * Sends messages through TCP, to the associated subcluster.
- *
+ * <p>
+ * TCPEmitter is designed as follows:
+ * <ul>
+ * <li>maintains per-partition queue of {@code Message}s</li>
+ * <li> <code>send(p, m)</code> queues the message 'm' to partition 'p'</li>
+ * <li>a thread-pool is used to send the messages asynchronously to the appropriate partitions; send operations between
+ * a pair of partitions are serialized</li>
+ * <li>Each {@code Message} implements the {@link ChannelFutureListener} and listens on the {@link ChannelFuture}
+ * corresponding to the send operation</li>
+ * <li>On success, the message marks itself as sent; messages marked sent at the head of the queue are removed</li>
+ * <li>On failure of a message m, 'm' and all the messages queued after 'm' are resent to preserve message ordering</li>
+ * </ul>
+ * </p>
*/
-public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChangeListener {
+
- public class TCPEmitter implements Emitter, TopologyChangeListener {
++public class TCPEmitter implements Emitter, ClusterChangeListener {
private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
- private static final int BUFFER_SIZE = 10;
- private static final int NUM_RETRIES = 10;
+
+ private final int numRetries;
+ private final int retryDelayMs;
+ private final int nettyTimeout;
+ private final int bufferCapacity;
- private Topology topology;
+ private Cluster topology;
private final ClientBootstrap bootstrap;
+ /*
+ * debug information
+ */
+ private volatile int instanceId = 0;
+
+ /*
+ * All channels
+ */
+ private final ChannelGroup channels = new DefaultChannelGroup();
+
+ /*
+ * Channel used to send messages to each partition
+ */
+ private final HashBiMap<Integer, Channel> partitionChannelMap;
+
+ /*
+ * Node hosting each partition
+ */
+ private final HashBiMap<Integer, ClusterNode> partitionNodeMap;
+
+ /*
+ * Messages to be sent, stored per partition
+ */
+ private final Hashtable<Integer, SendQueue> sendQueues;
+
+ /*
+ * Thread pool to actually send messages
+ */
+ private final ExecutorService sendService;
+
@Inject
- public TCPEmitter(Topology topology, @Named("tcp.partition.queue_size") int bufferSize,
+ SerializerDeserializer serDeser;
+
- static class MessageQueuesPerPartition {
- private Hashtable<Integer, Queue<byte[]>> queues = new Hashtable<Integer, Queue<byte[]>>();
- private boolean bounded;
++ @Inject
++ public TCPEmitter(Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
+ @Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
+ @Named("comm.timeout") int timeout) throws InterruptedException {
+ this.numRetries = retries;
+ this.retryDelayMs = retryDelay;
+ this.nettyTimeout = timeout;
+ this.bufferCapacity = bufferSize;
+ this.topology = topology;
+ this.topology.addListener(this);
- MessageQueuesPerPartition(boolean bounded) {
- this.bounded = bounded;
+ // Initialize data structures
- int clusterSize = this.topology.getTopology().getNodes().size();
++ int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+ partitionChannelMap = HashBiMap.create(clusterSize);
+ partitionNodeMap = HashBiMap.create(clusterSize);
+ sendQueues = new Hashtable<Integer, SendQueue>(clusterSize);
+
+ // Initialize sendService
+ int numCores = Runtime.getRuntime().availableProcessors();
+ sendService = Executors.newFixedThreadPool(2 * numCores,
+ new ThreadFactoryBuilder().setNameFormat("TCPEmitterSendServiceThread-#" + instanceId++ + "-%d")
+ .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread paramThread, Throwable paramThrowable) {
+ logger.error("Cannot send message", paramThrowable);
+ }
+ }).build());
+
+ // Initialize netty related structures
+ ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+ bootstrap = new ClientBootstrap(factory);
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() {
+ ChannelPipeline p = Channels.pipeline();
+ p.addLast("1", new LengthFieldPrepender(4));
+ p.addLast("2", new NotifyChannelInterestChange());
+ return p;
+ }
+ });
+
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+ bootstrap.setOption("connectTimeoutMillis", this.nettyTimeout);
+ }
+
+ private class Message implements ChannelFutureListener {
+ private final SendQueue sendQ;
+ private final byte[] message;
+ private boolean sendSuccess = false;
++ @Inject
++ SerializerDeserializer serDeser;
+
+ Message(SendQueue sendQ, byte[] message) {
+ this.sendQ = sendQ;
+ this.message = message;
}
- private boolean add(int partitionId, byte[] message) {
- Queue<byte[]> messages = queues.get(partitionId);
+ private void sendMessage() {
+ sendQ.emitter.sendMessage(sendQ.partitionId, this);
+ }
- if (messages == null) {
- messages = new ArrayDeque<byte[]>();
- queues.put(partitionId, messages);
+ private void messageSendFailure() {
+ logger.debug("Message send to partition {} has failed", sendQ.partitionId);
+ synchronized (sendQ.failureFound) {
+ sendQ.failureFound = true;
}
+ removeChannel(sendQ.partitionId);
+ sendQ.spawnSendTask();
+ }
- if (bounded && messages.size() >= BUFFER_SIZE) {
- // Too many messages already queued
- return false;
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ sendSuccess = true;
+ sendQ.clearWire();
+ return;
}
- messages.offer(message);
- return true;
+ if (future.isCancelled()) {
+ logger.error("Send I/O cancelled to " + future.getChannel().getRemoteAddress());
+ }
+
+ // failed operation
+ messageSendFailure();
}
+ }
- private byte[] peek(int partitionId) {
- Queue<byte[]> messages = queues.get(partitionId);
+ private class SendQueue {
+ private final TCPEmitter emitter;
+ private final int partitionId;
+ private final int bufferCapacity;
+ private final Queue<Message> pending; // messages to be sent
+ private final Queue<Message> wire; // messages in transit
+
+ private Integer bufferSize = 0;
+ private Boolean sending = false;
+ private Boolean failureFound = false;
+ private Boolean newMessages = false;
+
+ SendQueue(TCPEmitter emitter, int partitionId, int bufferCapacity) {
+ this.emitter = emitter;
+ this.partitionId = partitionId;
+ this.bufferCapacity = bufferCapacity;
+ this.pending = new ConcurrentLinkedQueue<Message>();
+ this.wire = new ConcurrentLinkedQueue<Message>();
+ }
- try {
- return messages.peek();
- } catch (NullPointerException npe) {
- return null;
+ private boolean lock() {
+ if (sending)
+ return false;
- }
+ sending = true;
+ return true;
}
- private void remove(int partitionId) {
- Queue<byte[]> messages = queues.get(partitionId);
+ private void unlock() {
+ sending = false;
+ }
- if (messages.isEmpty()) {
- logger.error("Trying to remove messages from an empty queue for partition" + partitionId);
- return;
+ private boolean offer(byte[] message) {
+ Message m = new Message(this, message);
+ synchronized (bufferSize) {
+ if (bufferSize >= bufferCapacity) {
+ return false;
+ }
+ bufferSize++;
}
- if (messages != null)
- messages.remove();
+ pending.add(m);
+ spawnSendTask();
+ return true;
+
}
- }
- private HashBiMap<Integer, Channel> partitionChannelMap;
- private HashBiMap<Integer, ClusterNode> partitionNodeMap;
- private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
+ public void clearWire() {
+ while (!wire.isEmpty()) {
+ Message msg = wire.peek();
+ if (!msg.sendSuccess)
+ return;
+ wire.remove();
+ synchronized (bufferSize) {
+ bufferSize--;
+ }
+ }
+ }
- @Inject
- public TCPEmitter(Cluster topology) throws InterruptedException {
- this.topology = topology;
- topology.addListener(this);
- int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+ private void spawnSendTask() {
+ // Lock before spawning a new SendTask
+ boolean acquired = lock();
+ if (acquired) {
+ try {
+ emitter.sendService.execute(new SendTask(this));
+ } finally {
+ unlock();
+ }
+ } else {
+ synchronized (newMessages) {
+ newMessages = true;
+ }
+ }
+ }
- partitionChannelMap = HashBiMap.create(clusterSize);
- partitionNodeMap = HashBiMap.create(clusterSize);
+ private void resendWiredMessages() {
+ clearWire();
+ for (Message msg : wire) {
+ msg.sendMessage();
+ }
+ }
- ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool());
+ private void sendPendingMessages() {
+ Message msg = null;
+ while ((msg = pending.poll()) != null) {
+ msg.sendMessage();
+ wire.add(msg);
+ }
+ }
- bootstrap = new ClientBootstrap(factory);
+ private void sendMessages() {
+ while (true) {
+ boolean resend = false;
+ synchronized (failureFound) {
+ if (failureFound) {
+ resend = true;
+ failureFound = false;
+ } else
+ break;
+ }
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() {
- ChannelPipeline p = Channels.pipeline();
- p.addLast("Framing", new LengthFieldPrepender(4));
- p.addLast("ChannelStateMonitor", new ChannelStateMonitoringHandler());
- return p;
+ if (resend)
+ resendWiredMessages();
}
- });
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
+ while (true) {
+ sendPendingMessages();
+ synchronized (newMessages) {
+ if (newMessages) {
+ newMessages = false;
+ continue;
+ } else {
+ unlock();
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ private class SendTask implements Runnable {
+ private final SendQueue sendQ;
+
+ SendTask(SendQueue sendQ) {
+ this.sendQ = sendQ;
+ }
+
+ @Override
+ public void run() {
+ sendQ.sendMessages();
+ }
}
private boolean connectTo(Integer partitionId) {
ClusterNode clusterNode = partitionNodeMap.get(partitionId);
if (clusterNode == null) {
- if (topology.getPhysicalCluster().getNodes().size() == 0) {
- logger.error("No node in cluster ");
- return false;
- }
- clusterNode = topology.getPhysicalCluster().getNodes().get(partitionId);
- partitionNodeMap.forcePut(partitionId, clusterNode);
- }
+
- if (clusterNode == null) {
logger.error("No ClusterNode exists for partitionId " + partitionId);
+ onChange();
return false;
}
@@@ -389,113 -188,127 +397,133 @@@
}
}
- // /*
- // * Try limiting the size of the send queue inside Netty
-
- // FIXME this does not work: in case of a disconnection, the channel remains non writeable, and the lock is
- // never released, hence blocking.
- // should be fixed using: 1/ configurable timeouts (should drop pending messages after the timeout) 2/ make sure
- // the handler is correctly responding to disconnections/reconnections 3/ there should be a lock per channel,
- // no?
-
- // NOTE: might be fixed in S4-7
-
- // */
- // if (!channel.isWritable()) {
- // synchronized (sendLock) {
- // // check again now that we have the lock
- // while (!channel.isWritable()) {
- // try {
- // sendLock.wait();
- // } catch (InterruptedException ie) {
- // return false;
- // }
- // }
- // }
- // }
-
- // /*
- // * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
- // * buffered messages, and (3) the Current Message
- // *
- // * Once the channel returns success delete from the messagesOnTheWire
- // */
- // byte[] messageBeingSent = null;
- // // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
- // // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
- // // }
- //
- // while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
- // writeMessageToChannel(channel, partitionId, messageBeingSent);
- // queuedMessages.remove(partitionId);
- // }
-
- writeMessageToChannel(channel, partitionId, serDeser.serialize(message));
- return true;
+ if (!messageSent) {
+ m.messageSendFailure();
+ }
+
}
@Override
- public boolean send(int partitionId, byte[] message) {
- public void operationComplete(ChannelFuture f) {
- int partitionId = partitionChannelMap.inverse().get(f.getChannel());
- if (f.isSuccess()) {
- // messagesOnTheWire.remove(partitionId);
++ public boolean send(int partitionId, EventMessage message) {
+ if (!sendQueues.containsKey(partitionId)) {
+ SendQueue sendQueue = new SendQueue(this, partitionId, this.bufferCapacity);
+ sendQueues.put(partitionId, sendQueue);
}
- return sendQueues.get(partitionId).offer(message);
- if (f.isCancelled()) {
- logger.error("Send I/O was cancelled! " + f.getChannel().getRemoteAddress());
- } else if (!f.isSuccess()) {
- logger.error("Exception on I/O operation", f.getCause());
- logger.error(String.format("I/O on partition %d failed!", partitionId));
- partitionChannelMap.remove(partitionId);
++ return sendQueues.get(partitionId).offer(serDeser.serialize(message));
+ }
+
+ protected void removeChannel(int partition) {
+ Channel c = partitionChannelMap.remove(partition);
+ if (c == null)
+ return;
+
+ c.close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess())
+ channels.remove(future.getChannel());
+ else
+ logger.error("FAILED to close channel");
+ }
+ });
+ }
+
+ public void close() {
+ for (SendQueue sendQ : sendQueues.values()) {
+ if (!sendQ.wire.isEmpty()) {
+ logger.error("TCPEmitter could not deliver {} messages to partition {}", sendQ.wire.size(),
+ sendQ.partitionId);
+ sendQ.wire.clear();
+ }
+
+ if (!sendQ.pending.isEmpty()) {
+ logger.error("TCPEmitter could not send {} messages to partition {}", sendQ.pending.size(),
+ sendQ.partitionId);
+ sendQ.pending.clear();
+ }
+ }
+
+ try {
+ channels.close().await();
+ bootstrap.releaseExternalResources();
+ } catch (InterruptedException ie) {
+ logger.error("Interrupted while closing");
+ ie.printStackTrace();
}
}
@Override
public void onChange() {
- for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
- /*
- * Close the channels that correspond to changed partitions and update partitionNodeMap
- */
+ for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
Integer partition = clusterNode.getPartition();
- ClusterNode oldNode = partitionNodeMap.get(partition);
+ if (partition == null) {
+ logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
+ return;
+ }
+ ClusterNode oldNode = partitionNodeMap.remove(partition);
if (oldNode != null && !oldNode.equals(clusterNode)) {
- if (partitionChannelMap.containsKey(partition)) {
- partitionChannelMap.remove(partition).close();
- }
+ removeChannel(partition);
}
-
partitionNodeMap.forcePut(partition, clusterNode);
}
}
@Override
public int getPartitionCount() {
- return topology.getTopology().getPartitionCount();
- // Number of nodes is not same as number of partitions
+ return topology.getPhysicalCluster().getPartitionCount();
}
- class ChannelStateMonitoringHandler extends SimpleChannelHandler {
+ class NotifyChannelInterestChange extends SimpleChannelHandler {
@Override
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
- // logger.info(String.format("%08x %08x %08x", e.getValue(),
- // e.getChannel().getInterestOps(), Channel.OP_WRITE));
- // FIXME see above comments about fixing the buffering of messages
- // synchronized (sendLock) {
- // if (e.getChannel().isWritable()) {
- // sendLock.notify();
- // }
- // }
+ Channel c = e.getChannel();
+ Integer partitionId = partitionChannelMap.inverse().get(c);
+ if (partitionId == null) {
+ logger.debug("channelInterestChanged for an unknown/deleted channel");
+ return;
+ }
+
+ SendQueue sendQ = sendQueues.get(partitionId);
+ synchronized (sendQ) {
+ if (c.isWritable()) {
+ sendQ.notify();
+ }
+ }
++
ctx.sendUpstream(e);
}
@Override
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
- Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
- String target;
- if (partitionId == null) {
- target = "unknown channel";
- } else {
- target = "channel for partition [" + partitionId + "], target node host ["
- + partitionNodeMap.get(partitionId).getMachineName() + "], target node port ["
- + partitionNodeMap.get(partitionId).getPort() + "]";
- }
- logger.error(
- "Error on [{}]. This can be due to a disconnection of the receiver node. Channel will be closed.",
- target);
+ try {
+ throw event.getCause();
+ } catch (ClosedChannelException cce) {
+ return;
+ } catch (ConnectException ce) {
+ return;
+ } catch (Throwable e) {
+ e.printStackTrace();
++ // Integer partitionId = partitionChannelMap.inverse().get(context.getChannel());
++ // String target;
++ // if (partitionId == null) {
++ // target = "unknown channel";
++ // } else {
++ // target = "channel for partition [" + partitionId + "], target node host ["
++ // + partitionNodeMap.get(partitionId).getMachineName() + "], target node port ["
++ // + partitionNodeMap.get(partitionId).getPort() + "]";
++ // }
++ // logger.error(
++ // "Error on [{}]. This can be due to a disconnection of the receiver node. Channel will be closed.",
++ // target);
++ //
++ // if (context.getChannel().isOpen()) {
++ // logger.info("Closing channel [{}] due to exception [{}]", target, event.getCause().getMessage());
++ // context.getChannel().close();
++ // }
+
- if (context.getChannel().isOpen()) {
- logger.info("Closing channel [{}] due to exception [{}]", target, event.getCause().getMessage());
- context.getChannel().close();
}
-
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index af019c0,58348af..2aa14f0
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@@ -29,21 -25,20 +30,25 @@@ import org.slf4j.Logger
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
+import com.google.inject.name.Named;
+ /**
+ * Receives messages through TCP for the assigned subcluster.
+ *
+ */
public class TCPListener implements Listener {
+ private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
private ClusterNode node;
- private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
+ private ServerBootstrap bootstrap;
+ private final ChannelGroup channels = new DefaultChannelGroup();
+ private final int nettyTimeout;
@Inject
- public TCPListener(Assignment assignment) {
+ public TCPListener(Assignment assignment, @Named("comm.timeout") int timeout) {
// wait for an assignment
node = assignment.assignClusterNode();
+ nettyTimeout = timeout;
ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
@@@ -112,16 -92,17 +117,23 @@@
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
logger.error("Error", event.getCause());
- if (context.getChannel().isOpen()) {
- logger.error("Closing channel due to exception");
- context.getChannel().close();
- }
+ Channel c = context.getChannel();
+ c.close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess())
+ channels.remove(future.getChannel());
+ else
+ logger.error("FAILED to close channel");
+ }
+ });
}
+
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ // TODO Auto-generated method stub
+ super.channelClosed(ctx, e);
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
index 0000000,7dfa2c8..3fd5bcf
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
@@@ -1,0 -1,24 +1,27 @@@
+ package org.apache.s4.comm.tcp;
+
+ import org.apache.s4.base.RemoteEmitter;
+ import org.apache.s4.comm.topology.Cluster;
+
+ import com.google.inject.Inject;
+ import com.google.inject.assistedinject.Assisted;
++import com.google.inject.name.Named;
+
+ /**
+ * Emitter to remote subclusters.
+ *
+ */
+ public class TCPRemoteEmitter extends TCPEmitter implements RemoteEmitter {
+
+ /**
+ * Sends to remote subclusters. This is dynamically created, through an injected factory, when new subclusters are
+ * discovered (as remote streams outputs)
+ */
+ @Inject
- public TCPRemoteEmitter(@Assisted Cluster topology) throws InterruptedException {
- super(topology);
++ public TCPRemoteEmitter(@Assisted Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
++ @Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
++ @Named("comm.timeout") int timeout) throws InterruptedException {
++ super(topology, bufferSize, retries, retryDelay, timeout);
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
index 17f4037,6813f3f..a9066a4
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
@@@ -1,143 -1,13 +1,14 @@@
package org.apache.s4.comm.topology;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import java.util.Set;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import com.google.common.collect.HashBiMap;
- import com.google.inject.Inject;
- import com.google.inject.Singleton;
- import com.google.inject.name.Named;
-
/**
- *
- * The S4 physical cluster implementation.
+ * Represents a logical cluster
*
*/
- @Singleton
- public class Cluster {
-
- // TODO: do we need a Cluster interface to represent different types of
- // implementations?
-
- private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
-
- // List<ClusterNode> nodes = new ArrayList<ClusterNode>();
- HashBiMap<Integer, ClusterNode> nodes = HashBiMap.create();
- String mode = "unicast";
- String name = "unknown";
-
- final private String[] hosts;
- final private String[] ports;
- final private int numNodes;
- private int numPartitions;
-
- public Cluster(int numPartitions) {
- this.hosts = new String[] {};
- this.ports = new String[] {};
- this.numNodes = 0;
- this.numPartitions = numPartitions;
- }
-
- /**
- * Define the hosts and corresponding ports in the cluster.
- *
- * @param hosts
- * a comma separates list of host names.
- * @param ports
- * a comma separated list of ports.
- * @throws IOException
- * if number of hosts and ports don't match.
- */
- @Inject
- Cluster(@Named("cluster.hosts") String hosts, @Named("cluster.ports") String ports) throws IOException {
- if (hosts != null && hosts.length() > 0 && ports != null && ports.length() > 0) {
- this.ports = ports.split(",");
- this.hosts = hosts.split(",");
-
- if (this.ports.length != this.hosts.length) {
- logger.error("Number of hosts should match number of ports in properties file. hosts: " + hosts
- + " ports: " + ports);
- throw new IOException();
- }
-
- numNodes = this.hosts.length;
- for (int i = 0; i < numNodes; i++) {
- ClusterNode node = new ClusterNode(i, Integer.parseInt(this.ports[i]), this.hosts[i], "");
- addNode(node);
- logger.info("Added cluster node: " + this.hosts[i] + ":" + this.ports[i]);
- }
- numPartitions = numNodes;
- } else {
- this.hosts = new String[] {};
- this.ports = new String[] {};
- this.numNodes = 0;
-
- }
- }
-
- /**
- * Number of partitions in the cluster.
- *
- * @return
- */
- public int getPartitionCount() {
- return numPartitions;
- }
-
- /**
- * @param node
- */
- public void addNode(ClusterNode node) {
- // nodes.add(node);
- if (nodes.containsKey(node.getPartition())) {
- logger.debug("Partition {} already has a corresponding ClusterNode", node.getPartition());
- }
- if (nodes.containsValue(node)) {
- logger.error("ClusterNode is already mapped to partition {}", nodes.inverse().get(node));
- }
- nodes.forcePut(node.getPartition(), node);
- }
-
- /**
- * @return a list of {@link ClusterNode} objects available in the cluster.
- */
- public Set<ClusterNode> getNodes() {
- return Collections.unmodifiableSet(nodes.values());
- }
-
- // TODO: do we need mode and name? Making provate for now.
-
- @SuppressWarnings("unused")
- private String getMode() {
- return mode;
- }
-
- @SuppressWarnings("unused")
- private void setMode(String mode) {
- this.mode = mode;
- }
-
- @SuppressWarnings("unused")
- private String getName() {
- return name;
- }
+
- @SuppressWarnings("unused")
- private void setName(String name) {
- this.name = name;
- }
+ public interface Cluster {
+ public PhysicalCluster getPhysicalCluster();
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("{name=").append(name).append(",mode=").append(mode).append(",type=").append(",nodes=").append(nodes)
- .append("}");
- return sb.toString();
- }
+ public void addListener(ClusterChangeListener listener);
+ public void removeListener(ClusterChangeListener listener);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index 3d5a199,8fab1a6..dc700ea
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@@ -9,10 -9,11 +9,12 @@@ import java.util.HashMap
import java.util.Map;
import org.apache.s4.base.Emitter;
+ import org.apache.s4.base.EventMessage;
+ import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.topology.ClusterNode;
+ import org.apache.s4.comm.topology.Cluster;
+ import org.apache.s4.comm.topology.ClusterChangeListener;
+import org.apache.s4.comm.topology.ClusterNode;
- import org.apache.s4.comm.topology.Topology;
- import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.HashBiMap;
import com.google.inject.Inject;
@@@ -47,14 -49,12 +50,15 @@@ public class UDPEmitter implements Emit
}
@Override
- public boolean send(int partitionId, byte[] message) {
+ public boolean send(int partitionId, EventMessage eventMessage) {
try {
+ byte[] message = serDeser.serialize(eventMessage);
ClusterNode node = nodes.get(partitionId);
if (node == null) {
- throw new RuntimeException(String.format("Bad partition id %d", partitionId));
+ LoggerFactory.getLogger(getClass()).error(
+ "Cannot send message to partition {} because this partition is not visible to this emitter",
+ partitionId);
+ return false;
}
byte[] byteBuffer = new byte[message.length];
System.arraycopy(message, 0, byteBuffer, 0, message.length);
@@@ -81,9 -81,9 +85,9 @@@
public void onChange() {
// topology changes when processes pick tasks
synchronized (nodes) {
- for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+ for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
Integer partition = clusterNode.getPartition();
- nodes.put(partition, clusterNode);
+ nodes.forcePut(partition, clusterNode);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index 0000000,07497b2..0c5bbfd
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@@ -1,0 -1,8 +1,12 @@@
+ comm.queue_emmiter_size = 8000
+ comm.queue_listener_size = 8000
+ comm.emitter.class=org.apache.s4.comm.tcp.TCPEmitter
+ comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
+ comm.listener.class=org.apache.s4.comm.tcp.TCPListener
++comm.retries=10
++comm.retry_delay=10
++comm.timeout=1000
++tcp.partition.queue_size=256
+ cluster.zk_address = localhost:2181
+ cluster.zk_session_timeout = 10000
+ cluster.zk_connection_timeout = 10000
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
index 8e94c44,0000000..c0e811e
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
@@@ -1,10 -1,0 +1,16 @@@
+package org.apache.s4.comm.tcp;
+
++import java.io.IOException;
++
++import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
- public class MultiPartitionDeliveryTest extends TCPBasedTest {
- public MultiPartitionDeliveryTest() {
++public class MultiPartitionDeliveryTest extends TCPCommTest {
++
++ private static Logger logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
++
++ public MultiPartitionDeliveryTest() throws IOException {
+ super(6);
+ logger = LoggerFactory.getLogger(MultiPartitionDeliveryTest.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
index db9b596,0000000..50ae056
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/NetworkGlitchTest.java
@@@ -1,33 -1,0 +1,38 @@@
+package org.apache.s4.comm.tcp;
+
++import java.io.IOException;
++
+import org.apache.s4.comm.util.PartitionInfo;
+import org.junit.Assert;
++import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
- public class NetworkGlitchTest extends TCPBasedTest {
- public NetworkGlitchTest() {
++public class NetworkGlitchTest extends TCPCommTest {
++
++ private static Logger logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
++
++ public NetworkGlitchTest() throws IOException {
+ super(2);
+ logger = LoggerFactory.getLogger(NetworkGlitchTest.class);
+ }
+
- @Override
+ public void testDelivery() throws InterruptedException {
+ PartitionInfo util = partitions[0];
+
+ startThreads();
+
+ for (int i = 0; i < 4; i++) {
+ Thread.sleep(500);
+ logger.debug("Messages sent so far - {}", util.sendThread.sendCounts);
+ ((TCPEmitter) util.emitter).removeChannel(0);
+ logger.debug("Channel closed");
+ }
+
+ waitForThreads();
+
+ Assert.assertTrue("Message delivery", messageDelivery());
+
+ logger.info("Message ordering - " + messageOrdering());
+ Assert.assertTrue("Pairwise message ordering", messageOrdering());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
index 64e5c73,0000000..f5d1fc0
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
@@@ -1,10 -1,0 +1,16 @@@
+package org.apache.s4.comm.tcp;
+
++import java.io.IOException;
++
++import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
- public class SimpleDeliveryTest extends TCPBasedTest {
- public SimpleDeliveryTest() {
++public class SimpleDeliveryTest extends TCPCommTest {
++
++ private static Logger logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
++
++ public SimpleDeliveryTest() throws IOException {
+ super();
+ logger = LoggerFactory.getLogger(SimpleDeliveryTest.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index 0000000,268715b..a65fca4
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@@ -1,0 -1,72 +1,64 @@@
+ package org.apache.s4.comm.tcp;
+
+ import java.io.IOException;
+
+ import org.apache.s4.comm.DefaultCommModule;
+ import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.KeeperException;
++import org.apache.s4.comm.util.ProtocolTestUtil;
+ import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
+
+ import com.google.common.io.Resources;
+ import com.google.inject.AbstractModule;
+ import com.google.inject.Guice;
+ import com.google.inject.Injector;
+ import com.google.inject.name.Names;
+
-public class TCPCommTest extends ZkBasedTest {
++public abstract class TCPCommTest extends ProtocolTestUtil {
++
++ private static Logger logger = LoggerFactory.getLogger(TCPCommTest.class);
+ DeliveryTestUtil util;
+ public final static String CLUSTER_NAME = "cluster1";
++ Injector injector;
++
++ public TCPCommTest() throws IOException {
++ super();
++ injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
++ .openStream(), CLUSTER_NAME), new TCPCommTestModule());
++ }
++
++ public TCPCommTest(int numTasks) throws IOException {
++ super(numTasks);
++ injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
++ .openStream(), CLUSTER_NAME), new TCPCommTestModule());
++ }
+
- @Before
- public void setup() throws IOException, InterruptedException, KeeperException {
- Injector injector = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), CLUSTER_NAME),
- new TCPCommTestModule());
- util = injector.getInstance(DeliveryTestUtil.class);
++ public Injector getInjector() {
++ return injector;
+ }
+
+ class TCPCommTestModule extends AbstractModule {
+ TCPCommTestModule() {
+
+ }
+
+ @Override
+ protected void configure() {
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
+ bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
+ }
+ }
+
- /**
- * Tests the protocol. If all components function without throwing exceptions, the test passes. The test also
- * reports the loss of messages, if any.
- *
- * @throws InterruptedException
- */
- @Test
- public void testTCPDelivery() throws InterruptedException {
- try {
- Thread sendThread = util.newSendThread();
- Thread receiveThread = util.newReceiveThread();
-
- // start send and receive threads
- sendThread.start();
- receiveThread.start();
++ @Override
++ public void testDelivery() throws InterruptedException {
++ startThreads();
++ waitForThreads();
+
- // wait for them to finish
- sendThread.join();
- receiveThread.join();
-
- Assert.assertTrue("Guaranteed message delivery", !util.moreMessages(receiveThread));
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("TCP has failed basic functionality test");
- }
++ Assert.assertTrue("Message Delivery", messageDelivery());
+
- System.out.println("Done");
++ logger.info("Message ordering - " + messageOrdering());
++ Assert.assertTrue("Pairwise message ordering", messageOrdering());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
index 0559b41,0000000..ad030c0
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
@@@ -1,7 -1,0 +1,9 @@@
+package org.apache.s4.comm.udp;
+
- public class MultiPartitionDeliveryTest extends UDPBasedTest {
- public MultiPartitionDeliveryTest() {
++import java.io.IOException;
++
++public class MultiPartitionDeliveryTest extends UDPCommTest {
++ public MultiPartitionDeliveryTest() throws IOException {
+ super(2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
index c29f0ca,0000000..41d3017
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
@@@ -1,7 -1,0 +1,9 @@@
+package org.apache.s4.comm.udp;
+
- public class SimpleDeliveryTest extends UDPBasedTest {
- public SimpleDeliveryTest() {
++import java.io.IOException;
++
++public class SimpleDeliveryTest extends UDPCommTest {
++ public SimpleDeliveryTest() throws IOException {
+ super();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index 0000000,8f01a9b..90ebc3f
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@@ -1,0 -1,67 +1,65 @@@
+ package org.apache.s4.comm.udp;
+
+ import java.io.IOException;
+
+ import org.apache.s4.comm.DefaultCommModule;
+ import org.apache.s4.comm.DeliveryTestUtil;
-import org.apache.s4.comm.tcp.TCPCommTest;
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.KeeperException;
++import org.apache.s4.comm.util.ProtocolTestUtil;
+ import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+
+ import com.google.common.io.Resources;
+ import com.google.inject.AbstractModule;
+ import com.google.inject.Guice;
+ import com.google.inject.Injector;
+ import com.google.inject.name.Names;
+
-public class UDPCommTest extends ZkBasedTest {
++public abstract class UDPCommTest extends ProtocolTestUtil {
+ DeliveryTestUtil util;
++ private Injector injector;
+
- @Before
- public void setup() throws IOException, InterruptedException, KeeperException {
- Injector injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
- .openStream(), TCPCommTest.CLUSTER_NAME), new UDPCommTestModule());
- util = injector.getInstance(DeliveryTestUtil.class);
++ public UDPCommTest() throws IOException {
++ super();
++ injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
++ .openStream(), "cluster1"), new UDPCommTestModule());
++ }
++
++ public UDPCommTest(int numTasks) throws IOException {
++ super(numTasks);
++ injector = Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties")
++ .openStream(), "cluster1"), new UDPCommTestModule());
++ }
++
++ @Override
++ protected Injector getInjector() throws IOException {
++ return injector;
+ }
+
+ class UDPCommTestModule extends AbstractModule {
+ UDPCommTestModule() {
+ }
+
+ @Override
+ protected void configure() {
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.interval")).toInstance(100);
+ bind(Integer.class).annotatedWith(Names.named("emitter.send.numMessages")).toInstance(200);
+ bind(Integer.class).annotatedWith(Names.named("listener.recv.sleepCount")).toInstance(10);
+ }
+ }
+
+ /**
+ * Tests the protocol. If all components function without throwing exceptions, the test passes.
+ *
+ * @throws InterruptedException
+ */
- @Test
- public void testUDPDelivery() throws InterruptedException {
++ @Override
++ public void testDelivery() {
+ try {
- Thread sendThread = util.newSendThread();
- Thread receiveThread = util.newReceiveThread();
-
- // start send and receive threads
- sendThread.start();
- receiveThread.start();
-
- // wait for them to finish
- sendThread.join();
- receiveThread.join();
-
++ Thread.sleep(1000);
++ startThreads();
++ waitForThreads();
++ Assert.assertTrue("Message Delivery", messageDelivery());
+ } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("UDP has failed basic functionality test");
++ Assert.fail("UDP DeliveryTest");
+ }
- System.out.println("Done");
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
index 91fc7ec,0000000..6078f7e
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
@@@ -1,176 -1,0 +1,183 @@@
+package org.apache.s4.comm.util;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.s4.base.Emitter;
++import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.Listener;
++import org.apache.s4.base.SerializerDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Test util for communication protocols.
+ *
+ * <ul>
+ * <li>The util defines Send and Receive Threads</li>
+ * <li>SendThread sends out a pre-defined number of messages to all the partitions</li>
+ * <li>ReceiveThread receives all/most of these messages</li>
+ * <li>To avoid the receiveThread waiting for ever, it spawns a TimerThread that would interrupt after a pre-defined but
+ * long enough interval</li>
+ * </ul>
+ *
+ */
+public class PartitionInfo {
+ private static final Logger logger = LoggerFactory.getLogger(PartitionInfo.class);
+ public Emitter emitter;
+ public Listener listener;
+ public SendThread sendThread;
+ public ReceiveThread receiveThread;
+
+ private final int numRetries;
+ private final int retryDelayMs;
+ private int numMessages;
+ private int partitionId;
+ private ProtocolTestUtil ptu;
+
+ @Inject
++ SerializerDeserializer serDeser;
++
++ @Inject
+ public PartitionInfo(Emitter emitter, Listener listener, @Named("comm.retries") int retries,
+ @Named("comm.retry_delay") int retryDelay, @Named("emitter.send.numMessages") int numMessages) {
+ this.emitter = emitter;
+ this.listener = listener;
+ this.partitionId = this.listener.getPartitionId();
+ logger.debug("# Partitions = {}; Current partition = {}", this.emitter.getPartitionCount(),
+ this.listener.getPartitionId());
+
+ this.numRetries = retries;
+ this.retryDelayMs = retryDelay;
+ this.numMessages = numMessages;
+ // this.messagesExpected = numMessages * this.emitter.getPartitionCount();
+
+ this.sendThread = new SendThread();
+ this.receiveThread = new ReceiveThread();
+ }
+
+ public void setProtocolTestUtil(ProtocolTestUtil ptu) {
+ this.ptu = ptu;
+ this.ptu.expectedMessages[partitionId] = numMessages * this.emitter.getPartitionCount();
+ }
+
+ public class SendThread extends Thread {
+ public int[] sendCounts = new int[emitter.getPartitionCount()];
+
+ public SendThread() {
+ super("SendThread-" + partitionId);
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < numMessages; i++) {
+ for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
- byte[] message = new String(partitionId + " " + i).getBytes();
++ EventMessage message = new EventMessage("app1", "stream1",
++ new String(partitionId + " " + i).getBytes());
+ for (int retries = 0; retries < numRetries; retries++) {
+ if (emitter.send(partition, message)) {
+ sendCounts[partition]++;
+ break;
+ }
+ Thread.sleep(retryDelayMs);
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+
+ for (int i = 0; i < sendCounts.length; i++) {
+ if (sendCounts[i] < numMessages) {
+ ptu.decreaseExpectedMessages(i, (numMessages - sendCounts[i]));
+ }
+ }
+
+ logger.debug("Exiting");
+ }
+ }
+
+ public class ReceiveThread extends Thread {
+ protected int messagesReceived = 0;
+ private Hashtable<Integer, List<Integer>> receivedMessages;
+
+ ReceiveThread() {
+ super("ReceiveThread-" + partitionId);
+ receivedMessages = new Hashtable<Integer, List<Integer>>();
+ }
+
+ @Override
+ public void run() {
+ while (messagesReceived < ptu.expectedMessages[partitionId]) {
+ byte[] message = listener.recv();
+ if (message == null) {
+ logger.error("ReceiveThread {}: received a null message", partitionId);
+ break;
+ }
+
++ EventMessage deserialized = (EventMessage) serDeser.deserialize(message);
+ // process and store the message
- String msgString = new String(message);
++ String msgString = new String(deserialized.getSerializedEvent());
+ String[] msgTokens = msgString.split(" ");
+ Integer senderPartition = Integer.parseInt(msgTokens[0]);
+ Integer receivedMsg = Integer.parseInt(msgTokens[1]);
+
+ if (!receivedMessages.containsKey(senderPartition)) {
+ receivedMessages.put(senderPartition, new ArrayList<Integer>());
+ }
+
+ List<Integer> messagesList = receivedMessages.get(senderPartition);
+
+ if (messagesList.contains(receivedMsg)) {
+ messagesList.remove(receivedMsg);
+ } else {
+ messagesReceived++;
+ }
+ messagesList.add(receivedMsg);
+ }
+
+ logger.debug("Exiting");
+ }
+
+ public boolean orderedDelivery() {
+ for (List<Integer> messagesList : receivedMessages.values()) {
+ int lastMsg = -1;
+ for (Integer msg : messagesList) {
+ if (msg <= lastMsg) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ public boolean messageDelivery() {
+ if (messagesReceived < ptu.expectedMessages[partitionId]) {
+ printCounts();
+ return false;
+ } else
+ return true;
+ }
+
+ public void printCounts() {
+ logger.debug("ReceiveThread {}: Messages not received = {}", partitionId,
+ (ptu.expectedMessages[partitionId] - messagesReceived));
+ int counts[] = new int[emitter.getPartitionCount()];
+ for (Integer sender : receivedMessages.keySet()) {
+ counts[sender] = receivedMessages.get(sender).size();
+ }
+
+ logger.debug("ReceiveThread {}: recvdCounts: {}", partitionId, counts);
+ }
+
+ public int moreMessages() {
+ return (int) (ptu.expectedMessages[partitionId] - messagesReceived);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
index bc4f166,0000000..421aacd
mode 100644,000000..100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
@@@ -1,97 -1,0 +1,98 @@@
+package org.apache.s4.comm.util;
+
+import java.io.IOException;
+
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Injector;
+
+public abstract class ProtocolTestUtil extends ZkBasedTest {
+ protected int[] expectedMessages;
- protected Injector injector;
+ protected PartitionInfo[] partitions;
+
+ protected ProtocolTestUtil() {
+ super();
+ }
+
+ protected ProtocolTestUtil(int numTasks) {
+ super(numTasks);
+ }
+
+ @Before
- public void setup() throws IOException, InterruptedException, KeeperException {
++ public void preparePartitions() throws IOException, InterruptedException, KeeperException {
+ expectedMessages = new int[super.numTasks];
+ partitions = new PartitionInfo[super.numTasks];
+ for (int i = 0; i < this.numTasks; i++) {
- partitions[i] = injector.getInstance(PartitionInfo.class);
++ partitions[i] = getInjector().getInstance(PartitionInfo.class);
+ partitions[i].setProtocolTestUtil(this);
+ }
+ }
+
++ protected abstract Injector getInjector() throws IOException;
++
+ protected void decreaseExpectedMessages(int partition, long amount) {
+ synchronized (expectedMessages) {
+ expectedMessages[partition] -= amount;
+ }
+
+ if (partitions[partition].receiveThread.messagesReceived >= expectedMessages[partition])
+ interrupt(partition);
+ }
+
+ protected void interrupt(int partition) {
+ partitions[partition].receiveThread.interrupt();
+ }
+
+ protected void startThreads() {
+ for (PartitionInfo partition : partitions) {
+ partition.sendThread.start();
+ partition.receiveThread.start();
+ }
+ }
+
+ protected void waitForThreads() throws InterruptedException {
+ for (PartitionInfo partition : partitions) {
+ partition.sendThread.join();
+ partition.receiveThread.join();
+ }
+ }
+
+ protected boolean messageDelivery() {
+ for (PartitionInfo partition : partitions) {
+ if (!partition.receiveThread.messageDelivery())
+ return false;
+ }
+ return true;
+ }
+
+ protected boolean messageOrdering() {
+ for (PartitionInfo partition : partitions) {
+ if (!partition.receiveThread.orderedDelivery())
+ return false;
+ }
+ return true;
+ }
+
+ @After
+ public void tearDown() {
+ for (PartitionInfo partition : partitions) {
+ // debug
+ partition.receiveThread.printCounts();
+ if (partition.emitter != null) {
+ partition.emitter.close();
+ partition.emitter = null;
+ }
+ if (partition.listener != null) {
+ partition.listener.close();
+ partition.listener = null;
+ }
+ }
+ }
+
+ @Test(timeout = 60000)
+ public abstract void testDelivery() throws InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index 3b1e45a,2f336e0..fdd607b
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@@ -13,40 -12,17 +12,27 @@@ import org.slf4j.LoggerFactory
public abstract class ZkBasedTest {
private static final Logger logger = LoggerFactory.getLogger(ZkBasedTest.class);
- private ZkServer zkServer;
- protected String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
+
- private static final String clusterName = "s4-test-cluster";
+ private Factory zkFactory;
+ protected final int numTasks;
+
+ protected ZkBasedTest() {
+ this.numTasks = 1;
+ }
+
+ protected ZkBasedTest(int numTasks) {
+ this.numTasks = numTasks;
+ }
@Before
- public void setupZkBasedTest() {
- String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
- String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+ public void prepare() throws IOException, InterruptedException, KeeperException {
CommTestUtils.cleanupTmpDirs();
- IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
- @Override
- public void createDefaultNameSpace(ZkClient zkClient) {
+ zkFactory = CommTestUtils.startZookeeperServer();
- }
- };
-
- logger.info("Starting Zookeeper Server");
- zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
- zkServer.start();
-
- TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
- taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, this.numTasks);
+ TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+ taskSetup.clean("s4");
- taskSetup.setup("cluster1", 1, 1300);
++ taskSetup.setup("cluster1", numTasks, 1300);
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
----------------------------------------------------------------------
diff --cc subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
index 0000000,615f114..a5367c7
mode 000000,100644..100644
--- a/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
+++ b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
@@@ -1,0 -1,9 +1,12 @@@
+ comm.queue_emmiter_size = 8000
+ comm.queue_listener_size = 8000
+ comm.emitter.class = org.apache.s4.comm.udp.UDPEmitter
+ comm.emitter.remote.class = org.apache.s4.comm.udp.UDPRemoteEmitter
+ comm.listener.class = org.apache.s4.comm.udp.UDPListener
+ cluster.name = cluster1
+ cluster.zk_address = localhost:2181
+ cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
++cluster.zk_connection_timeout = 10000
++comm.retries=10
++comm.retry_delay=10
++comm.timeout=1000
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --cc subprojects/s4-core/s4-core.gradle
index 3aa1d03,46d580d..fffab81
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@@ -19,22 -19,15 +19,15 @@@ description = 'The S4 core platform.
dependencies {
compile project(":s4-base")
compile project(":s4-comm")
- testCompile project(path: ':s4-comm', configuration: 'tests')
- }
-
- task testJar(type: Jar) {
- baseName = "test-${project.archivesBaseName}"
- from sourceSets.test.classes
- }
-
- configurations {
- tests
- }
-
- artifacts {
- tests testJar
+ compile project(path: ':s4-comm', configuration: 'tests')
+ compile libraries.jcommander
+ testCompile project(path: ':s4-comm', configuration: 'tests')
+ testCompile libraries.gradle_base_services
+ testCompile libraries.gradle_core
+ testCompile libraries.gradle_tooling_api
- testCompile libraries.gradle_wrapper
++ testCompile libraries.gradle_wrapper
}
test {
- forkEvery=1
- }
+ forkEvery=1;
-}
++}