You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/28 15:37:18 UTC

git commit: Fix timers synchronization

Updated Branches:
  refs/heads/S4-63 [created] 2b212be69


Fix timers synchronization


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/2b212be6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/2b212be6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/2b212be6

Branch: refs/heads/S4-63
Commit: 2b212be69b14ac6554648cd5fb3350c6867c76ba
Parents: 049a6b0
Author: Daniel Gómez Ferro <dg...@yahoo.es>
Authored: Fri Jun 22 13:03:22 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Jun 28 17:32:16 2012 +0200

----------------------------------------------------------------------
 build.gradle                                       |    1 +
 subprojects/s4-core/s4-core.gradle                 |    1 +
 .../java/org/apache/s4/core/ProcessingElement.java |    2 +-
 .../java/org/apache/s4/comm/BareCommModule.java    |   40 +++++
 .../java/org/apache/s4/core/BareCoreModule.java    |   34 ++++
 .../apache/s4/core/timers/MultithreadingTest.java  |  118 +++++++++++++++
 6 files changed, 195 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 54c70ee..d51799d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -57,6 +57,7 @@ project.ext["libraries"] = [
     netty:              'org.jboss.netty:netty:3.2.5.Final',
     reflectasm:         'com.esotericsoftware:reflectasm:0.8',
     minlog:             'com.esotericsoftware:minlog:1.2',
+    mockito_core:       'org.mockito:mockito-core:1.9.0',
     asm:                'asm:asm:3.2',
     commons_io:         'commons-io:commons-io:2.0.1',
     commons_config:     'commons-configuration:commons-configuration:1.6',

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index fffab81..ad3bbbe 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -26,6 +26,7 @@ dependencies {
     testCompile libraries.gradle_core
     testCompile libraries.gradle_tooling_api
     testCompile libraries.gradle_wrapper
+    testCompile libraries.mockito_core
 }
 
 test {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 594043f..4dced75 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -636,7 +636,7 @@ public abstract class ProcessingElement implements Cloneable {
                     if (isThreadSafe) {
                         peInstance.onTime();
                     } else {
-                        synchronized (this) {
+                        synchronized (peInstance) {
                             peInstance.onTime();
                         }
                     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
new file mode 100644
index 0000000..8639e9a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
@@ -0,0 +1,40 @@
+package org.apache.s4.comm;
+
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.core.RemoteSenders;
+import org.mockito.Mockito;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+/**
+ * Default configuration module for the communication layer. Parameterizable through a configuration file.
+ * 
+ */
+public class BareCommModule extends AbstractModule {
+
+    public BareCommModule() {
+        super();
+    }
+
+    @Override
+    protected void configure() {
+        /* The hashing function to map keys top partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
+        bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
+        bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
+        bind(RemoteEmitterFactory.class).toInstance(Mockito.mock(RemoteEmitterFactory.class));
+        bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
+        Names.bindProperties(binder(), ImmutableMap.of("cluster.name", "testCluster"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
new file mode 100644
index 0000000..70f88cd
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
@@ -0,0 +1,34 @@
+package org.apache.s4.core;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.core.Receiver;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.NoOpDeploymentManager;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
+ * until we have a better way to customize node configuration
+ * 
+ */
+public class BareCoreModule extends AbstractModule {
+
+    @SuppressWarnings("unused")
+    private static Logger logger = LoggerFactory.getLogger(BareCoreModule.class);
+
+    public BareCoreModule() {
+    }
+
+    @Override
+    protected void configure() {
+        bind(DeploymentManager.class).to(NoOpDeploymentManager.class);
+        bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
+        bind(Listener.class).toInstance(Mockito.mock(Listener.class));
+        bind(Receiver.class).toInstance(Mockito.mock(Receiver.class));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2b212be6/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
new file mode 100644
index 0000000..3bbd38e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
@@ -0,0 +1,118 @@
+package org.apache.s4.core.timers;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.comm.BareCommModule;
+import org.apache.s4.core.App;
+import org.apache.s4.core.BareCoreModule;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class MultithreadingTest {
+    private static Logger logger = LoggerFactory.getLogger(MultithreadingTest.class);
+
+    private static final String STREAM_NAME = "StreamName";
+    private static final String APP_NAME = "AppName";
+
+    /*
+     * We inject one event and fire one onTime() event, both should be synchronized (not running in parallel)
+     */
+    @Test
+    public void testSynchronization() throws IOException, InterruptedException {
+        Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
+        TestApp app = injector.getInstance(TestApp.class);
+        app.count = 2; // One for the event, another for the timer
+        app.init();
+        app.start();
+
+        app.testStream.receiveEvent(new EventMessage(APP_NAME, STREAM_NAME, app.getSerDeser().serialize(new Event())));
+
+        /*
+         * This must raise a timeout, since the onTime() event is blocked waiting for the onEvent() call to finish. If
+         * it completes before the timeout, it means onEvent() and onTime() weren't synchronized
+         */
+        assertFalse("The timer wasn't synchronized with the event", app.latch.await(2, TimeUnit.SECONDS));
+    }
+
+    public static class CountdownPE extends ProcessingElement {
+        CountDownLatch latch;
+
+        public CountdownPE(App app) {
+            super(app);
+        }
+
+        public void onEvent(Event ev) {
+            logger.debug("Event processed");
+            latch.countDown();
+            try {
+                latch.await(10, TimeUnit.SECONDS);
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+        }
+
+        @Override
+        protected void onTime() {
+            logger.debug("Timer fired");
+            latch.countDown();
+            try {
+                latch.await(10, TimeUnit.SECONDS);
+            } catch (InterruptedException ie) {
+                // ignore
+            }
+        }
+
+        @Override
+        protected void onCreate() {
+        }
+
+        @Override
+        protected void onRemove() {
+        }
+    }
+
+    private static class TestApp extends App {
+        Stream<Event> testStream;
+        CountDownLatch latch;
+        int count;
+
+        @Override
+        protected void onStart() {
+        }
+
+        @Override
+        protected void onInit() {
+            latch = new CountDownLatch(count);
+
+            CountdownPE countdownPE = createPE(CountdownPE.class);
+            countdownPE.latch = latch;
+            testStream = createStream(STREAM_NAME, new KeyFinder<Event>() {
+                @Override
+                public List<String> get(Event event) {
+                    return ImmutableList.of(Integer.toString(event.hashCode()));
+                }
+            }, countdownPE);
+            countdownPE.setTimerInterval(1, TimeUnit.SECONDS);
+        }
+
+        @Override
+        protected void onClose() {
+        }
+
+    }
+}