You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/24 16:19:48 UTC

[1/4] git commit: Merge branch 'S4-57' into piper

Updated Branches:
  refs/heads/piper 43a31f440 -> 704404f6a


Merge branch 'S4-57' into piper


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/704404f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/704404f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/704404f6

Branch: refs/heads/piper
Commit: 704404f6a0850711ae1a0f1b687ae48e91238e69
Parents: 43a31f4 995d358
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Jul 24 17:55:05 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Jul 24 18:16:26 2012 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/core/App.java      |   12 +-
 .../java/org/apache/s4/core/ProcessingElement.java |   59 +++--
 .../main/java/org/apache/s4/core/WindowingPE.java  |  163 -----------
 .../s4/core/window/AbstractSlidingWindowPE.java    |  219 +++++++++++++++
 .../s4/core/window/DefaultAggregatingSlot.java     |   51 ++++
 .../java/org/apache/s4/core/window/OHCLSlot.java   |   61 ++++
 .../main/java/org/apache/s4/core/window/Slot.java  |   23 ++
 .../org/apache/s4/core/window/SlotFactory.java     |   13 +
 .../java/org/apache/s4/comm/BareCommModule.java    |   45 ---
 .../java/org/apache/s4/core/BareCoreModule.java    |   33 ---
 .../test/java/org/apache/s4/core/TriggerTest.java  |    5 +-
 .../org/apache/s4/core/ft/CheckpointingTest.java   |    8 +-
 .../apache/s4/core/timers/MultithreadingTest.java  |    6 +-
 .../org/apache/s4/core/windowing/WindowingPE1.java |   67 +++++
 .../apache/s4/core/windowing/WindowingPETest.java  |   99 +++++++
 .../org/apache/s4/fixtures/MockCommModule.java     |   47 +++
 .../org/apache/s4/fixtures/MockCoreModule.java     |   33 +++
 17 files changed, 670 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/704404f6/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index bc20e98,0d958fd..2e816e6
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@@ -26,7 -26,9 +26,9 @@@ import org.apache.s4.base.KeyFinder
  import org.apache.s4.base.SerializerDeserializer;
  import org.apache.s4.comm.serialize.KryoSerDeser;
  import org.apache.s4.comm.topology.RemoteStreams;
 -import org.apache.s4.core.App.ClockType;
 +import org.apache.s4.core.ft.CheckpointingFramework;
+ import org.apache.s4.core.window.AbstractSlidingWindowPE;
+ import org.apache.s4.core.window.SlotFactory;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -34,10 -36,8 +36,10 @@@ import com.google.common.collect.Maps
  import com.google.inject.Inject;
  import com.google.inject.name.Named;
  
 -/*
 - * Container base class to hold all processing elements. We will implement administrative methods here.
 +/**
 + * Container base class to hold all processing elements.
-  *
++ * 
 + * It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
   */
  public abstract class App {
  

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/704404f6/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 730ff56,0f14899..6f8264a
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@@ -1,12 -1,28 +1,15 @@@
 -/*
 - * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
 - *
 - * Licensed under the Apache License, Version 2.0 (the "License");
 - * you may not use this file except in compliance with the License.
 - * You may obtain a copy of the License at
 - *          http://www.apache.org/licenses/LICENSE-2.0
 - *
 - * Unless required by applicable law or agreed to in writing,
 - * software distributed under the License is distributed on an
 - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 - * either express or implied. See the License for the specific
 - * language governing permissions and limitations under the
 - * License. See accompanying LICENSE file.
 - */
  package org.apache.s4.core;
  
+ import java.lang.Thread.UncaughtExceptionHandler;
 +import java.lang.reflect.Field;
 +import java.lang.reflect.Modifier;
  import java.util.Collection;
  import java.util.Map;
- import java.util.Timer;
  import java.util.TimerTask;
  import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ThreadFactory;
  import java.util.concurrent.TimeUnit;
  
  import net.jcip.annotations.ThreadSafe;
@@@ -95,32 -107,25 +99,32 @@@ public abstract class ProcessingElemen
       * This maps holds all the instances. We make it package private to prevent concrete classes from updating the
       * collection.
       */
 -    Cache<String, ProcessingElement> peInstances;
 +    transient Cache<String, ProcessingElement> peInstances;
  
      /* This map is initialized in the prototype and cloned to instances. */
 -    Map<Class<? extends Event>, Trigger> triggers;
 +    transient Map<Class<? extends Event>, Trigger> triggers;
  
      /* PE instance id. */
-     String id = "";
+     protected String id = "";
  
      /* Private fields. */
 -    private ProcessingElement pePrototype;
 -    private boolean haveTriggers = false;
 -    private long timerIntervalInMilliseconds = 0;
 -    private ScheduledExecutorService timer;
 -    private boolean isPrototype = true;
 -    private boolean isThreadSafe = false;
 -    private String name = null;
 -    private boolean isSingleton = false;
 -
 -    private transient OverloadDispatcher overloadDispatcher;
 +    transient private ProcessingElement pePrototype;
 +    transient private boolean haveTriggers = false;
 +    transient private long timerIntervalInMilliseconds = 0;
-     transient private Timer triggerTimer;
-     transient private Timer checkpointingTimer;
++    transient private ScheduledExecutorService triggerTimer;
++    transient private ScheduledExecutorService checkpointingTimer;
 +    transient private boolean isPrototype = true;
 +    transient private boolean isThreadSafe = false;
 +    transient private String name = null;
 +    transient private boolean isSingleton = false;
 +    transient long eventCount = 0;
 +
 +    transient private OverloadDispatcher overloadDispatcher;
 +    transient private boolean recoveryAttempted = false;
 +    transient private boolean dirty = false;
 +
 +    transient private CheckpointingConfig checkpointingConfig = new CheckpointingConfig.Builder(CheckpointingMode.NONE)
 +            .build();
  
      protected ProcessingElement() {
          OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
@@@ -374,14 -378,22 +378,23 @@@
  
          Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
  
 -        if (timer != null) {
 -            timer.shutdownNow();
 +        if (triggerTimer != null) {
-             triggerTimer.cancel();
++            triggerTimer.shutdownNow();
          }
  
--        if (interval == 0)
++        if (interval == 0) {
              return this;
++        }
+ 
+         ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                 .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
  
-         triggerTimer = new Timer();
+                     @Override
+                     public void uncaughtException(Thread t, Throwable e) {
+                         logger.error("Expection from timer thread", e);
+                     }
+                 }).setNameFormat("Timer-" + getClass().getSimpleName()).build();
 -        timer = Executors.newSingleThreadScheduledExecutor(threadFactory);
++        triggerTimer = Executors.newSingleThreadScheduledExecutor(threadFactory);
          return this;
      }
  
