You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/28 15:37:18 UTC
git commit: Fix timers synchronization
Updated Branches:
refs/heads/S4-63 [created] 2b212be69
Fix timers synchronization
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/2b212be6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/2b212be6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/2b212be6
Branch: refs/heads/S4-63
Commit: 2b212be69b14ac6554648cd5fb3350c6867c76ba
Parents: 049a6b0
Author: Daniel Gómez Ferro <dg...@yahoo.es>
Authored: Fri Jun 22 13:03:22 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Jun 28 17:32:16 2012 +0200
----------------------------------------------------------------------
build.gradle | 1 +
subprojects/s4-core/s4-core.gradle | 1 +
.../java/org/apache/s4/core/ProcessingElement.java | 2 +-
.../java/org/apache/s4/comm/BareCommModule.java | 40 +++++
.../java/org/apache/s4/core/BareCoreModule.java | 34 ++++
.../apache/s4/core/timers/MultithreadingTest.java | 118 +++++++++++++++
6 files changed, 195 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 54c70ee..d51799d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -57,6 +57,7 @@ project.ext["libraries"] = [
netty: 'org.jboss.netty:netty:3.2.5.Final',
reflectasm: 'com.esotericsoftware:reflectasm:0.8',
minlog: 'com.esotericsoftware:minlog:1.2',
+ mockito_core: 'org.mockito:mockito-core:1.9.0',
asm: 'asm:asm:3.2',
commons_io: 'commons-io:commons-io:2.0.1',
commons_config: 'commons-configuration:commons-configuration:1.6',
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index fffab81..ad3bbbe 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -26,6 +26,7 @@ dependencies {
testCompile libraries.gradle_core
testCompile libraries.gradle_tooling_api
testCompile libraries.gradle_wrapper
+ testCompile libraries.mockito_core
}
test {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 594043f..4dced75 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -636,7 +636,7 @@ public abstract class ProcessingElement implements Cloneable {
if (isThreadSafe) {
peInstance.onTime();
} else {
- synchronized (this) {
+ synchronized (peInstance) {
peInstance.onTime();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
new file mode 100644
index 0000000..8639e9a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
@@ -0,0 +1,40 @@
+package org.apache.s4.comm;
+
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.core.RemoteSenders;
+import org.mockito.Mockito;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+/**
+ * Default configuration module for the communication layer. Parameterizable through a configuration file.
+ *
+ */
+public class BareCommModule extends AbstractModule {
+
+ public BareCommModule() {
+ super();
+ }
+
+ @Override
+ protected void configure() {
+ /* The hashing function to map keys top partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
+ bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
+ bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
+ bind(RemoteEmitterFactory.class).toInstance(Mockito.mock(RemoteEmitterFactory.class));
+ bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
+ Names.bindProperties(binder(), ImmutableMap.of("cluster.name", "testCluster"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
new file mode 100644
index 0000000..70f88cd
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
@@ -0,0 +1,34 @@
+package org.apache.s4.core;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.core.Receiver;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.NoOpDeploymentManager;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
+ * until we have a better way to customize node configuration
+ *
+ */
+public class BareCoreModule extends AbstractModule {
+
+ @SuppressWarnings("unused")
+ private static Logger logger = LoggerFactory.getLogger(BareCoreModule.class);
+
+ public BareCoreModule() {
+ }
+
+ @Override
+ protected void configure() {
+ bind(DeploymentManager.class).to(NoOpDeploymentManager.class);
+ bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
+ bind(Listener.class).toInstance(Mockito.mock(Listener.class));
+ bind(Receiver.class).toInstance(Mockito.mock(Receiver.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
new file mode 100644
index 0000000..3bbd38e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
@@ -0,0 +1,118 @@
+package org.apache.s4.core.timers;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.comm.BareCommModule;
+import org.apache.s4.core.App;
+import org.apache.s4.core.BareCoreModule;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class MultithreadingTest {
+ private static Logger logger = LoggerFactory.getLogger(MultithreadingTest.class);
+
+ private static final String STREAM_NAME = "StreamName";
+ private static final String APP_NAME = "AppName";
+
+ /*
+ * Injects one event while the PE's 1-second timer is active; onEvent() and onTime() must not run in parallel.
+ */
+ @Test
+ public void testSynchronization() throws IOException, InterruptedException {
+ Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
+ TestApp app = injector.getInstance(TestApp.class);
+ app.count = 2; // One for the event, another for the timer
+ app.init();
+ app.start();
+
+ app.testStream.receiveEvent(new EventMessage(APP_NAME, STREAM_NAME, app.getSerDeser().serialize(new Event())));
+
+ /*
+ * This await must time out: the latch (count 2) reaches zero only if onEvent() and onTime() both count down,
+ * and onTime() is expected to stay blocked until onEvent() returns. If it succeeds, they weren't synchronized
+ */
+ assertFalse("The timer wasn't synchronized with the event", app.latch.await(2, TimeUnit.SECONDS));
+ }
+
+ public static class CountdownPE extends ProcessingElement {
+ CountDownLatch latch;
+
+ public CountdownPE(App app) {
+ super(app);
+ }
+
+ public void onEvent(Event ev) {
+ logger.debug("Event processed");
+ latch.countDown();
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ // ignored on purpose: the await only keeps this callback busy for the duration of the test
+ }
+ }
+
+ @Override
+ protected void onTime() {
+ logger.debug("Timer fired");
+ latch.countDown();
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ // ignored on purpose: the await only keeps this callback busy for the duration of the test
+ }
+ }
+
+ @Override
+ protected void onCreate() {
+ }
+
+ @Override
+ protected void onRemove() {
+ }
+ }
+
+ private static class TestApp extends App {
+ Stream<Event> testStream;
+ CountDownLatch latch;
+ int count;
+
+ @Override
+ protected void onStart() {
+ }
+
+ @Override
+ protected void onInit() {
+ latch = new CountDownLatch(count);
+
+ CountdownPE countdownPE = createPE(CountdownPE.class);
+ countdownPE.latch = latch;
+ testStream = createStream(STREAM_NAME, new KeyFinder<Event>() {
+ @Override
+ public List<String> get(Event event) {
+ return ImmutableList.of(Integer.toString(event.hashCode()));
+ }
+ }, countdownPE);
+ countdownPE.setTimerInterval(1, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected void onClose() {
+ }
+
+ }
+}