You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/04 09:39:58 UTC

[1/2] git commit: Improvements proposal for the windowing mechanism - added utility classes for slots - added regression test for time based windows

Updated Branches:
  refs/heads/S4-57 [created] 510875add


Improvements proposal for the windowing mechanism
- added utility classes for slots
- added regression test for time based windows


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/510875ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/510875ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/510875ad

Branch: refs/heads/S4-57
Commit: 510875add704ef641464bfd8f12d749811c278e8
Parents: adad748
Author: Matthieu Morel <mm...@apache.org>
Authored: Sun Jul 1 17:44:39 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sun Jul 1 17:44:39 2012 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/core/App.java      |    2 +-
 .../java/org/apache/s4/core/ProcessingElement.java |    6 +-
 .../main/java/org/apache/s4/core/WindowingPE.java  |  163 -------------
 .../s4/core/window/DefaultAggregatingSlot.java     |   31 +++
 .../main/java/org/apache/s4/core/window/Slot.java  |   23 ++
 .../org/apache/s4/core/window/WindowingPE.java     |  184 +++++++++++++++
 .../org/apache/s4/core/windowing/WindowingPE1.java |   65 +++++
 .../apache/s4/core/windowing/WindowingPETest.java  |   97 ++++++++
 8 files changed, 404 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index fe4ceed..06fd9a4 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,7 @@ import org.apache.s4.base.KeyFinder;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.App.ClockType;
