You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/04 09:39:58 UTC
[1/2] git commit: Improvements proposal for the windowing mechanism -
added utility classes for slots - added regression test for time based
windows
Updated Branches:
refs/heads/S4-57 [created] 510875add
Improvements proposal for the windowing mechanism
- added utility classes for slots
- added regression test for time based windows
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/510875ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/510875ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/510875ad
Branch: refs/heads/S4-57
Commit: 510875add704ef641464bfd8f12d749811c278e8
Parents: adad748
Author: Matthieu Morel <mm...@apache.org>
Authored: Sun Jul 1 17:44:39 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sun Jul 1 17:44:39 2012 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/core/App.java | 2 +-
.../java/org/apache/s4/core/ProcessingElement.java | 6 +-
.../main/java/org/apache/s4/core/WindowingPE.java | 163 -------------
.../s4/core/window/DefaultAggregatingSlot.java | 31 +++
.../main/java/org/apache/s4/core/window/Slot.java | 23 ++
.../org/apache/s4/core/window/WindowingPE.java | 184 +++++++++++++++
.../org/apache/s4/core/windowing/WindowingPE1.java | 65 +++++
.../apache/s4/core/windowing/WindowingPETest.java | 97 ++++++++
8 files changed, 404 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index fe4ceed..06fd9a4 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,7 @@ import org.apache.s4.base.KeyFinder;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.App.ClockType;
+import org.apache.s4.core.window.WindowingPE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 4dced75..d4e26e9 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -109,7 +109,7 @@ public abstract class ProcessingElement implements Cloneable {
Map<Class<? extends Event>, Trigger> triggers;
/* PE instance id. */
- String id = "";
+ protected String id = "";
/* Private fields. */
private ProcessingElement pePrototype;
@@ -211,7 +211,7 @@ public abstract class ProcessingElement implements Cloneable {
return peInstances.size();
}
- Map<String, ProcessingElement> getPEInstances() {
+ protected Map<String, ProcessingElement> getPEInstances() {
return peInstances.asMap();
}
@@ -490,7 +490,7 @@ public abstract class ProcessingElement implements Cloneable {
}
/* This method is called by App just before the application starts. */
- void initPEPrototypeInternal() {
+ protected void initPEPrototypeInternal() {
/* Eagerly create singleton PE. */
if (isSingleton) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
deleted file mode 100644
index 0b6c0af..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Copyright (c) 2011 The S4 Project, http://s4.io.
- * All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.core;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections15.buffer.CircularFifoBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
- * slot. The concrete class must implement a class (the slot class) where values are stored. Each slot represents a
- * segment of time or a fixed number of events. Slots are consecutive in time or events. The slot object cannot be null.
- *
- * WHen using time-based slots, use this implementation only if you expect most slots to have values, it is not
- * efficient for sparse event streams.
- */
-public abstract class WindowingPE<T> extends ProcessingElement {
-
- private static final Logger logger = LoggerFactory.getLogger(WindowingPE.class);
-
- final private int numSlots;
- private CircularFifoBuffer<T> circularBuffer;
- final private Timer timer;
- final private long slotDurationInMilliseconds;
-
- /**
- * Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
- *
- * @param app
- * the application
- * @param slotDuration
- * the slot duration in timeUnit
- * @param timeUnit
- * the unit of time
- * @param numSlots
- * the number of slots to be stored
- */
- public WindowingPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
- super(app);
- this.numSlots = numSlots;
-
- if (slotDuration > 0l) {
- slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
- timer = new Timer();
- timer.schedule(new SlotTask(), slotDurationInMilliseconds, slotDurationInMilliseconds);
- logger.trace("TIMER: " + slotDurationInMilliseconds);
-
- } else {
- slotDurationInMilliseconds = 0;
- timer = null;
- }
- }
-
- /**
- *
- * Constructor for the event-based slot. The abstract method {@link #addPeriodicSlot()} must be called by the
- * concrete class.
- *
- * @param app
- * the application
- * @param numSlots
- * the number of slots to be stored
- */
- public WindowingPE(App app, int numSlots) {
- this(app, 0l, null, numSlots);
- }
-
- /**
- * This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
- * implement the logic required to create a slot. For example, compute statistics from aggregations and get
- * variables ready for the new slot.
- *
- * If the implementation class doesn't use periodic slots, this method will never be called. Use
- * {@link #addSlot(Object)} instead.
- *
- * @return the slot object
- */
- abstract protected T addPeriodicSlot();
-
- /**
- * Add an object to the sliding window. Use it when the window is not periodic. For periodic slots use
- * {@link #addPeriodicSlot()} instead.
- *
- * @param slot
- */
- protected void addSlot(T slot) {
-
- if (timer != null) {
- logger.error("Calling method addSlot() in a periodic window is not allowed.");
- return;
- }
- circularBuffer.add(slot);
- }
-
- protected void onCreate() {
- circularBuffer = new CircularFifoBuffer<T>(numSlots);
- }
-
- /**
- *
- * @return the least recently inserted slot
- */
- protected T getOldestSlot() {
-
- return circularBuffer.get();
- }
-
- /** Stops the the sliding window. */
- protected void stop() {
- timer.cancel();
- }
-
- /**
- *
- * @return the collection of slots
- */
- protected Collection<T> getSlots() {
- return circularBuffer;
- }
-
- private class SlotTask extends TimerTask {
-
- @Override
- public void run() {
-
- logger.trace("START TIMER TASK");
-
- /* Iterate over all instances and put a new slot in the buffer. */
- for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
- logger.trace("pe id: " + entry.getValue().id);
- @SuppressWarnings("unchecked")
- WindowingPE<T> peInstance = (WindowingPE<T>) entry.getValue();
-
- if (peInstance.circularBuffer == null) {
- peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
- }
- synchronized (peInstance) {
- peInstance.circularBuffer.add(peInstance.addPeriodicSlot());
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
new file mode 100644
index 0000000..c76f050
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
@@ -0,0 +1,31 @@
+package org.apache.s4.core.window;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link Slot} implementation that simply accumulates every element it is given, in insertion order.
+ *
+ * @param <T>
+ *            type of the individual elements added to the slot
+ * @param <U>
+ *            type under which the accumulated elements are exposed; the backing list is cast to this type without
+ *            any runtime check
+ */
+public class DefaultAggregatingSlot<T, U> implements Slot<T, U> {
+
+    /* Backing list, allocated on first insertion only. */
+    List<T> data = null;
+
+    /**
+     * Appends an element to this slot, allocating the backing list if this is the first element.
+     */
+    @Override
+    public void addData(T datum) {
+        if (null == data) {
+            data = new ArrayList<T>();
+        }
+        data.add(datum);
+    }
+
+    /**
+     * Returns the backing list, or the empty list from {@link Collections#emptyList()} when nothing was added yet.
+     */
+    @Override
+    public U getAggregatedData() {
+        // Unchecked casts: nothing guarantees that U really is a list type.
+        if (data != null) {
+            return (U) data;
+        }
+        return (U) Collections.emptyList();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
new file mode 100644
index 0000000..b5bd266
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
@@ -0,0 +1,23 @@
+package org.apache.s4.core.window;
+
+/**
+ * A convenience window slot, that aggregates elements of type {@code T} into elements of type {@code U}.
+ *
+ * @param <T>
+ *            elements to aggregate
+ * @param <U>
+ *            aggregated elements (can be a list, or can be a result of some processing on the elements)
+ */
+public interface Slot<T, U> {
+
+    /**
+     * Adds a single data element to this slot.
+     *
+     * @param data
+     *            the element to add
+     */
+    void addData(T data);
+
+    /**
+     * Retrieves the aggregated form of the elements added to this slot.
+     *
+     * @return the aggregated data
+     */
+    U getAggregatedData();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
new file mode 100644
index 0000000..6595a5f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2011 The S4 Project, http://s4.io.
+ * All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.core.window;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections15.buffer.CircularFifoBuffer;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
+ * slot. The concrete class must implement a class (the slot class) where values are stored. Each slot represents a
+ * segment of time or a fixed number of events. Slots are consecutive in time or events. The slot object cannot be null.
+ *
+ * When using time-based slots, use this implementation only if you expect most slots to have values, it is not
+ * efficient for sparse event streams.
+ *
+ * @param <T>
+ *            the slot type
+ */
+public abstract class WindowingPE<T> extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(WindowingPE.class);
+
+    final private int numSlots;
+    private CircularFifoBuffer<T> circularBuffer;
+    /* Null for event-based windows, which have no periodic slot task. */
+    final private Timer windowingTimer;
+    /* Zero for event-based windows. */
+    final private long slotDurationInMilliseconds;
+
+    /* Most recently created periodic slot; only assigned for time-based windows. */
+    protected T currentSlot;
+
+    /**
+     * Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
+     *
+     * If {@code slotDuration} is not strictly positive, no timer is created and the window behaves as an event-based
+     * window.
+     *
+     * @param app
+     *            the application
+     * @param slotDuration
+     *            the slot duration in timeUnit
+     * @param timeUnit
+     *            the unit of time
+     * @param numSlots
+     *            the number of slots to be stored
+     */
+    public WindowingPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
+        super(app);
+        this.numSlots = numSlots;
+
+        if (slotDuration > 0L) {
+            slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
+            windowingTimer = new Timer();
+        } else {
+            slotDurationInMilliseconds = 0;
+            windowingTimer = null;
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // No per-instance cleanup is performed here.
+    }
+
+    /**
+     * Schedules the periodic slot task after the regular prototype initialization. Event-based windows have no
+     * timer, so nothing is scheduled for them.
+     */
+    @Override
+    protected void initPEPrototypeInternal() {
+        super.initPEPrototypeInternal();
+        if (windowingTimer == null) {
+            // Event-based window: slots are added explicitly through addSlot().
+            return;
+        }
+        windowingTimer.schedule(new SlotTask(), slotDurationInMilliseconds, slotDurationInMilliseconds);
+        logger.trace("TIMER: " + slotDurationInMilliseconds);
+    }
+
+    /**
+     *
+     * Constructor for the event-based slot. No timer is created; slots must be added by the concrete class through
+     * {@link #addSlot(Object)}.
+     *
+     * @param app
+     *            the application
+     * @param numSlots
+     *            the number of slots to be stored
+     */
+    public WindowingPE(App app, int numSlots) {
+        this(app, 0L, null, numSlots);
+    }
+
+    /**
+     * This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
+     * implement the logic required to create a slot. For example, compute statistics from aggregations and get
+     * variables ready for the new slot.
+     *
+     * If the implementation class doesn't use periodic slots, this method will never be called. Use
+     * {@link #addSlot(Object)} instead.
+     *
+     * @return the slot object
+     */
+    abstract protected T addPeriodicSlot();
+
+    /**
+     * Add an object to the sliding window. Use it when the window is not periodic. For periodic slots use
+     * {@link #addPeriodicSlot()} instead. When called on a periodic window, an error is logged and the slot is
+     * ignored.
+     *
+     * @param slot
+     *            the slot to add
+     */
+    protected void addSlot(T slot) {
+
+        if (windowingTimer != null) {
+            logger.error("Calling method addSlot() in a periodic window is not allowed.");
+            return;
+        }
+        circularBuffer.add(slot);
+    }
+
+    /**
+     * Allocates the slot buffer and, for time-based windows, creates the first slot so that {@link #currentSlot} is
+     * available before the first timer tick.
+     */
+    protected void onCreate() {
+        circularBuffer = new CircularFifoBuffer<T>(numSlots);
+        if (slotDurationInMilliseconds > 0) {
+            currentSlot = addPeriodicSlot();
+            circularBuffer.add(currentSlot);
+        }
+    }
+
+    /**
+     *
+     * @return the least recently inserted slot
+     */
+    protected T getOldestSlot() {
+
+        return circularBuffer.get();
+    }
+
+    /** Stops the sliding window. Does nothing for event-based windows, which have no timer. */
+    protected void stop() {
+        if (windowingTimer != null) {
+            windowingTimer.cancel();
+        }
+    }
+
+    /**
+     *
+     * @return the collection of slots
+     */
+    protected Collection<T> getSlots() {
+        return circularBuffer;
+    }
+
+    /** Timer task that opens a new slot in every PE instance. */
+    private class SlotTask extends TimerTask {
+
+        @Override
+        public void run() {
+
+            logger.trace("Starting slot task");
+
+            /* Iterate over all instances and put a new slot in the buffer. */
+            for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
+                logger.trace("pe id: " + entry.getValue().getId());
+                @SuppressWarnings("unchecked")
+                WindowingPE<T> peInstance = (WindowingPE<T>) entry.getValue();
+
+                synchronized (peInstance) {
+                    /* Create the buffer under the same lock that guards its use. */
+                    if (peInstance.circularBuffer == null) {
+                        peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
+                    }
+                    peInstance.currentSlot = peInstance.addPeriodicSlot();
+                    peInstance.circularBuffer.add(peInstance.currentSlot);
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
new file mode 100644
index 0000000..4a69037
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
@@ -0,0 +1,65 @@
+package org.apache.s4.core.windowing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.window.DefaultAggregatingSlot;
+import org.apache.s4.core.window.Slot;
+import org.apache.s4.core.window.WindowingPE;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WindowingPE1 extends WindowingPE<Slot<Integer, List<Integer>>> {
+
+ private static Logger logger = LoggerFactory.getLogger(WindowingPE1.class);
+ AtomicInteger counter = new AtomicInteger();
+
+ public WindowingPE1(App app, int numSlots) {
+ super(app, numSlots);
+ }
+
+ @Override
+ protected Slot<Integer, List<Integer>> addPeriodicSlot() {
+ return new DefaultAggregatingSlot<Integer, List<Integer>>();
+ }
+
+ public void onEvent(Event event) {
+
+ Integer value = event.get("value", Integer.class);
+ currentSlot.addData(value);
+ counter.incrementAndGet();
+ if (counter.get() % 1000 == 0) {
+ logger.trace("received value [{}]", event.get("value", Integer.class));
+ }
+ }
+
+ @Override
+ protected void onRemove() {
+
+ }
+
+ @Override
+ protected void onTime() {
+ if (counter.get() == WindowingPETest.NB_EVENTS) {
+ List<Integer> values = new ArrayList<Integer>();
+
+ for (Slot<Integer, List<Integer>> slot : getSlots()) {
+ values.addAll(slot.getAggregatedData());
+ }
+ // System.out.println(Arrays.toString(values.toArray(new Integer[] {})));
+ WindowingPETest.allValues.addAll(values);
+ WindowingPETest.signalAllEventsProcessed.countDown();
+ }
+
+ }
+
+ public WindowingPE1(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
+ super(app, slotDuration, timeUnit, numSlots);
+ // TODO Auto-generated constructor stub
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/510875ad/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
new file mode 100644
index 0000000..c154d32
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
@@ -0,0 +1,97 @@
+package org.apache.s4.core.windowing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+import org.apache.s4.core.window.Slot;
+import org.apache.s4.core.window.WindowingPE;
+import org.apache.s4.fixtures.MockCommModule;
+import org.apache.s4.fixtures.MockCoreModule;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Regression test for time-based windows: injects {@link #NB_EVENTS} integer events into a stream feeding a
+ * {@link WindowingPE1} and checks that all values are collected from the window slots, in order.
+ */
+public class WindowingPETest {
+
+    public static final long NB_EVENTS = 1000000;
+    /* Released by the PE once it has published all values. */
+    public static final CountDownLatch signalAllEventsProcessed = new CountDownLatch(1);
+    /* Filled by the PE before the latch is released. */
+    public static final List<Integer> allValues = new ArrayList<Integer>();
+
+    private static final String STREAM_NAME = "stream1";
+    private static final String APP_NAME = "app1";
+
+    @Test
+    public void test() {
+        ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
+                .getLogger(Logger.ROOT_LOGGER_NAME);
+        root.setLevel(Level.DEBUG);
+        Injector injector = Guice.createInjector(new MockCommModule(), new MockCoreModule());
+        TestTimeWindowedApp app = injector.getInstance(TestTimeWindowedApp.class);
+        app.init();
+        app.start();
+
+        for (int i = 0; i < NB_EVENTS; i++) {
+            Event e = new Event();
+            e.put("value", Integer.class, i);
+            app.stream1.receiveEvent(new EventMessage(APP_NAME, STREAM_NAME, app.getSerDeser().serialize(e)));
+        }
+
+        try {
+            Assert.assertTrue("timed out waiting for all events to be processed",
+                    signalAllEventsProcessed.await(30, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            // Restore the interrupt status instead of silently consuming it.
+            Thread.currentThread().interrupt();
+            Assert.fail("interrupted while waiting for all events to be processed");
+        }
+        Assert.assertEquals(NB_EVENTS, allValues.size());
+        for (int i = 0; i < NB_EVENTS; i++) {
+            Assert.assertEquals((Integer) i, allValues.get(i));
+        }
+    }
+
+    /** Minimal application wiring a single stream to a time-based windowing PE. */
+    public static class TestTimeWindowedApp extends App {
+
+        private Stream<Event> stream1;
+
+        @Override
+        protected void onStart() {
+            // Nothing to do on start.
+        }
+
+        @Override
+        protected void onInit() {
+            WindowingPE<Slot<Integer, List<Integer>>> wPE1 = createWindowingPE(WindowingPE1.class, 10L,
+                    TimeUnit.MILLISECONDS, 100000);
+            wPE1.setTimerInterval(10, TimeUnit.MILLISECONDS);
+            stream1 = createStream(STREAM_NAME, new KeyFinder<Event>() {
+
+                /* Every event gets the same key. */
+                @Override
+                public List<String> get(final Event event) {
+                    return ImmutableList.of("X");
+                }
+            }, wPE1);
+
+        }
+
+        @Override
+        protected void onClose() {
+            // Nothing to do on close.
+        }
+
+    }
+
+}