@@@ -486,9 -479,9 +499,9 @@@
      protected void removeAll() {
  
          /* Close resources in prototype. */
 -        if (timer != null) {
 -            timer.shutdownNow();
 -            logger.info("Timer stopped.");
 +        if (triggerTimer != null) {
-             triggerTimer.cancel();
-             logger.info("Timer stopped.");
++            triggerTimer.shutdownNow();
++            logger.info("Trigger timer stopped.");
          }
  
          /* Remove all the instances. */
@@@ -523,22 -516,12 +536,30 @@@
          }
  
          /* Start timer. */
 -        if (timer != null) {
 -            timer.scheduleAtFixedRate(new OnTimeTask(), 0, timerIntervalInMilliseconds, TimeUnit.MILLISECONDS);
 +        if (triggerTimer != null) {
-             triggerTimer
-                     .scheduleAtFixedRate(new OnTimeTask(), timerIntervalInMilliseconds, timerIntervalInMilliseconds);
-             logger.debug("Started trigger timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
++            triggerTimer.scheduleAtFixedRate(new OnTimeTask(), 0, timerIntervalInMilliseconds, TimeUnit.MILLISECONDS);
+             logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
                      this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
          }
  
 +        if (checkpointingConfig.mode == CheckpointingMode.TIME) {
-             checkpointingTimer = new Timer();
-             checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this),
-                     checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency),
-                     checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency));
-             logger.debug("Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}].",
-                     new String[] { this.getClass().getName(), id, String.valueOf(checkpointingConfig.frequency) });
++            ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
++                    .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
++
++                        @Override
++                        public void uncaughtException(Thread t, Throwable e) {
++                            logger.error("Expection from checkpointing thread", e);
++                        }
++                    }).setNameFormat("Checkpointing-trigger-" + getClass().getSimpleName()).build();
++            checkpointingTimer = Executors.newSingleThreadScheduledExecutor(threadFactory);
++            checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this), checkpointingConfig.frequency,
++                    checkpointingConfig.frequency, checkpointingConfig.timeUnit);
++            logger.debug(
++                    "Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}] [{}].",
++                    new String[] { this.getClass().getName(), id, String.valueOf(checkpointingConfig.frequency),
++                            String.valueOf(checkpointingConfig.timeUnit.toString()) });
 +        }
 +
          /* Check if this PE is annotated as thread safe. */
          if (getClass().isAnnotationPresent(ThreadSafe.class) == true) {
  

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/704404f6/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 70fe4ec,12f5d2f..b3ce9ed
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@@ -6,12 -6,10 +6,13 @@@ import java.util.concurrent.TimeUnit
  
  import junit.framework.Assert;
  
 -import org.apache.s4.comm.DefaultCommModule;
 +import org.apache.s4.base.EventMessage;
- import org.apache.s4.comm.BareCommModule;
  import org.apache.s4.core.triggers.TriggeredApp;
  import org.apache.s4.fixtures.CommTestUtils;
++import org.apache.s4.fixtures.MockCommModule;
++import org.apache.s4.fixtures.MockCoreModule;
  import org.apache.s4.fixtures.ZkBasedTest;
 +import org.apache.s4.wordcount.StringEvent;
  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.ZooKeeper;
  import org.apache.zookeeper.server.NIOServerCnxn.Factory;
@@@ -46,7 -45,9 +47,7 @@@ public abstract class TriggerTest exten
  
      protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
          final ZooKeeper zk = CommTestUtils.createZkClient();
-         Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
 -        Injector injector = Guice.createInjector(
 -                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
 -                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
++        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
          app = injector.getInstance(TriggeredApp.class);
          app.init();
          app.start();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/704404f6/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
index ef3fabd,0000000..a9649e5
mode 100644,000000..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
@@@ -1,153 -1,0 +1,153 @@@
 +package org.apache.s4.core.ft;
 +
 +import java.io.File;
 +import java.lang.reflect.Field;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +
 +import junit.framework.Assert;
 +
 +import org.apache.commons.codec.binary.Base64;
 +import org.apache.s4.base.Event;
 +import org.apache.s4.base.EventMessage;
 +import org.apache.s4.base.KeyFinder;
- import org.apache.s4.comm.BareCommModule;
 +import org.apache.s4.core.App;
- import org.apache.s4.core.BareCoreModule;
 +import org.apache.s4.core.ProcessingElement;
 +import org.apache.s4.core.Stream;
 +import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
 +import org.apache.s4.fixtures.CoreTestUtils;
++import org.apache.s4.fixtures.MockCommModule;
++import org.apache.s4.fixtures.MockCoreModule;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.io.Files;
 +import com.google.inject.Guice;
 +import com.google.inject.Injector;
 +
 +public class CheckpointingTest {
 +
 +    private static Factory zookeeperServerConnectionFactory = null;
 +    public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("user.dir") + File.separator + "tmp");
 +    public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
 +            + "storage");
 +
 +    @Before
 +    public void prepare() throws Exception {
 +        zookeeperServerConnectionFactory = CoreTestUtils.startZookeeperServer();
 +    }
 +
 +    @After
 +    public void cleanup() throws Exception {
 +        CoreTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
 +    }
 +
 +    @Test
 +    public void testCheckpointStorage() throws Exception {
 +        final ZooKeeper zk = CoreTestUtils.createZkClient();
 +
 +        // 2. generate a simple event that creates and changes the state of
 +        // the
 +        // PE
 +
 +        // NOTE: coordinate through zookeeper
 +        final CountDownLatch signalValue1Set = new CountDownLatch(1);
 +
 +        CoreTestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
 +        final CountDownLatch signalCheckpointed = new CountDownLatch(1);
 +        CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
 +
-         Injector injector = Guice.createInjector(new BareCommModule(),
++        Injector injector = Guice.createInjector(new MockCommModule(),
 +                new MockCoreModuleWithFileBaseCheckpointingBackend());
 +        TestApp app = injector.getInstance(TestApp.class);
 +        app.init();
 +        app.start();
 +
 +        Event event = new Event();
 +        event.put("command", String.class, "setValue1");
 +        event.put("value", String.class, "message1");
 +
 +        app.testStream.receiveEvent(new EventMessage("", "stream1", app.getSerDeser().serialize(event)));
 +
 +        signalValue1Set.await();
 +
 +        StatefulTestPE pe = (StatefulTestPE) app.getPE("statefulPE1").getInstanceForKey("X");
 +
 +        Assert.assertEquals("message1", pe.getValue1());
 +        Assert.assertEquals("", pe.getValue2());
 +
 +        // 3. generate a checkpoint event
 +        event = new Event();
 +        event.put("command", String.class, "checkpoint");
 +        app.testStream.receiveEvent(new EventMessage("", "stream1", app.getSerDeser().serialize(event)));
 +        Assert.assertTrue(signalCheckpointed.await(10, TimeUnit.SECONDS));
 +
 +        // NOTE: the backend has asynchronous save operations
 +        Thread.sleep(1000);
 +
 +        CheckpointId safeKeeperId = new CheckpointId(pe);
 +        File expected = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator + "storage"
 +                + File.separator + safeKeeperId.getPrototypeId() + File.separator
 +                + Base64.encodeBase64URLSafeString(safeKeeperId.getStringRepresentation().getBytes()));
 +
 +        // 4. verify that state was correctly persisted
 +        Assert.assertTrue(expected.exists());
 +
 +        StatefulTestPE refPE = new StatefulTestPE();
 +        refPE.onCreate();
 +        refPE.setValue1("message1");
 +
 +        Field idField = ProcessingElement.class.getDeclaredField("id");
 +        idField.setAccessible(true);
 +        idField.set(refPE, "X");
 +
 +        byte[] refBytes = app.getSerDeser().serialize(refPE);
 +
 +        Assert.assertTrue(Arrays.equals(refBytes, Files.toByteArray(expected)));
 +
 +    }
 +
 +    private static class TestApp extends App {
 +        Stream<Event> testStream;
 +        int count;
 +
 +        @Override
 +        protected void onStart() {
 +        }
 +
 +        @Override
 +        protected void onInit() {
 +
 +            StatefulTestPE pe = createPE(StatefulTestPE.class, "statefulPE1");
 +            testStream = createStream("stream1", new KeyFinder<Event>() {
 +                @Override
 +                public List<String> get(Event event) {
 +                    return ImmutableList.of("X");
 +                }
 +            }, pe);
 +        }
 +
 +        @Override
 +        protected void onClose() {
 +        }
 +
 +    }
 +
-     private static class MockCoreModuleWithFileBaseCheckpointingBackend extends BareCoreModule {
++    private static class MockCoreModuleWithFileBaseCheckpointingBackend extends MockCoreModule {
 +
 +        protected void configure() {
 +            super.configure();
 +            bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
 +            bind(CheckpointingFramework.class).to(SafeKeeper.class);
 +            bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
 +        }
 +
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/704404f6/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index 0000000,13e8672..a9a72ce
mode 000000,100644..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@@ -1,0 -1,42 +1,47 @@@
+ package org.apache.s4.fixtures;
+ 
+ import org.apache.s4.base.Hasher;
+ import org.apache.s4.base.SerializerDeserializer;
+ import org.apache.s4.comm.DefaultHasher;
+ import org.apache.s4.comm.RemoteEmitterFactory;
+ import org.apache.s4.comm.serialize.KryoSerDeser;
+ import org.apache.s4.comm.tcp.RemoteEmitters;
++import org.apache.s4.comm.topology.Assignment;
++import org.apache.s4.comm.topology.ClusterNode;
+ import org.apache.s4.comm.topology.Clusters;
+ import org.apache.s4.comm.topology.RemoteStreams;
+ import org.apache.s4.core.RemoteSenders;
+ import org.mockito.Mockito;
+ 
+ import com.google.common.collect.ImmutableMap;
+ import com.google.inject.AbstractModule;
+ import com.google.inject.name.Names;
+ 
+ /**
+  * Mock module for the comm layer. Mocks comm layer basic functionalities, and uses some default when required.
+  *
+  */
+ public class MockCommModule extends AbstractModule {
+ 
+     public MockCommModule() {
+         super();
+     }
+ 
+     @Override
+     protected void configure() {
+         /* The hashing function to map keys to partitions. */
+         bind(Hasher.class).to(DefaultHasher.class);
+         /* Use Kryo to serialize events. */
+         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+         bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
+         bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
+         bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
+         bind(RemoteEmitterFactory.class).toInstance(Mockito.mock(RemoteEmitterFactory.class));
+         bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
++        Assignment mockedAssignment = Mockito.mock(Assignment.class);
++        Mockito.when(mockedAssignment.assignClusterNode()).thenReturn(new ClusterNode(0, 0, "machine", "Task-0"));
++        bind(Assignment.class).toInstance(mockedAssignment);
+         Names.bindProperties(binder(), ImmutableMap.of("cluster.name", "testCluster"));
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/704404f6/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index 0000000,a7f8375..7c52c94
mode 000000,100644..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@@ -1,0 -1,33 +1,33 @@@
+ package org.apache.s4.fixtures;
+ 
+ import org.apache.s4.base.Emitter;
+ import org.apache.s4.base.Listener;
+ import org.apache.s4.core.Receiver;
+ import org.apache.s4.deploy.DeploymentManager;
+ import org.apache.s4.deploy.NoOpDeploymentManager;
+ import org.mockito.Mockito;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.inject.AbstractModule;
+ 
+ /**
+  * Core module mocking basic platform functionalities.
 - *
++ * 
+  */
+ public class MockCoreModule extends AbstractModule {
+ 
+     @SuppressWarnings("unused")
+     private static Logger logger = LoggerFactory.getLogger(MockCoreModule.class);
+ 
+     public MockCoreModule() {
+     }
+ 
+     @Override
+     protected void configure() {
+         bind(DeploymentManager.class).to(NoOpDeploymentManager.class);
+         bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
+         bind(Listener.class).toInstance(Mockito.mock(Listener.class));
+         bind(Receiver.class).toInstance(Mockito.mock(Receiver.class));
+     }
+ }