+import org.apache.s4.core.window.WindowingPE;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 4dced75..d4e26e9 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -109,7 +109,7 @@ public abstract class ProcessingElement implements Cloneable {
     Map<Class<? extends Event>, Trigger> triggers;
 
     /* PE instance id. */
-    String id = "";
+    protected String id = "";
 
     /* Private fields. */
     private ProcessingElement pePrototype;
@@ -211,7 +211,7 @@ public abstract class ProcessingElement implements Cloneable {
         return peInstances.size();
     }
 
-    Map<String, ProcessingElement> getPEInstances() {
+    protected Map<String, ProcessingElement> getPEInstances() {
         return peInstances.asMap();
     }
 
@@ -490,7 +490,7 @@ public abstract class ProcessingElement implements Cloneable {
     }
 
     /* This method is called by App just before the application starts. */
-    void initPEPrototypeInternal() {
+    protected void initPEPrototypeInternal() {
 
         /* Eagerly create singleton PE. */
         if (isSingleton) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
deleted file mode 100644
index 0b6c0af..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright (c) 2011 The S4 Project, http://s4.io.
- * All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.core;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections15.buffer.CircularFifoBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
- * slot. The concrete class must implement a class (the slot class) where values are stored. Each slot represents a
- * segment of time or a fixed number of events. Slots are consecutive in time or events. The slot object cannot be null.
- * 
- * WHen using time-based slots, use this implementation only if you expect most slots to have values, it is not
- * efficient for sparse event streams.
- */
-public abstract class WindowingPE<T> extends ProcessingElement {
-
-    private static final Logger logger = LoggerFactory.getLogger(WindowingPE.class);
-
-    final private int numSlots;
-    private CircularFifoBuffer<T> circularBuffer;
-    final private Timer timer;
-    final private long slotDurationInMilliseconds;
-
-    /**
-     * Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
-     * 
-     * @param app
-     *            the application
-     * @param slotDuration
-     *            the slot duration in timeUnit
-     * @param timeUnit
-     *            the unit of time
-     * @param numSlots
-     *            the number of slots to be stored
-     */
-    public WindowingPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
-        super(app);
-        this.numSlots = numSlots;
-
-        if (slotDuration > 0l) {
-            slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
-            timer = new Timer();
-            timer.schedule(new SlotTask(), slotDurationInMilliseconds, slotDurationInMilliseconds);
-            logger.trace("TIMER: " + slotDurationInMilliseconds);
-
-        } else {
-            slotDurationInMilliseconds = 0;
-            timer = null;
-        }
-    }
-
-    /**
-     * 
-     * Constructor for the event-based slot. The abstract method {@link #addPeriodicSlot()} must be called by the
-     * concrete class.
-     * 
-     * @param app
-     *            the application
-     * @param numSlots
-     *            the number of slots to be stored
-     */
-    public WindowingPE(App app, int numSlots) {
-        this(app, 0l, null, numSlots);
-    }
-
-    /**
-     * This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
-     * implement the logic required to create a slot. For example, compute statistics from aggregations and get
-     * variables ready for the new slot.
-     * 
-     * If the implementation class doesn't use periodic slots, this method will never be called. Use
-     * {@link #addSlot(Object)} instead.
-     * 
-     * @return the slot object
-     */
-    abstract protected T addPeriodicSlot();
-
-    /**
-     * Add an object to the sliding window. Use it when the window is not periodic. For periodic slots use
-     * {@link #addPeriodicSlot()} instead.
-     * 
-     * @param slot
-     */
-    protected void addSlot(T slot) {
-
-        if (timer != null) {
-            logger.error("Calling method addSlot() in a periodic window is not allowed.");
-            return;
-        }
-        circularBuffer.add(slot);
-    }
-
-    protected void onCreate() {
-        circularBuffer = new CircularFifoBuffer<T>(numSlots);
-    }
-
-    /**
-     * 
-     * @return the least recently inserted slot
-     */
-    protected T getOldestSlot() {
-
-        return circularBuffer.get();
-    }
-
-    /** Stops the the sliding window. */
-    protected void stop() {
-        timer.cancel();
-    }
-
-    /**
-     * 
-     * @return the collection of slots
-     */
-    protected Collection<T> getSlots() {
-        return circularBuffer;
-    }
-
-    private class SlotTask extends TimerTask {
-
-        @Override
-        public void run() {
-
-            logger.trace("START TIMER TASK");
-
-            /* Iterate over all instances and put a new slot in the buffer. */
-            for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
-                logger.trace("pe id: " + entry.getValue().id);
-                @SuppressWarnings("unchecked")
-                WindowingPE<T> peInstance = (WindowingPE<T>) entry.getValue();
-
-                if (peInstance.circularBuffer == null) {
-                    peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
-                }
-                synchronized (peInstance) {
-                    peInstance.circularBuffer.add(peInstance.addPeriodicSlot());
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
new file mode 100644
index 0000000..c76f050
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
@@ -0,0 +1,31 @@
+package org.apache.s4.core.window;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Window slot that keeps all data elements as a list, in the order they were added.
+ * 
+ * @param <T> type of the elements added to the slot
+ * @param <U> type returned by getAggregatedData(); the internal list of T is cast to it unchecked
+ */
+public class DefaultAggregatingSlot<T, U> implements Slot<T, U> {
+
+    List<T> data = null; // created lazily by the first addData() call
+
+    @Override
+    public void addData(T datum) {
+        if (data == null) {
+            data = new ArrayList<T>();
+        }
+        data.add(datum);
+
+    }
+
+    @Override
+    public U getAggregatedData() {
+        return (U) ((data == null) ? Collections.emptyList() : data); // unchecked cast; no copy of the list is made
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
new file mode 100644
index 0000000..b5bd266
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
@@ -0,0 +1,23 @@
+package org.apache.s4.core.window;
+
+/**
+ * A convenience window slot that aggregates elements of type {@code T} into a value of type {@code U}.
+ * 
+ * @param <T>
+ *            type of the elements to aggregate
+ * @param <U>
+ *            type of the aggregate (can be a list, or can be a result of some processing on the elements)
+ */
+public interface Slot<T, U> {
+
+    /**
+     * Adds a single data element to this slot.
+     */
+    void addData(T data);
+
+    /**
+     * Returns the aggregate of the elements stored in this slot.
+     */
+    U getAggregatedData();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
new file mode 100644
index 0000000..6595a5f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2011 The S4 Project, http://s4.io.
+ * All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.core.window;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections15.buffer.CircularFifoBuffer;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
+ * slot. The concrete class must implement a class (the slot class) where values are stored. Each slot represents a
+ * segment of time or a fixed number of events. Slots are consecutive in time or events. The slot object cannot be null.
+ * <p>
+ * When using time-based slots, use this implementation only if you expect most slots to have values; it is not
+ * efficient for sparse event streams.
+ */
+public abstract class WindowingPE<T> extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(WindowingPE.class);
+
+    final private int numSlots; // capacity passed to the circular buffer
+    private CircularFifoBuffer<T> circularBuffer; // created in onCreate(), or lazily by SlotTask
+    final private Timer windowingTimer; // null when the window is event-based
+    final private long slotDurationInMilliseconds; // 0 when the window is event-based
+
+    protected T currentSlot; // latest slot from addPeriodicSlot(); this class assigns it only for time-based windows
+
+    /**
+     * Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
+     * The timer is only scheduled in initPEPrototypeInternal(). A non-positive slotDuration creates no timer.
+     * 
+     * @param app
+     *            the application
+     * @param slotDuration
+     *            the slot duration in timeUnit; a non-positive value selects the event-based mode
+     * @param timeUnit
+     *            the unit of time
+     * @param numSlots
+     *            the number of slots to be stored
+     */
+    public WindowingPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
+        super(app);
+        this.numSlots = numSlots;
+
+        if (slotDuration > 0l) {
+            slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
+            windowingTimer = new Timer(); // not scheduled yet; see initPEPrototypeInternal()
+
+        } else {
+            slotDurationInMilliseconds = 0;
+            windowingTimer = null;
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // Intentionally left empty: this class does no per-instance cleanup (the timer is only cancelled by stop()).
+
+    }
+
+    @Override
+    protected void initPEPrototypeInternal() {
+        super.initPEPrototypeInternal();
+        windowingTimer.schedule(new SlotTask(), slotDurationInMilliseconds, slotDurationInMilliseconds);
+        logger.trace("TIMER: " + slotDurationInMilliseconds);
+        // NOTE(review): windowingTimer is null for event-based windows, so schedule() above throws NullPointerException.
+    }
+
+    /**
+     * 
+     * Constructor for the event-based slot. The method {@link #addSlot(Object)} must be called by the
+     * concrete class to add slots; no timer is created.
+     * 
+     * @param app
+     *            the application
+     * @param numSlots
+     *            the number of slots to be stored
+     */
+    public WindowingPE(App app, int numSlots) {
+        this(app, 0l, null, numSlots); // duration 0 takes the event-based branch, which never reads the null timeUnit
+    }
+
+    /**
+     * This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
+     * implement the logic required to create a slot. For example, compute statistics from aggregations and get
+     * variables ready for the new slot.
+     * <p>
+     * If the implementation class doesn't use periodic slots, this method will never be called. Use
+     * {@link #addSlot(Object)} instead.
+     * For time-based windows it is also called once from onCreate() to open the first slot.
+     * @return the slot object; it becomes currentSlot and is appended to the buffer
+     */
+    abstract protected T addPeriodicSlot();
+
+    /**
+     * Add an object to the sliding window. Use it when the window is not periodic. For periodic slots use
+     * {@link #addPeriodicSlot()} instead.
+     * In a periodic window (timer present) the call is logged as an error and the slot is not added.
+     * @param slot the slot to append to the buffer
+     */
+    protected void addSlot(T slot) {
+
+        if (windowingTimer != null) {
+            logger.error("Calling method addSlot() in a periodic window is not allowed.");
+            return;
+        }
+        circularBuffer.add(slot);
+    }
+
+    protected void onCreate() {
+        circularBuffer = new CircularFifoBuffer<T>(numSlots);
+        if (slotDurationInMilliseconds > 0) { // time-based window: open the first slot right away
+            currentSlot = addPeriodicSlot();
+            circularBuffer.add(currentSlot);
+        }
+    }
+
+    /**
+     * Delegates to the circular buffer's get().
+     * @return the least recently inserted slot
+     */
+    protected T getOldestSlot() {
+
+        return circularBuffer.get();
+    }
+
+    /** Stops the sliding window by cancelling the timer. */
+    protected void stop() {
+        windowingTimer.cancel(); // NOTE(review): NullPointerException for event-based windows (timer is null)
+    }
+
+    /**
+     * Returns the slot buffer itself, not a copy.
+     * @return the collection of slots
+     */
+    protected Collection<T> getSlots() {
+        return circularBuffer;
+    }
+
+    private class SlotTask extends TimerTask {
+        // Timer task that opens a new slot on every PE instance each time it runs.
+        @Override
+        public void run() {
+
+            logger.trace("Starting slot task");
+
+            /* Iterate over all instances and put a new slot in the buffer. */
+            for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
+                logger.trace("pe id: " + entry.getValue().getId());
+                @SuppressWarnings("unchecked")
+                WindowingPE<T> peInstance = (WindowingPE<T>) entry.getValue();
+                // NOTE(review): the lazy buffer creation below happens outside the synchronized block.
+                if (peInstance.circularBuffer == null) {
+                    peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
+                }
+                synchronized (peInstance) {
+                    peInstance.currentSlot = peInstance.addPeriodicSlot();
+                    peInstance.circularBuffer.add(peInstance.currentSlot);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
new file mode 100644
index 0000000..4a69037
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
@@ -0,0 +1,65 @@
+package org.apache.s4.core.windowing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.window.DefaultAggregatingSlot;
+import org.apache.s4.core.window.Slot;
+import org.apache.s4.core.window.WindowingPE;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WindowingPE1 extends WindowingPE<Slot<Integer, List<Integer>>> {
+    // Test PE: stores each event's "value" in the current slot and publishes all slots once NB_EVENTS were counted.
+    private static Logger logger = LoggerFactory.getLogger(WindowingPE1.class);
+    AtomicInteger counter = new AtomicInteger(); // number of events seen by onEvent()
+
+    public WindowingPE1(App app, int numSlots) {
+        super(app, numSlots);
+    }
+
+    @Override
+    protected Slot<Integer, List<Integer>> addPeriodicSlot() {
+        return new DefaultAggregatingSlot<Integer, List<Integer>>();
+    }
+
+    public void onEvent(Event event) {
+        // NOTE(review): currentSlot is only assigned by WindowingPE for time-based windows; null otherwise.
+        Integer value = event.get("value", Integer.class);
+        currentSlot.addData(value);
+        counter.incrementAndGet();
+        if (counter.get() % 1000 == 0) { // trace one event out of every 1000
+            logger.trace("received value [{}]", event.get("value", Integer.class));
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // nothing to clean up
+    }
+
+    @Override
+    protected void onTime() {
+        if (counter.get() == WindowingPETest.NB_EVENTS) {
+            List<Integer> values = new ArrayList<Integer>();
+            // NOTE(review): the condition stays true on later onTime() calls, so values may be added again - confirm.
+            for (Slot<Integer, List<Integer>> slot : getSlots()) {
+                values.addAll(slot.getAggregatedData());
+            }
+            // Publish the collected values, then release the waiting test.
+            WindowingPETest.allValues.addAll(values);
+            WindowingPETest.signalAllEventsProcessed.countDown();
+        }
+
+    }
+
+    public WindowingPE1(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
+        super(app, slotDuration, timeUnit, numSlots);
+        // time-based variant; nothing else to initialize
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
new file mode 100644
index 0000000..c154d32
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
@@ -0,0 +1,97 @@
+package org.apache.s4.core.windowing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+import org.apache.s4.core.window.Slot;
+import org.apache.s4.core.window.WindowingPE;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class WindowingPETest {
+    // Regression test for time-based windows: every value sent must come back from the window slots, in order.
+    public static final long NB_EVENTS = 1000000; // number of events sent through the stream
+    public static final CountDownLatch signalAllEventsProcessed = new CountDownLatch(1); // released by WindowingPE1
+    public static final List<Integer> allValues = new ArrayList<Integer>(); // filled by WindowingPE1.onTime()
+
+    private static final String STREAM_NAME = "stream1";
+    private static final String APP_NAME = "app1";
+
+    @Test
+    public void test() {
+        ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
+                .getLogger(Logger.ROOT_LOGGER_NAME);
+        root.setLevel(Level.DEBUG);
+        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+        TestTimeWindowedApp app = injector.getInstance(TestTimeWindowedApp.class);
+        app.init();
+        app.start();
+        // Feed NB_EVENTS events carrying the values 0..NB_EVENTS-1 into the stream.
+        for (int i = 0; i < NB_EVENTS; i++) {
+            Event e = new Event();
+            e.put("value", Integer.class, i);
+            app.stream1.receiveEvent(new EventMessage(APP_NAME, STREAM_NAME, app.getSerDeser().serialize(e)));
+        }
+        // Wait at most 30 seconds for the PE to publish the collected values, then check their count and order.
+        try {
+            Assert.assertTrue(signalAllEventsProcessed.await(30, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            Assert.fail();
+        }
+        Assert.assertEquals(NB_EVENTS, allValues.size());
+        for (int i = 0; i < NB_EVENTS; i++) {
+            Assert.assertEquals((Integer) i, allValues.get(i));
+        }
+    }
+
+    public static class TestTimeWindowedApp extends App {
+
+        private Stream<Event> stream1;
+
+        @Override
+        protected void onStart() {
+            // nothing to do on start
+
+        }
+
+        @Override
+        protected void onInit() {
+            WindowingPE<Slot<Integer, List<Integer>>> wPE1 = createWindowingPE(WindowingPE1.class, 10L,
+                    TimeUnit.MILLISECONDS, 100000); // presumably 10 ms slots and 100000 slots kept - confirm
+            wPE1.setTimerInterval(10, TimeUnit.MILLISECONDS);
+            stream1 = createStream(STREAM_NAME, new KeyFinder<Event>() {
+                // Same constant key "X" for every event; presumably this routes all events to one PE instance.
+                @Override
+                public List<String> get(final Event event) {
+                    return ImmutableList.of("X");
+                }
+            }, wPE1);
+
+        }
+
+        @Override
+        protected void onClose() {
+            // nothing to do on close
+
+        }
+
+    }
+
+}