You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/11/14 17:15:25 UTC
[2/3] Performance improvements and configurability - clearly identify
asynchronous stages and use configurable and injectable executors for each of
them (deserialization, processing,
serialization) - default executors for processing/sending use throttling
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
deleted file mode 100644
index e744d39..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.util;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-
-/**
- * Test util for communication protocols.
- *
- * <ul>
- * <li>The util defines Send and Receive Threads</li>
- * <li>SendThread sends out a pre-defined number of messages to all the partitions</li>
- * <li>ReceiveThread receives all/most of these messages</li>
- * <li>To avoid the receiveThread waiting for ever, it spawns a TimerThread that would interrupt after a pre-defined but
- * long enough interval</li>
- * </ul>
- *
- */
-public class PartitionInfo {
- private static final Logger logger = LoggerFactory.getLogger(PartitionInfo.class);
- public Emitter emitter;
- public Listener listener;
- public SendThread sendThread;
- public ReceiveThread receiveThread;
-
- private final int numRetries = 10;
- private final int retryDelayMs = 10;
- private int numMessages = 100;
- private int partitionId;
- private ProtocolTestUtil ptu;
-
- SerializerDeserializer serDeser;
-
- @Inject
- public PartitionInfo(Emitter emitter, Listener listener, SerializerDeserializerFactory serDeserFactory) {
- this.emitter = emitter;
- this.listener = listener;
- this.serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
- this.partitionId = this.listener.getPartitionId();
- logger.debug("# Partitions = {}; Current partition = {}", this.emitter.getPartitionCount(),
- this.listener.getPartitionId());
-
- // this.messagesExpected = numMessages * this.emitter.getPartitionCount();
-
- this.sendThread = new SendThread();
- this.receiveThread = new ReceiveThread();
- }
-
- public void setProtocolTestUtil(ProtocolTestUtil ptu) {
- this.ptu = ptu;
- this.ptu.expectedMessages[partitionId] = numMessages * this.emitter.getPartitionCount();
- }
-
- public class SendThread extends Thread {
- public int[] sendCounts = new int[emitter.getPartitionCount()];
-
- public SendThread() {
- super("SendThread-" + partitionId);
- }
-
- @Override
- public void run() {
- try {
- for (int i = 0; i < numMessages; i++) {
- for (int partition = 0; partition < emitter.getPartitionCount(); partition++) {
- for (int retries = 0; retries < numRetries; retries++) {
- if (emitter.send(partition, ByteBuffer.wrap(new String(partitionId + " " + i).getBytes()))) {
- sendCounts[partition]++;
- break;
- }
- Thread.sleep(retryDelayMs);
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- return;
- }
-
- for (int i = 0; i < sendCounts.length; i++) {
- if (sendCounts[i] < numMessages) {
- ptu.decreaseExpectedMessages(i, (numMessages - sendCounts[i]));
- }
- }
-
- logger.debug("Exiting");
- }
- }
-
- public class ReceiveThread extends Thread {
- protected int messagesReceived = 0;
- private Hashtable<Integer, List<Integer>> receivedMessages;
-
- ReceiveThread() {
- super("ReceiveThread-" + partitionId);
- receivedMessages = new Hashtable<Integer, List<Integer>>();
- }
-
- @Override
- public void run() {
- while (messagesReceived < ptu.expectedMessages[partitionId]) {
- ByteBuffer message = listener.recv();
- if (message == null) {
- logger.error("ReceiveThread {}: received a null message", partitionId);
- break;
- }
-
- // process and store the message
- String msgString = new String(message.array());
- String[] msgTokens = msgString.split(" ");
- Integer senderPartition = Integer.parseInt(msgTokens[0]);
- Integer receivedMsg = Integer.parseInt(msgTokens[1]);
-
- if (!receivedMessages.containsKey(senderPartition)) {
- receivedMessages.put(senderPartition, new ArrayList<Integer>());
- }
-
- List<Integer> messagesList = receivedMessages.get(senderPartition);
-
- if (messagesList.contains(receivedMsg)) {
- messagesList.remove(receivedMsg);
- } else {
- messagesReceived++;
- }
- messagesList.add(receivedMsg);
- }
-
- logger.debug("Exiting");
- }
-
- public boolean orderedDelivery() {
- for (List<Integer> messagesList : receivedMessages.values()) {
- int lastMsg = -1;
- for (Integer msg : messagesList) {
- if (msg <= lastMsg) {
- return false;
- }
- }
- }
- return true;
- }
-
- public boolean messageDelivery() {
- if (messagesReceived < ptu.expectedMessages[partitionId]) {
- printCounts();
- return false;
- } else
- return true;
- }
-
- public void printCounts() {
- logger.debug("ReceiveThread {}: Messages not received = {}", partitionId,
- (ptu.expectedMessages[partitionId] - messagesReceived));
- int counts[] = new int[emitter.getPartitionCount()];
- for (Integer sender : receivedMessages.keySet()) {
- counts[sender] = receivedMessages.get(sender).size();
- }
-
- logger.debug("ReceiveThread {}: recvdCounts: {}", partitionId, counts);
- }
-
- public int moreMessages() {
- return (int) (ptu.expectedMessages[partitionId] - messagesReceived);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
deleted file mode 100644
index c647611..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.comm.util;
-
-import java.io.IOException;
-
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.inject.Injector;
-
-public abstract class ProtocolTestUtil extends ZkBasedTest {
- protected int[] expectedMessages;
- protected PartitionInfo[] partitions;
-
- protected ProtocolTestUtil() {
- super();
- }
-
- protected ProtocolTestUtil(int numTasks) {
- super(numTasks);
- }
-
- @Before
- public void preparePartitions() throws IOException, InterruptedException, KeeperException {
- expectedMessages = new int[super.numTasks];
- partitions = new PartitionInfo[super.numTasks];
- for (int i = 0; i < this.numTasks; i++) {
- partitions[i] = newInjector().getInstance(PartitionInfo.class);
- partitions[i].setProtocolTestUtil(this);
- }
- }
-
- protected abstract Injector newInjector() throws IOException;
-
- protected void decreaseExpectedMessages(int partition, long amount) {
- synchronized (expectedMessages) {
- expectedMessages[partition] -= amount;
- }
-
- if (partitions[partition].receiveThread.messagesReceived >= expectedMessages[partition])
- interrupt(partition);
- }
-
- protected void interrupt(int partition) {
- partitions[partition].receiveThread.interrupt();
- }
-
- protected void startThreads() {
- for (PartitionInfo partition : partitions) {
- partition.sendThread.start();
- partition.receiveThread.start();
- }
- }
-
- protected void waitForThreads() throws InterruptedException {
- for (PartitionInfo partition : partitions) {
- partition.sendThread.join();
- partition.receiveThread.join();
- }
- }
-
- protected boolean messageDelivery() {
- for (PartitionInfo partition : partitions) {
- if (!partition.receiveThread.messageDelivery())
- return false;
- }
- return true;
- }
-
- protected boolean messageOrdering() {
- for (PartitionInfo partition : partitions) {
- if (!partition.receiveThread.orderedDelivery())
- return false;
- }
- return true;
- }
-
- @After
- public void tearDown() {
- for (PartitionInfo partition : partitions) {
- // debug
- partition.receiveThread.printCounts();
- if (partition.emitter != null) {
- partition.emitter.close();
- partition.emitter = null;
- }
- if (partition.listener != null) {
- partition.listener.close();
- partition.listener = null;
- }
- }
- }
-
- @Test(timeout = 60000)
- public abstract void testDelivery() throws InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index 7316498..5d3967f 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -63,6 +63,9 @@ public class CommTestUtils {
static {
logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
}
+ public final static String MESSAGE = "message@" + System.currentTimeMillis();
+
+ public final static CountDownLatch SIGNAL_MESSAGE_RECEIVED = new CountDownLatch(1);
protected static Process forkProcess(String mainClass, int debugPort, String... args) throws IOException,
InterruptedException {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java
new file mode 100644
index 0000000..5dde4c2
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java
@@ -0,0 +1,38 @@
+package org.apache.s4.fixtures;
+
+import java.nio.ByteBuffer;
+
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.SerializerDeserializer;
+
+import com.google.inject.Inject;
+
+/**
+ * For test purposes, intercepts messages that would normally be delegated to the application layer.
+ *
+ */
+public class MockReceiver implements Receiver {
+
+ SerializerDeserializer serDeser;
+
+ @Inject
+ public MockReceiver(SerializerDeserializer serDeser) {
+ super();
+ this.serDeser = serDeser;
+ }
+
+ @Override
+ public void receive(ByteBuffer message) {
+ if (CommTestUtils.MESSAGE.equals(serDeser.deserialize(message))) {
+ CommTestUtils.SIGNAL_MESSAGE_RECEIVED.countDown();
+ } else {
+ System.err.println("Unexpected message:" + serDeser.deserialize(message));
+ }
+
+ }
+
+ @Override
+ public int getPartitionId() {
+ throw new RuntimeException("Not implemented");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java
new file mode 100644
index 0000000..2a5d0a8
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java
@@ -0,0 +1,27 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+
+/**
+ * For test purposes, intercepts messages that would normally be delegated to the application layer.
+ *
+ */
+public class MockReceiverModule extends AbstractModule {
+
+ @Provides
+ public SerializerDeserializer provideSerializerDeserializer(SerializerDeserializerFactory serDeserFactory) {
+ // we use the current classloader here, no app class to serialize
+ return serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+ }
+
+ @Override
+ protected void configure() {
+ bind(Receiver.class).to(MockReceiver.class);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java
new file mode 100644
index 0000000..01acdd1
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java
@@ -0,0 +1,22 @@
+package org.apache.s4.fixtures;
+
+import java.nio.ByteBuffer;
+
+import org.apache.s4.base.Receiver;
+
+/**
+ * Avoids delegating message processing to the application layer.
+ *
+ */
+class NoOpReceiver implements Receiver {
+
+ @Override
+ public void receive(ByteBuffer message) {
+ // do nothing
+ }
+
+ @Override
+ public int getPartitionId() {
+ throw new RuntimeException("Not implemented");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java
new file mode 100644
index 0000000..2ee3749
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java
@@ -0,0 +1,27 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+
+/**
+ * Avoids delegating message processing to the application layer.
+ *
+ */
+public class NoOpReceiverModule extends AbstractModule {
+
+ @Provides
+ public SerializerDeserializer provideSerializerDeserializer(SerializerDeserializerFactory serDeserFactory) {
+ // we use the current classloader here, no app class to serialize
+ return serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+ }
+
+ @Override
+ protected void configure() {
+ bind(Receiver.class).to(NoOpReceiver.class);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
index 7b525bd..f9707ec 100644
--- a/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
+++ b/subprojects/s4-comm/src/test/resources/udp.s4.comm.properties
@@ -5,3 +5,10 @@ s4.cluster.name = cluster1
s4.cluster.zk_address = localhost:2181
s4.cluster.zk_session_timeout = 10000
s4.cluster.zk_connection_timeout = 10000
+
+# how many threads to use for the sender stage (i.e. serialization)
+s4.sender.parallelism=1
+# maximum number of events in the buffer of the sender stage
+s4.sender.workQueueSize=10000
+# maximum number of events in the buffer of the processing stage
+s4.stream.workQueueSize=10000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index 7675e15..4a3693b 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -27,6 +27,7 @@ dependencies {
compile libraries.asm
compile libraries.netty
compile libraries.zkclient
+ compile libraries.reflectasm
testCompile project(path: ':s4-comm', configuration: 'tests')
testCompile libraries.gradle_base_services
testCompile libraries.gradle_core
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 43d137f..eb2ae1d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,10 +26,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
import org.apache.s4.base.Hasher;
import org.apache.s4.base.KeyFinder;
+import org.apache.s4.base.Sender;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.topology.RemoteStreams;
import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.staging.StreamExecutorServiceFactory;
import org.apache.s4.core.window.AbstractSlidingWindowPE;
import org.apache.s4.core.window.SlotFactory;
import org.slf4j.Logger;
@@ -65,7 +67,7 @@ public abstract class App {
@Inject
private Sender sender;
@Inject
- private Receiver receiver;
+ private ReceiverImpl receiver;
@Inject
RemoteSenders remoteSenders;
@@ -90,6 +92,9 @@ public abstract class App {
private SerializerDeserializer serDeser;
@Inject
+ StreamExecutorServiceFactory streamExecutorFactory;
+
+ @Inject
private void initSerDeser() {
this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
}
@@ -272,7 +277,7 @@ public abstract class App {
/**
* @return the receiver object
*/
- public Receiver getReceiver() {
+ public ReceiverImpl getReceiver() {
return receiver;
}
@@ -284,6 +289,10 @@ public abstract class App {
return checkpointingFramework;
}
+ public StreamExecutorServiceFactory getStreamExecutorFactory() {
+ return streamExecutorFactory;
+ }
+
/**
* Creates a stream with a specific key finder. The event is delivered to the PE instances in the target PE
* prototypes by key.
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 8874b93..5de792e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -25,10 +25,18 @@ import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.Sender;
import org.apache.s4.base.util.S4RLoaderFactory;
import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.core.ft.CheckpointingFramework;
import org.apache.s4.core.ft.NoOpCheckpointingFramework;
+import org.apache.s4.core.staging.DefaultRemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.DefaultSenderExecutorServiceFactory;
+import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.apache.s4.core.staging.SenderExecutorServiceFactory;
+import org.apache.s4.core.staging.StreamExecutorServiceFactory;
import org.apache.s4.core.util.S4Metrics;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.DistributedDeploymentManager;
@@ -72,6 +80,9 @@ public class DefaultCoreModule extends AbstractModule {
/* The hashing function to map keys top partitions. */
bind(Hasher.class).to(DefaultHasher.class);
+ bind(Receiver.class).to(ReceiverImpl.class);
+ bind(Sender.class).to(SenderImpl.class);
+
bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
bind(S4RLoaderFactory.class);
@@ -81,6 +92,12 @@ public class DefaultCoreModule extends AbstractModule {
bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
bind(S4Metrics.class);
+
+ bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
+ bind(RemoteSendersExecutorServiceFactory.class).to(DefaultRemoteSendersExecutorServiceFactory.class);
+
+ bind(StreamExecutorServiceFactory.class).to(DefaultStreamProcessingExecutorServiceFactory.class);
+
}
private void loadProperties(Binder binder) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index b1a0c10..295e416 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -55,7 +55,6 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
/**
* <p>
@@ -441,7 +440,7 @@ public abstract class ProcessingElement implements Cloneable {
protected void handleInputEvent(Event event) {
- TimerContext timerContext = processingTimer.time();
+ // TimerContext timerContext = processingTimer.time();
Object object;
if (isThreadSafe) {
object = new Object(); // a dummy object TODO improve this.
@@ -470,7 +469,7 @@ public abstract class ProcessingElement implements Cloneable {
checkpoint();
}
}
- timerContext.stop();
+ // timerContext.stop();
}
protected boolean isCheckpointable() {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
deleted file mode 100644
index 7a9b5f2..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.core;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.core.util.S4Metrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.MapMaker;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-/**
- * The {@link Receiver} and its counterpart {@link Sender} are the top level classes of the communication layer.
- * <p>
- * {@link Receiver} is responsible for receiving an event to a {@link ProcessingElement} instance using a hashKey.
- * <p>
- * A Listener implementation receives data from the network and passes an event as a byte array to the {@link Receiver}.
- * The byte array is de-serialized and converted into an {@link Event}. Finally the event is passed to the matching
- * streams.
- * </p>
- * There is a single {@link Receiver} instance per node.
- *
- * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
- * from the application developer. </p>
- */
-@Singleton
-public class Receiver implements Runnable {
-
- private static final Logger logger = LoggerFactory.getLogger(Receiver.class);
-
- final private Listener listener;
- final private SerializerDeserializer serDeser;
- private Map<Integer, Map<String, Stream<? extends Event>>> streams;
- private Thread thread;
-
- @Inject
- public Receiver(Listener listener, SerializerDeserializer serDeser) {
- this.listener = listener;
- this.serDeser = serDeser;
-
- thread = new Thread(this, "Receiver");
- // TODO avoid starting the thread here
- thread.start();
-
- streams = new MapMaker().makeMap();
- }
-
- public int getPartition() {
- return listener.getPartitionId();
- }
-
- /** Save stream keyed by app id and stream id. */
- void addStream(Stream<? extends Event> stream) {
- int appId = stream.getApp().getId();
- Map<String, Stream<? extends Event>> appMap = streams.get(appId);
- if (appMap == null) {
- appMap = new MapMaker().makeMap();
- streams.put(appId, appMap);
- }
- appMap.put(stream.getName(), stream);
- }
-
- /** Remove stream when it is no longer needed. */
- void removeStream(Stream<? extends Event> stream) {
- int appId = stream.getApp().getId();
- Map<String, Stream<? extends Event>> appMap = streams.get(appId);
- if (appMap == null) {
- logger.error("Tried to remove a stream that is not registered in the receiver.");
- return;
- }
- appMap.remove(stream.getName());
- }
-
- public void run() {
- // TODO: this thread never seems to get interrupted. SHould we catch an interrupted exception from listener
- // here?
- while (!Thread.interrupted()) {
- ByteBuffer message = listener.recv();
- S4Metrics.receivedEvent(message.array().length);
- Event event = (Event) serDeser.deserialize(message);
-
- String streamId = event.getStreamName();
-
- /*
- * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO:
- * make this more efficient for the case in which we send the same event to multiple PEs.
- */
- try {
- streams.get(-1).get(streamId).receiveEvent(event);
- } catch (NullPointerException e) {
- logger.error("Could not find target stream for event with streamId={}", streamId);
- }
- }
- }
-
- public void close() {
- thread.interrupt();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
new file mode 100644
index 0000000..476f4ac
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.core.util.S4Metrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.MapMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link ReceiverImpl} and its counterpart {@link SenderImpl} are the top level classes of the communication layer.
+ * <p>
+ * {@link ReceiverImpl} is responsible for receiving an event to a {@link ProcessingElement} instance using a hashKey.
+ * <p>
+ * A Listener implementation receives data from the network and passes an event as a byte array to the
+ * {@link ReceiverImpl}. The byte array is de-serialized and converted into an {@link Event}. Finally the event is
+ * passed to the matching streams.
+ * </p>
+ * There is a single {@link ReceiverImpl} instance per node.
+ * <p>
+ * Details on how the cluster is partitioned and how events are serialized and transmitted to their destinations are
+ * hidden from the application developer. </p>
+ */
+@Singleton
+public class ReceiverImpl implements Receiver {
+
+ private static final Logger logger = LoggerFactory.getLogger(ReceiverImpl.class);
+
+ final private Listener listener;
+ final private SerializerDeserializer serDeser;
+ private Map<Integer, Map<String, Stream<? extends Event>>> streams;
+
+ @Inject
+ public ReceiverImpl(Listener listener, SerializerDeserializerFactory serDeserFactory) {
+ this.listener = listener;
+ this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+
+ streams = new MapMaker().makeMap();
+ }
+
+ @Override
+ public int getPartitionId() {
+ return listener.getPartitionId();
+ }
+
+ /** Save stream keyed by app id and stream id. */
+ void addStream(Stream<? extends Event> stream) {
+ int appId = stream.getApp().getId();
+ Map<String, Stream<? extends Event>> appMap = streams.get(appId);
+ if (appMap == null) {
+ appMap = new MapMaker().makeMap();
+ streams.put(appId, appMap);
+ }
+ appMap.put(stream.getName(), stream);
+ }
+
+ /** Remove stream when it is no longer needed. */
+ void removeStream(Stream<? extends Event> stream) {
+ int appId = stream.getApp().getId();
+ Map<String, Stream<? extends Event>> appMap = streams.get(appId);
+ if (appMap == null) {
+ logger.error("Tried to remove a stream that is not registered in the receiver.");
+ return;
+ }
+ appMap.remove(stream.getName());
+ }
+
+ @Override
+ public void receive(ByteBuffer message) {
+ S4Metrics.receivedEventFromCommLayer(message.array().length);
+ Event event = (Event) serDeser.deserialize(message);
+
+ String streamId = event.getStreamName();
+
+ /*
+ * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO: make
+ * this more efficient for the case in which we send the same event to multiple PEs.
+ */
+ try {
+ Map<String, Stream<? extends Event>> map = streams.get(-1);
+ map.get(streamId).receiveEvent(event);
+ } catch (NullPointerException e) {
+ logger.error("Could not find target stream for event with streamId={}", streamId);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 9c9e09c..b6a7d5b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -21,6 +21,7 @@ package org.apache.s4.core;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import org.apache.s4.base.Event;
import org.apache.s4.base.Hasher;
@@ -30,6 +31,7 @@ import org.apache.s4.comm.tcp.RemoteEmitters;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.RemoteStreams;
import org.apache.s4.comm.topology.StreamConsumer;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,25 +46,29 @@ public class RemoteSenders {
Logger logger = LoggerFactory.getLogger(RemoteSenders.class);
- RemoteEmitters remoteEmitters;
+ final RemoteEmitters remoteEmitters;
- RemoteStreams remoteStreams;
+ final RemoteStreams remoteStreams;
- Clusters remoteClusters;
+ final Clusters remoteClusters;
- SerializerDeserializer serDeser;
+ final SerializerDeserializer serDeser;
- Hasher hasher;
+ final Hasher hasher;
ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
+ private ExecutorService executorService;
+
@Inject
public RemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters remoteClusters,
- SerializerDeserializerFactory serDeserFactory, Hasher hasher) {
+ SerializerDeserializerFactory serDeserFactory, Hasher hasher,
+ RemoteSendersExecutorServiceFactory senderExecutorFactory) {
this.remoteEmitters = remoteEmitters;
this.remoteStreams = remoteStreams;
this.remoteClusters = remoteClusters;
this.hasher = hasher;
+ executorService = senderExecutorFactory.create();
serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
}
@@ -84,8 +90,26 @@ public class RemoteSenders {
sender = newSender;
}
}
- // we must set the app id of the consumer app for correct dispatch within the consumer node
// NOTE: this implies multiple serializations, there might be an optimization
+ executorService.execute(new SendToRemoteClusterTask(hashKey, event, sender));
+ }
+ }
+
+ class SendToRemoteClusterTask implements Runnable {
+
+ String hashKey;
+ Event event;
+ RemoteSender sender;
+
+ public SendToRemoteClusterTask(String hashKey, Event event, RemoteSender sender) {
+ super();
+ this.hashKey = hashKey;
+ this.event = event;
+ this.sender = sender;
+ }
+
+ @Override
+ public void run() {
sender.send(hashKey, serDeser.serialize(event));
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
deleted file mode 100644
index a308a15..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.core;
-
-import java.nio.ByteBuffer;
-
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Event;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.core.util.S4Metrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-
-/**
- * The {@link Sender} and its counterpart {@link Receiver} are the top level classes of the communication layer.
- * <p>
- * {@link Sender} is responsible for sending an event to a {@link ProcessingElement} instance using a hashKey.
- * <p>
- * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
- * from the application developer.
- */
-public class Sender {
-
- private static Logger logger = LoggerFactory.getLogger(Sender.class);
-
- final private Emitter emitter;
- final private SerializerDeserializer serDeser;
- final private Hasher hasher;
-
- Assignment assignment;
- private int localPartitionId = -1;
-
- /**
- *
- * @param emitter
- * the emitter implements the low level communication layer.
- * @param serDeser
- * a serialization mechanism.
- * @param hasher
- * a hashing function to map keys to partition IDs.
- */
- @Inject
- public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
- this.emitter = emitter;
- this.serDeser = serDeser;
- this.hasher = hasher;
- this.assignment = assignment;
- }
-
- @Inject
- private void resolveLocalPartitionId() {
- ClusterNode node = assignment.assignClusterNode();
- if (node != null) {
- localPartitionId = node.getPartition();
- }
- }
-
- /**
- * This method attempts to send an event to a remote partition. If the destination is local, the method does not
- * send the event and returns false. <b>The caller is then expected to put the event in a local queue instead.</b>
- *
- * @param hashKey
- * the string used to map the value of a key to a specific partition.
- * @param event
- * the event to be delivered to a {@link ProcessingElement} instance.
- * @return true if the event was sent because the destination is <b>not</b> local.
- *
- */
- public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
- int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
- if (partition == localPartitionId) {
- /* Hey we are in the same JVM, don't use the network. */
- return false;
- }
- send(partition, serDeser.serialize(event));
- S4Metrics.sentEvent(partition);
- return true;
- }
-
- private void send(int partition, ByteBuffer message) {
-
- emitter.send(partition, message);
- }
-
- /**
- * Send an event to all the remote partitions in the cluster. The caller is expected to also put the event in a
- * local queue.
- *
- * @param event
- * the event to be delivered to {@link ProcessingElement} instances.
- */
- public void sendToRemotePartitions(Event event) {
-
- for (int i = 0; i < emitter.getPartitionCount(); i++) {
-
- /* Don't use the comm layer when we send to the same partition. */
- if (localPartitionId != i) {
- emitter.send(i, serDeser.serialize(event));
- S4Metrics.sentEvent(i);
-
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
new file mode 100644
index 0000000..255cdeb
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Sender;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.core.staging.SenderExecutorServiceFactory;
+import org.apache.s4.core.util.S4Metrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+/**
+ * The {@link SenderImpl} and its counterpart {@link ReceiverImpl} are the top level classes of the communication layer.
+ * <p>
+ * {@link SenderImpl} is responsible for sending an event to a {@link ProcessingElement} instance using a hashKey.
+ * <p>
+ * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
+ * from the application developer.
+ */
+public class SenderImpl implements Sender {
+
+ private static Logger logger = LoggerFactory.getLogger(SenderImpl.class);
+
+ final private Emitter emitter;
+ final private SerializerDeserializer serDeser;
+ final private Hasher hasher;
+
+ Assignment assignment;
+ private int localPartitionId = -1;
+
+ private ExecutorService tpe;
+
+ /**
+ *
+ * @param emitter
+ * the emitter implements the low level communication layer.
+ * @param serDeserFactory
+ * a factory for the serialization mechanism.
+ * @param hasher
+ * a hashing function to map keys to partition IDs.
+ */
+ @Inject
+ public SenderImpl(Emitter emitter, SerializerDeserializerFactory serDeserFactory, Hasher hasher,
+ Assignment assignment, SenderExecutorServiceFactory senderExecutorServiceFactory) {
+ this.emitter = emitter;
+ this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+ this.hasher = hasher;
+ this.assignment = assignment;
+ this.tpe = senderExecutorServiceFactory.create();
+ }
+
+ @Inject
+ private void resolveLocalPartitionId() {
+ ClusterNode node = assignment.assignClusterNode();
+ if (node != null) {
+ localPartitionId = node.getPartition();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.base.Sender#checkAndSendIfNotLocal(java.lang.String, org.apache.s4.base.Event)
+ */
+ @Override
+ public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
+ int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
+ if (partition == localPartitionId) {
+ /* Hey we are in the same JVM, don't use the network. */
+ return false;
+ }
+ // TODO asynch
+ send(partition, serDeser.serialize(event));
+ S4Metrics.sentEvent(partition);
+ return true;
+ }
+
+ private void send(int partition, ByteBuffer message) {
+
+ emitter.send(partition, message);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.base.Sender#sendToAllRemotePartitions(org.apache.s4.base.Event)
+ */
+ @Override
+ public void sendToAllRemotePartitions(Event event) {
+ tpe.submit(new SerializeAndSendToAllRemotePartitionsTask(event));
+
+ }
+
+ class SerializeAndSendToRemotePartitionTask implements Runnable {
+ Event event;
+ int remotePartitionId;
+
+ public SerializeAndSendToRemotePartitionTask(Event event, int remotePartitionId) {
+ this.event = event;
+ this.remotePartitionId = remotePartitionId;
+ }
+
+ @Override
+ public void run() {
+ ByteBuffer serializedEvent = serDeser.serialize(event);
+ emitter.send(remotePartitionId, serializedEvent);
+
+ }
+
+ }
+
+ class SerializeAndSendToAllRemotePartitionsTask implements Runnable {
+
+ Event event;
+
+ public SerializeAndSendToAllRemotePartitionsTask(Event event) {
+ super();
+ this.event = event;
+ }
+
+ @Override
+ public void run() {
+ ByteBuffer serializedEvent = serDeser.serialize(event);
+
+ for (int i = 0; i < emitter.getPartitionCount(); i++) {
+
+ /* Don't use the comm layer when we send to the same partition. */
+ if (localPartitionId != i) {
+ emitter.send(i, serializedEvent);
+ S4Metrics.sentEvent(i);
+
+ }
+ }
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 53ddc7a..94b19b5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -19,13 +19,14 @@
package org.apache.s4.core;
import java.util.Collection;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
import org.apache.s4.base.Event;
import org.apache.s4.base.GenericKeyFinder;
import org.apache.s4.base.Key;
import org.apache.s4.base.KeyFinder;
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.Sender;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.core.util.S4Metrics;
@@ -42,25 +43,29 @@ import com.google.common.base.Preconditions;
* <p>
* To build an application, create stream objects using relevant methods in the {@link App} class.
*/
-public class Stream<T extends Event> implements Runnable, Streamable {
+public class Stream<T extends Event> implements Streamable {
private static final Logger logger = LoggerFactory.getLogger(Stream.class);
final static private String DEFAULT_SEPARATOR = "^";
- final static private int CAPACITY = 1000;
+ final static private int CAPACITY = 100000;
private static int idCounter = 0;
private String name;
protected Key<T> key;
private ProcessingElement[] targetPEs;
- protected final BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(CAPACITY);
- private Thread thread;
+ // protected final BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(CAPACITY);
+ // final BlockingQueue<StreamEventProcessingTask> taskQueue = new ArrayBlockingQueue<StreamEventProcessingTask>(
+ // CAPACITY);
+ private Executor eventProcessingExecutor;
final private Sender sender;
- final private Receiver receiver;
+ final private ReceiverImpl receiver;
// final private int id;
final private App app;
private Class<T> eventType = null;
SerializerDeserializer serDeser;
+ private int parallelism = 1;
+
/**
* Send events using a {@link KeyFinder}. The key finder extracts the value of the key which is used to determine
* the target {@link org.apache.s4.comm.topology.ClusterNode} for an event.
@@ -84,10 +89,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
}
}
- /* Start streaming. */
- thread = new Thread(this, name);
- thread.setContextClassLoader(getApp().getClass().getClassLoader());
- thread.start();
+ eventProcessingExecutor = app.getStreamExecutorFactory().create(parallelism, name,
+ app.getClass().getClassLoader());
+
this.receiver.addStream(this);
}
@@ -100,6 +104,14 @@ public class Stream<T extends Event> implements Runnable, Streamable {
*/
public Stream<T> setName(String name) {
this.name = name;
+ // Metrics.newGauge(getClass(), "stream-size-" + name, new Gauge<Integer>() {
+ //
+ // @Override
+ // public Integer value() {
+ // return taskQueue.size();
+ // }
+ // });
+
return this;
}
@@ -174,61 +186,55 @@ public class Stream<T extends Event> implements Runnable, Streamable {
*/
@SuppressWarnings("unchecked")
public void put(Event event) {
- try {
- event.setStreamId(getName());
- event.setAppId(app.getId());
+ event.setStreamId(getName());
+ event.setAppId(app.getId());
+
+ /*
+ * Events may be sent to local or remote partitions or both. The following code implements the logic.
+ */
+ if (key != null) {
/*
- * Events may be sent to local or remote partitions or both. The following code implements the logic.
+ * We send to a specific PE instance using the key but we don't know if the target partition is remote or
+ * local. We need to ask the sender.
*/
- if (key != null) {
+ if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {
/*
- * We send to a specific PE instance using the key but we don't know if the target partition is remote
- * or local. We need to ask the sender.
+ * Sender checked and decided that the target is local so we simply put the event in the queue and we
+ * save the trip over the network.
*/
- if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {
-
- /*
- * Sender checked and decided that the target is local so we simply put the event in the queue and
- * we save the trip over the network.
- */
- // TODO no need to serialize for local queue
- queue.put(event);
- }
+ eventProcessingExecutor.execute(new StreamEventProcessingTask((T) event));
+ }
- } else {
+ } else {
- /*
- * We are broadcasting this event to all PE instance. In a cluster, we need to send the event to every
- * node. The sender method takes care of the remote partitions an we take care of putting the event into
- * the queue.
- */
- sender.sendToRemotePartitions(event);
+ /*
+ * We are broadcasting this event to all PE instances. In a cluster, we need to send the event to every node.
+ * The sender method takes care of the remote partitions and we take care of putting the event into the
+ * queue.
+ */
+ sender.sendToAllRemotePartitions(event);
- // TODO no need to serialize for local queue
- queue.put(event);
- // TODO abstraction around queue and add dropped counter
- // TODO add counter for local events
+ // now send to local queue
+ eventProcessingExecutor.execute(new StreamEventProcessingTask((T) event));
+ // TODO abstraction around queue and add dropped counter
+ // TODO add counter for local events
- }
- } catch (InterruptedException e) {
- logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
- Thread.currentThread().interrupt();
}
}
/**
- * The low level {@link Receiver} object call this method when a new {@link Event} is available.
+ * The low level {@link ReceiverImpl} object calls this method when a new {@link Event} is available.
*/
public void receiveEvent(Event event) {
- try {
- queue.put(event);
- // TODO abstraction around queue and add dropped counter
- } catch (InterruptedException e) {
- logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
- Thread.currentThread().interrupt();
- }
+ // NOTE: ArrayBlockingQueue.size is O(1).
+ // if (taskQueue.remainingCapacity() == 0) {
+ // S4Metrics.queueIsFull(name);
+ // }
+
+ eventProcessingExecutor.execute(new StreamEventProcessingTask((T) event));
+ // TODO abstraction around queue and add dropped counter
}
/**
@@ -263,7 +269,6 @@ public class Stream<T extends Event> implements Runnable, Streamable {
* Stop and close this stream.
*/
public void close() {
- thread.interrupt();
}
/**
@@ -280,59 +285,82 @@ public class Stream<T extends Event> implements Runnable, Streamable {
return receiver;
}
- @Override
- public void run() {
- while (true) {
- try {
- /* Get oldest event in queue. */
- T event = (T) queue.take();
- S4Metrics.dequeuedEvent(name);
+ public Stream<T> register() {
+ app.addStream(this);
+ return this;
+ }
+
+ public Stream<T> setSerializerDeserializerFactory(SerializerDeserializerFactory serDeserFactory) {
+ this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+ return this;
+ }
+
+ /**
+ * <p>
+ * Defines the maximum number of concurrent threads that should be used for processing events for this stream.
+ * Threads will only be created as necessary, up to the specified maximum.
+ * </p>
+ * <p>
+ * Default is 1 (i.e. with default stream executor service, this corresponds to asynchronous processing, but no
+ * parallelism)
+ * </p>
+ * <p>
+ * It might be useful to increase parallelism when:
+ * <ul>
+ * <li>Processing elements handling events for this stream are CPU bound</li>
+ * <li>Processing elements handling events for this stream use blocking I/O operations</li>
+ * </ul>
+ * </p>
+ *
+ *
+ */
+ public Stream<T> setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
- /* Send event to each target PE. */
- for (int i = 0; i < targetPEs.length; i++) {
+ class StreamEventProcessingTask implements Runnable {
- if (key == null) {
+ T event;
- /* Broadcast to all PE instances! */
+ public StreamEventProcessingTask(T event) {
+ this.event = event;
+ }
- /* STEP 1: find all PE instances. */
+ @Override
+ public void run() {
+ S4Metrics.dequeuedEvent(name);
- Collection<ProcessingElement> pes = targetPEs[i].getInstances();
+ /* Send event to each target PE. */
+ for (int i = 0; i < targetPEs.length; i++) {
- /* STEP 2: iterate and pass event to PE instance. */
- for (ProcessingElement pe : pes) {
+ if (key == null) {
- pe.handleInputEvent(event);
- }
+ /* Broadcast to all PE instances! */
- } else {
+ /* STEP 1: find all PE instances. */
- /* We have a key, send to target PE. */
+ Collection<ProcessingElement> pes = targetPEs[i].getInstances();
- /* STEP 1: find the PE instance for key. */
- ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));
+ /* STEP 2: iterate and pass event to PE instance. */
+ for (ProcessingElement pe : pes) {
- /* STEP 2: pass event to PE instance. */
pe.handleInputEvent(event);
}
- }
- } catch (InterruptedException e) {
- logger.info("Closing stream {}.", name);
- receiver.removeStream(this);
- Thread.currentThread().interrupt();
- return;
+ } else {
+
+ /* We have a key, send to target PE. */
+
+ /* STEP 1: find the PE instance for key. */
+ ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));
+
+ /* STEP 2: pass event to PE instance. */
+ pe.handleInputEvent(event);
+ }
}
- }
- }
- public Stream<T> register() {
- app.addStream(this);
- return this;
- }
+ }
- public Stream<T> setSerializerDeserializerFactory(SerializerDeserializerFactory serDeserFactory) {
- this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
- return this;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
index ca23c79..3e2d617 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@ -22,12 +22,13 @@ import com.google.inject.AbstractModule;
/**
* Checkpointing module that uses the {@link DefaultFileSystemStateStorage} as a checkpointing backend.
- *
+ *
*/
public class FileSystemBackendCheckpointingModule extends AbstractModule {
@Override
protected void configure() {
bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
bind(CheckpointingFramework.class).to(SafeKeeper.class);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
new file mode 100644
index 0000000..b47087f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultRemoteSendersExecutorServiceFactory.java
@@ -0,0 +1,20 @@
+package org.apache.s4.core.staging;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Default implementation of the remote senders executor factory. It reuses the implementation of the
+ * {@link DefaultSenderExecutorServiceFactory} class by extending it.
+ *
+ */
+public class DefaultRemoteSendersExecutorServiceFactory extends DefaultSenderExecutorServiceFactory implements
+ RemoteSendersExecutorServiceFactory {
+
+ @Inject
+ public DefaultRemoteSendersExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
+ @Named("s4.sender.workQueueSize") int workQueueSize) {
+ super(threadPoolSize, workQueueSize);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
new file mode 100644
index 0000000..344bd59
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultSenderExecutorServiceFactory.java
@@ -0,0 +1,34 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.comm.ThrottlingThreadPoolExecutorService;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Default factory implementation for the sender executor service. It uses a mechanism for throttling the submission of
+ * events and maintaining partial order.
+ *
+ */
+public class DefaultSenderExecutorServiceFactory implements SenderExecutorServiceFactory {
+
+ private int threadPoolSize;
+ private int workQueueSize;
+
+ @Inject
+ public DefaultSenderExecutorServiceFactory(@Named("s4.sender.parallelism") int threadPoolSize,
+ @Named("s4.sender.workQueueSize") int workQueueSize) {
+ this.threadPoolSize = threadPoolSize;
+ this.workQueueSize = workQueueSize;
+ }
+
+ @Override
+ public ExecutorService create() {
+ return new ThrottlingThreadPoolExecutorService(threadPoolSize, true,
+ (this instanceof DefaultRemoteSendersExecutorServiceFactory) ? "remote-sender-%d" : "sender-%d",
+ workQueueSize, getClass().getClassLoader());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
new file mode 100644
index 0000000..e0b52a9
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/DefaultStreamProcessingExecutorServiceFactory.java
@@ -0,0 +1,37 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.comm.ThrottlingThreadPoolExecutorService;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Default factory for the event processing stage executors.
+ * </p>
+ * <p>
+ * It provides optional parallelism, when the processing activity requires blocking I/O operations, or is CPU-bound.
+ * </p>
+ * <p>
+ * It throttles the submission of events while preserving partial ordering, by delegating to a
+ * {@link ThrottlingThreadPoolExecutorService}.
+ * </p>
+ */
+public class DefaultStreamProcessingExecutorServiceFactory implements StreamExecutorServiceFactory {
+
+    // Injected once through the constructor and never reassigned.
+    private final int workQueueSize;
+
+    /**
+     * @param workQueueSize
+     *            value bound to the <code>s4.stream.workQueueSize</code> name; forwarded to every executor created by
+     *            this factory
+     */
+    @Inject
+    public DefaultStreamProcessingExecutorServiceFactory(@Named("s4.stream.workQueueSize") int workQueueSize) {
+        this.workQueueSize = workQueueSize;
+    }
+
+    /**
+     * Creates a {@link ThrottlingThreadPoolExecutorService} for the given stream.
+     *
+     * @param parallelism
+     *            forwarded as the first argument of the created executor
+     * @param name
+     *            name of the stream, embedded in the <code>stream-&lt;name&gt;-%d</code> string given to the executor
+     * @param classLoader
+     *            classloader forwarded to the created executor
+     * @return a newly created executor service
+     */
+    @Override
+    public ExecutorService create(int parallelism, String name, final ClassLoader classLoader) {
+        final String namePattern = "stream-" + name + "-%d";
+        return new ThrottlingThreadPoolExecutorService(parallelism, true, namePattern, workQueueSize,
+                classLoader);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java
new file mode 100644
index 0000000..a3fd4dc
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java
@@ -0,0 +1,12 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Defines an executor factory for the stage responsible for sending events to remote logical clusters.
+ *
+ */
+public interface RemoteSendersExecutorServiceFactory {
+
+ /**
+ * Creates the executor service for the remote sending stage.
+ *
+ * @return executor service used for sending events to remote logical clusters
+ */
+ ExecutorService create();
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java
new file mode 100644
index 0000000..d7393f3
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java
@@ -0,0 +1,13 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Defines a factory that creates executors for the stage responsible for the serialization of events and delegation to
+ * emitters in the communication layer.
+ *
+ * @see DefaultSenderExecutorServiceFactory
+ */
+public interface SenderExecutorServiceFactory {
+
+ /**
+ * Creates the executor service for the sender stage.
+ *
+ * @return executor service used for serializing events and delegating them to emitters
+ */
+ ExecutorService create();
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
new file mode 100644
index 0000000..db2df27
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
@@ -0,0 +1,29 @@
+package org.apache.s4.core.staging;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.core.App;
+
+/**
+ * Factory for creating an executor service that will process events in PEs. This is typically done asynchronously with
+ * a configurable thread pool.
+ * <p>
+ * Implementations may use dependency injection to set some default parameters.
+ * </p>
+ *
+ * @see DefaultStreamProcessingExecutorServiceFactory
+ */
+public interface StreamExecutorServiceFactory {
+
+ /**
+ * Creates the executor service for a given stream.
+ *
+ * @param parallelism
+ * Number of concurrent threads
+ * @param name
+ * Name of the stream (for naming threads)
+ * @param classLoader
+ * Classloader used for specifying the context classloader in processing threads. This is usually the
+ * classloader that loaded the {@link App} class.
+ * @return Executor service for processing events in PEs
+ */
+ ExecutorService create(int parallelism, String name, ClassLoader classLoader);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index 46abecf..27ac387 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -7,9 +7,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Emitter;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Receiver;
+import org.apache.s4.core.ReceiverImpl;
import org.apache.s4.core.RemoteSender;
-import org.apache.s4.core.Sender;
+import org.apache.s4.core.SenderImpl;
import org.apache.s4.core.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,15 +36,16 @@ public class S4Metrics {
static List<Meter> partitionSenderMeters = Lists.newArrayList();
- private static Meter eventMeter = Metrics.newMeter(Receiver.class, "received-events", "event-count",
+ private static Meter eventMeter = Metrics.newMeter(ReceiverImpl.class, "received-events", "event-count",
TimeUnit.SECONDS);
- private static Meter bytesMeter = Metrics.newMeter(Receiver.class, "received-bytes", "bytes-count",
+ private static Meter bytesMeter = Metrics.newMeter(ReceiverImpl.class, "received-bytes", "bytes-count",
TimeUnit.SECONDS);
private static Meter[] senderMeters;
private static Map<String, Meter> dequeuingStreamMeters = Maps.newHashMap();
private static Map<String, Meter> droppedStreamMeters = Maps.newHashMap();
+ private static Map<String, Meter> streamQueueFullMeters = Maps.newHashMap();
private static Map<String, Meter[]> remoteSenderMeters = Maps.newHashMap();
@@ -53,7 +54,7 @@ public class S4Metrics {
senderMeters = new Meter[emitter.getPartitionCount()];
// int localPartitionId = assignment.assignClusterNode().getPartition();
for (int i = 0; i < senderMeters.length; i++) {
- senderMeters[i] = Metrics.newMeter(Sender.class, "sender", "sent-to-" + (i), TimeUnit.SECONDS);
+ senderMeters[i] = Metrics.newMeter(SenderImpl.class, "sender", "sent-to-" + (i), TimeUnit.SECONDS);
}
}
@@ -83,11 +84,16 @@ public class S4Metrics {
});
}
- public static void receivedEvent(int bytes) {
+ public static void receivedEventFromCommLayer(int bytes) {
eventMeter.mark();
bytesMeter.mark(bytes);
}
+ /**
+  * Marks the "stream-full" meter registered under the given stream name.
+  * <p>
+  * When no meter is registered under that name the call is a no-op, so that a metrics update can never fail the
+  * caller with a {@link NullPointerException}.
+  * </p>
+  *
+  * @param name
+  *            key under which the meter was stored in the stream-full meters map
+  */
+ public static void queueIsFull(String name) {
+     Meter fullMeter = streamQueueFullMeters.get(name);
+     // Map.get returns null for an unknown key; skip instead of dereferencing it
+     if (fullMeter != null) {
+         fullMeter.mark();
+     }
+ }
+
public static void sentEvent(int partition) {
try {
senderMeters[partition].mark();
@@ -103,7 +109,8 @@ public class S4Metrics {
dequeuingStreamMeters.put(name,
Metrics.newMeter(Stream.class, "dequeued@" + name, "dequeued", TimeUnit.SECONDS));
droppedStreamMeters.put(name, Metrics.newMeter(Stream.class, "dropped@" + name, "dropped", TimeUnit.SECONDS));
-
+ streamQueueFullMeters.put(name,
+ Metrics.newMeter(Stream.class, "stream-full@" + name, "stream-full", TimeUnit.SECONDS));
}
public static void dequeuedEvent(String name) {
@@ -153,4 +160,5 @@ public class S4Metrics {
fetchedCheckpointFailure.mark();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index e0bd874..377de0b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@ -166,6 +166,7 @@ public class CheckpointingTest {
bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
bind(CheckpointingFramework.class).to(SafeKeeper.class);
bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
index b1676dc..6d358ce 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
@@ -29,7 +29,6 @@ import org.apache.s4.core.App;
import org.apache.s4.core.window.AbstractSlidingWindowPE;
import org.apache.s4.core.window.DefaultAggregatingSlot;
import org.apache.s4.core.window.SlotFactory;
-import org.apache.s4.core.windowing.WindowingPETest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index 71bae7a..02fd694 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -20,7 +20,12 @@ package org.apache.s4.fixtures;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Listener;
-import org.apache.s4.core.Receiver;
+import org.apache.s4.base.Sender;
+import org.apache.s4.core.ReceiverImpl;
+import org.apache.s4.core.staging.DefaultSenderExecutorServiceFactory;
+import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
+import org.apache.s4.core.staging.SenderExecutorServiceFactory;
+import org.apache.s4.core.staging.StreamExecutorServiceFactory;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.NoOpDeploymentManager;
import org.mockito.Mockito;
@@ -28,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
/**
* Core module mocking basic platform functionalities.
@@ -46,6 +52,19 @@ public class MockCoreModule extends AbstractModule {
bind(DeploymentManager.class).to(NoOpDeploymentManager.class);
bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
bind(Listener.class).toInstance(Mockito.mock(Listener.class));
- bind(Receiver.class).toInstance(Mockito.mock(Receiver.class));
+ bind(ReceiverImpl.class).toInstance(Mockito.mock(ReceiverImpl.class));
+ bind(Sender.class).toInstance(Mockito.mock(Sender.class));
+
+ // Although we want to mock as much as possible, most tests still require the machinery for routing events
+ // within a stream/node, therefore sender and stream executors are not mocked
+ bind(StreamExecutorServiceFactory.class).to(DefaultStreamProcessingExecutorServiceFactory.class);
+
+ bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
+
+ bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);
+ bind(Integer.class).annotatedWith(Names.named("s4.sender.workQueueSize")).toInstance(10000);
+
+ bind(Integer.class).annotatedWith(Names.named("s4.stream.workQueueSize")).toInstance(10000);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f9689ea0/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index 0209933..1ea3a2a 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -21,9 +21,10 @@ package org.apache.s4.example.counter;
import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
+import org.apache.s4.base.Sender;
import org.apache.s4.core.App;
-import org.apache.s4.core.Receiver;
-import org.apache.s4.core.Sender;
+import org.apache.s4.core.ReceiverImpl;
+import org.apache.s4.core.SenderImpl;
import org.apache.s4.core.Stream;
import com.google.inject.Guice;
@@ -117,8 +118,8 @@ final public class MyApp extends App {
Injector injector = Guice.createInjector(new Module());
MyApp myApp = injector.getInstance(MyApp.class);
- Sender sender = injector.getInstance(Sender.class);
- Receiver receiver = injector.getInstance(Receiver.class);
+ Sender sender = injector.getInstance(SenderImpl.class);
+ ReceiverImpl receiver = injector.getInstance(ReceiverImpl.class);
// myApp.setCommLayer(sender, receiver);
myApp.init();
myApp.start();