You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/24 16:19:48 UTC
[2/4] git commit: updated windowing PE API
updated windowing PE API
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/995d358e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/995d358e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/995d358e
Branch: refs/heads/piper
Commit: 995d358e1e0c17f3b508d280409e4bca3042ecbf
Parents: e3f312e
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Jul 19 18:23:42 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Jul 19 18:23:42 2012 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/core/App.java | 12 +-
.../java/org/apache/s4/core/ProcessingElement.java | 25 ++-
.../s4/core/window/AbstractSlidingWindowPE.java | 219 +++++++++++++++
.../s4/core/window/DefaultAggregatingSlot.java | 40 ++-
.../java/org/apache/s4/core/window/OHCLSlot.java | 61 ++++
.../main/java/org/apache/s4/core/window/Slot.java | 16 +-
.../org/apache/s4/core/window/SlotFactory.java | 13 +
.../org/apache/s4/core/window/WindowingPE.java | 184 ------------
.../org/apache/s4/core/windowing/WindowingPE1.java | 38 ++--
.../apache/s4/core/windowing/WindowingPETest.java | 10 +-
10 files changed, 383 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 3e3eee8..0d958fd 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,9 @@ import org.apache.s4.base.KeyFinder;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.window.WindowingPE;
+import org.apache.s4.core.App.ClockType;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.SlotFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -400,12 +402,12 @@ public abstract class App {
}
- public <T extends WindowingPE<?>> T createWindowingPE(Class<T> type, long slotDuration, TimeUnit timeUnit,
- int numSlots) {
+ public <T extends AbstractSlidingWindowPE> T createSlidingWindowPE(Class<T> type, long slotDuration,
+ TimeUnit timeUnit, int numSlots, SlotFactory slotFactory) {
try {
- Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class };
+ Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class, SlotFactory.class };
T pe = type.getDeclaredConstructor(types).newInstance(
- new Object[] { this, slotDuration, timeUnit, numSlots });
+ new Object[] { this, slotDuration, timeUnit, numSlots, slotFactory });
return pe;
} catch (Exception e) {
logger.error("Cannot instantiate pe for class [{}]", type.getName(), e);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index d4e26e9..0f14899 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -15,11 +15,14 @@
*/
package org.apache.s4.core;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collection;
import java.util.Map;
-import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.ThreadSafe;
@@ -37,6 +40,7 @@ import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* <p>
@@ -115,7 +119,7 @@ public abstract class ProcessingElement implements Cloneable {
private ProcessingElement pePrototype;
private boolean haveTriggers = false;
private long timerIntervalInMilliseconds = 0;
- private Timer timer;
+ private ScheduledExecutorService timer;
private boolean isPrototype = true;
private boolean isThreadSafe = false;
private String name = null;
@@ -375,13 +379,21 @@ public abstract class ProcessingElement implements Cloneable {
Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
if (timer != null) {
- timer.cancel();
+ timer.shutdownNow();
}
if (interval == 0)
return this;
- timer = new Timer();
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+ .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ logger.error("Expection from timer thread", e);
+ }
+ }).setNameFormat("Timer-" + getClass().getSimpleName()).build();
+ timer = Executors.newSingleThreadScheduledExecutor(threadFactory);
return this;
}
@@ -416,6 +428,7 @@ public abstract class ProcessingElement implements Cloneable {
if (haveTriggers && isTrigger(event)) {
overloadDispatcher.dispatchTrigger(this, event);
}
+
}
}
@@ -467,7 +480,7 @@ public abstract class ProcessingElement implements Cloneable {
/* Close resources in prototype. */
if (timer != null) {
- timer.cancel();
+ timer.shutdownNow();
logger.info("Timer stopped.");
}
@@ -504,7 +517,7 @@ public abstract class ProcessingElement implements Cloneable {
/* Start timer. */
if (timer != null) {
- timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
+ timer.scheduleAtFixedRate(new OnTimeTask(), 0, timerIntervalInMilliseconds, TimeUnit.MILLISECONDS);
logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
new file mode 100644
index 0000000..f26ede6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
@@ -0,0 +1,253 @@
+package org.apache.s4.core.window;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections15.buffer.CircularFifoBuffer;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
+ * slot. Each slot represents a segment of time or a fixed number of events. Slots are consecutive in time or events.
+ *
+ * Users are expected to provide a factory for creating new slots, and a method to perform a global computation on the
+ * current window.
+ *
+ * Slots are automatically added: periodically by a timer for time-based windows, and from
+ * {@link #onTrigger(Event)} every <code>slotCapacity</code> trigger events for count-based windows.
+ *
+ * When using time-based slots, use this implementation only if you expect most slots to have values, it is not
+ * efficient for sparse event streams.
+ *
+ * @param <T>
+ *            type of the slot implementation used for this window
+ *
+ * @param <U>
+ *            type of the values added to the window slots
+ *
+ * @param <V>
+ *            type of the result computed by {@link #evaluateWindow(Collection)}
+ */
+public abstract class AbstractSlidingWindowPE<T extends Slot<U>, U, V> extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractSlidingWindowPE.class);
+
+    final private int numSlots;
+    private CircularFifoBuffer<T> circularBuffer;
+    /** Timer that rolls time-based windows; <code>null</code> for count-based windows. */
+    final private ScheduledExecutorService windowingTimerService;
+    final private long slotDurationInMilliseconds;
+
+    private T openSlot;
+    private final SlotFactory<T> slotFactory;
+
+    /** Number of trigger events per slot in count-based windows; 0 for time-based windows. */
+    private long slotCapacity = 0;
+    /** Number of trigger events counted for the currently open slot of a count-based window. */
+    private int eventCount = 0;
+
+    /**
+     * Constructor for count-based slots. A new slot is opened by {@link #onTrigger(Event)} every
+     * <code>slotCapacity</code> trigger events.
+     *
+     * @param app
+     *            the application
+     * @param numSlots
+     *            the number of slots to be stored
+     * @param slotCapacity
+     *            the number of trigger events after which a new slot is opened
+     * @param slotFactory
+     *            the factory used to create the slots
+     */
+    public AbstractSlidingWindowPE(App app, int numSlots, long slotCapacity, SlotFactory<T> slotFactory) {
+        this(app, 0L, null, numSlots, slotFactory, slotCapacity);
+    }
+
+    /**
+     * Constructor for time-based slots. A new slot is opened periodically, every <code>slotDuration</code>.
+     *
+     * @param app
+     *            the application
+     * @param slotDuration
+     *            the slot duration in timeUnit
+     * @param timeUnit
+     *            the unit of time
+     * @param numSlots
+     *            the number of slots to be stored
+     * @param slotFactory
+     *            the factory used to create the slots
+     */
+    public AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+            SlotFactory<T> slotFactory) {
+        this(app, slotDuration, timeUnit, numSlots, slotFactory, 0);
+    }
+
+    private AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+            SlotFactory<T> slotFactory, long slotCapacity) {
+        super(app);
+        this.numSlots = numSlots;
+        this.slotFactory = slotFactory;
+        this.slotCapacity = slotCapacity;
+        if (slotDuration > 0L) {
+            slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
+            ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                    .setNameFormat("SlidingWindow-" + getClass().getSimpleName()).build();
+            windowingTimerService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+        } else {
+            // count-based window: no timer, slots are rolled from onTrigger()
+            slotDurationInMilliseconds = 0;
+            windowingTimerService = null;
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // nothing to do
+    }
+
+    /**
+     * For count-based windows, we use a trigger that adds a new slot when the current one reaches its maximum capacity.
+     * Does nothing for time-based windows, or when the slot capacity is not positive.
+     */
+    public final void onTrigger(Event event) {
+        if (windowingTimerService != null || slotCapacity <= 0) {
+            return;
+        }
+        // count this trigger event, and start over with a new slot once the capacity is reached
+        if (++eventCount >= slotCapacity) {
+            eventCount = 0;
+            addSlot();
+        }
+    }
+
+    @Override
+    protected void initPEPrototypeInternal() {
+        super.initPEPrototypeInternal();
+        if (windowingTimerService == null) {
+            // count-based window: there is no timer to start
+            return;
+        }
+        windowingTimerService.scheduleAtFixedRate(new SlotTask(), slotDurationInMilliseconds,
+                slotDurationInMilliseconds, TimeUnit.MILLISECONDS);
+        logger.trace("TIMER: " + slotDurationInMilliseconds);
+    }
+
+    /**
+     * User provided function that evaluates the whole content of the window. It must iterate across all slots. Current
+     * slots are passed as a parameter and the PE instance is expected to be locked so that iteration over the slots is
+     * safe.
+     *
+     * @param slots
+     *            the slots currently held in the window
+     * @return the result of the evaluation
+     */
+    abstract protected V evaluateWindow(Collection<T> slots);
+
+    /**
+     * Closes the open slot and adds a new one to the sliding window. Use it when the window is not periodic; in a
+     * periodic window an error is logged and the call is ignored.
+     */
+    protected final void addSlot() {
+        if (windowingTimerService != null) {
+            logger.error("Calling method addSlot() in a periodic window is not allowed.");
+            return;
+        }
+        addNewSlot(this);
+    }
+
+    protected void onCreate() {
+        eventCount = 0;
+        circularBuffer = new CircularFifoBuffer<T>(numSlots);
+        // open a first slot in both modes, so that updateOpenSlot() can be used right away
+        openSlot = slotFactory.createSlot();
+        circularBuffer.add(openSlot);
+    }
+
+    /**
+     * Adds a value to the currently open slot.
+     *
+     * @param data
+     *            the value to add
+     */
+    protected void updateOpenSlot(U data) {
+        openSlot.update(data);
+    }
+
+    /**
+     *
+     * @return the least recently inserted slot
+     */
+    protected T getOldestSlot() {
+        return circularBuffer.get();
+    }
+
+    /** Stops the sliding window timer, if any. */
+    protected void stop() {
+        if (windowingTimerService != null) {
+            windowingTimerService.shutdownNow();
+        }
+    }
+
+    /**
+     *
+     * @return the collection of slots
+     */
+    protected Collection<T> getSlots() {
+        return circularBuffer;
+    }
+
+    /**
+     *
+     * @return the slot that currently receives the values
+     */
+    protected T getOpenSlot() {
+        return openSlot;
+    }
+
+    private class SlotTask extends TimerTask {
+
+        @Override
+        public void run() {
+
+            logger.trace("Starting slot task");
+
+            /* Iterate over all instances and put a new slot in the buffer. */
+            for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
+                logger.trace("pe id: " + entry.getValue().getId());
+                try {
+                    @SuppressWarnings("unchecked")
+                    AbstractSlidingWindowPE<T, U, V> peInstance = (AbstractSlidingWindowPE<T, U, V>) entry.getValue();
+                    addNewSlot(peInstance);
+                } catch (RuntimeException e) {
+                    // an exception escaping run() would suppress all subsequent executions of this periodic task
+                    logger.error("Cannot add a new slot to PE instance [{}]", entry.getKey(), e);
+                }
+            }
+        }
+    }
+
+    private void addNewSlot(AbstractSlidingWindowPE<T, U, V> peInstance) {
+        synchronized (peInstance) {
+            // defensive: covers instances whose buffer has not been initialized by onCreate()
+            if (peInstance.circularBuffer == null) {
+                peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
+            }
+            if (peInstance.openSlot != null) {
+                peInstance.openSlot.close();
+            }
+            peInstance.openSlot = slotFactory.createSlot();
+            peInstance.circularBuffer.add(peInstance.openSlot);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
index 84bfac7..61af0e6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
@@ -1,31 +1,51 @@
package org.apache.s4.core.window;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import com.google.common.collect.ImmutableList;
+
/**
* Window slot that keeps all data elements as a list.
- *
+ *
* @param <T>
* Type of slot elements
*/
-public class DefaultAggregatingSlot<T, U> implements Slot<T, U> {
+public class DefaultAggregatingSlot<T> implements Slot<T> {
List<T> data = null;
+ boolean open = true;
@Override
- public void addData(T datum) {
- if (data == null) {
- data = new ArrayList<T>();
+ public void update(T datum) {
+ if (open) {
+ if (data == null) {
+ data = new ArrayList<T>();
+ }
+ data.add(datum);
}
- data.add(datum);
-
}
@Override
- public U getAggregatedData() {
- return (U) ((data == null) ? Collections.emptyList() : data);
+ public void close() {
+ open = false;
+ if (data == null) {
+ data = ImmutableList.of();
+ } else {
+ data = ImmutableList.copyOf(data);
+ }
}
+ public List<T> getAggregatedData() {
+ return data == null ? (List<T>) ImmutableList.of() : data;
+ }
+
+ public static class DefaultAggregatingSlotFactory<T> implements SlotFactory<DefaultAggregatingSlot<T>> {
+
+ @Override
+ public DefaultAggregatingSlot<T> createSlot() {
+ return new DefaultAggregatingSlot<T>();
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
new file mode 100644
index 0000000..17410d3
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
@@ -0,0 +1,77 @@
+package org.apache.s4.core.window;
+
+/**
+ * Window slot that summarizes the values it receives as open (first), high (largest), low (smallest) and close
+ * (last) values, and counts the number of values received.
+ *
+ * Values received after {@link #close()} are ignored. As long as no value has been received, open, high, low and
+ * close are -1.
+ */
+public class OHCLSlot implements Slot<Double> {
+
+    double open = -1;
+    double high = -1;
+    double low = -1;
+    double close = -1;
+    long ticks = 0;
+    // a new slot accepts values until close() is called
+    boolean isOpen = true;
+
+    @Override
+    public void update(Double data) {
+        if (!isOpen) {
+            return;
+        }
+        if (ticks == 0) {
+            // first value of the slot; detected with the tick count because -1 can be a legitimate value
+            open = low = high = data;
+        } else if (data > high) {
+            high = data;
+        } else if (data < low) {
+            low = data;
+        }
+        close = data;
+        ticks++;
+    }
+
+    @Override
+    public void close() {
+        isOpen = false;
+    }
+
+    /** @return the first value received, or -1 if none */
+    public double getOpen() {
+        return open;
+    }
+
+    /** @return the last value received, or -1 if none */
+    public double getClose() {
+        return close;
+    }
+
+    /** @return the largest value received, or -1 if none */
+    public double getHigh() {
+        return high;
+    }
+
+    /** @return the smallest value received, or -1 if none */
+    public double getLow() {
+        return low;
+    }
+
+    /** @return the number of values received while the slot was open */
+    public long getTicksCount() {
+        return ticks;
+    }
+
+    /** Factory for {@link OHCLSlot} instances. */
+    public static class OHCLSlotFactory implements SlotFactory<OHCLSlot> {
+
+        @Override
+        public OHCLSlot createSlot() {
+            return new OHCLSlot();
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
index 579d72a..839639f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
@@ -1,23 +1,23 @@
package org.apache.s4.core.window;
/**
- * A convenience window slot, that aggregates elements of type <T> into elements of type <U>.
- *
+ * A convenience window slot, that aggregates elements of type <T>.
+ *
+ * Users must add suitable getter methods to retrieve aggregated data.
+ *
* @param <T>
* elements to aggregate
- * @param <U>
- * aggregated elements (can be a list, or can be a result of some processing on the elements)
*/
-public interface Slot<T, U> {
+public interface Slot<T> {
/**
* Add a single data element
*/
- void addData(T data);
+ void update(T data);
/**
- * Retrieve all stored elements s
+ * Compute aggregated data on available gathered slot data, place slot and slot data in immutable state.
*/
- U getAggregatedData();
+ void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
new file mode 100644
index 0000000..d40bc92
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
@@ -0,0 +1,13 @@
+package org.apache.s4.core.window;
+
+/**
+ * Factory for the slots of a sliding window.
+ *
+ * @param <T>
+ * slot class or interface that is produced
+ */
+public interface SlotFactory<T> {
+ /** Creates a slot of type T. */
+ T createSlot();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
deleted file mode 100644
index a2e45e2..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright (c) 2011 The S4 Project, http://s4.io.
- * All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.core.window;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections15.buffer.CircularFifoBuffer;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
- * slot. The concrete class must implement a class (the slot class) where values are stored. Each slot represents a
- * segment of time or a fixed number of events. Slots are consecutive in time or events. The slot object cannot be null.
- *
- * WHen using time-based slots, use this implementation only if you expect most slots to have values, it is not
- * efficient for sparse event streams.
- */
-public abstract class WindowingPE<T> extends ProcessingElement {
-
- private static final Logger logger = LoggerFactory.getLogger(WindowingPE.class);
-
- final private int numSlots;
- private CircularFifoBuffer<T> circularBuffer;
- final private Timer windowingTimer;
- final private long slotDurationInMilliseconds;
-
- protected T currentSlot;
-
- /**
- * Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
- *
- * @param app
- * the application
- * @param slotDuration
- * the slot duration in timeUnit
- * @param timeUnit
- * the unit of time
- * @param numSlots
- * the number of slots to be stored
- */
- public WindowingPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
- super(app);
- this.numSlots = numSlots;
-
- if (slotDuration > 0l) {
- slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
- windowingTimer = new Timer();
-
- } else {
- slotDurationInMilliseconds = 0;
- windowingTimer = null;
- }
- }
-
- @Override
- protected void onRemove() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void initPEPrototypeInternal() {
- super.initPEPrototypeInternal();
- windowingTimer.schedule(new SlotTask(), slotDurationInMilliseconds, slotDurationInMilliseconds);
- logger.trace("TIMER: " + slotDurationInMilliseconds);
-
- }
-
- /**
- *
- * Constructor for the event-based slot. The abstract method {@link #addPeriodicSlot()} must be called by the
- * concrete class.
- *
- * @param app
- * the application
- * @param numSlots
- * the number of slots to be stored
- */
- public WindowingPE(App app, int numSlots) {
- this(app, 0l, null, numSlots);
- }
-
- /**
- * This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
- * implement the logic required to create a slot. For example, compute statistics from aggregations and get
- * variables ready for the new slot.
- *
- * If the implementation class doesn't use periodic slots, this method will never be called. Use
- * {@link #addSlot(Object)} instead.
- *
- * @return the slot object
- */
- abstract protected T addPeriodicSlot();
-
- /**
- * Add an object to the sliding window. Use it when the window is not periodic. For periodic slots use
- * {@link #addPeriodicSlot()} instead.
- *
- * @param slot
- */
- protected void addSlot(T slot) {
-
- if (windowingTimer != null) {
- logger.error("Calling method addSlot() in a periodic window is not allowed.");
- return;
- }
- circularBuffer.add(slot);
- }
-
- protected void onCreate() {
- circularBuffer = new CircularFifoBuffer<T>(numSlots);
- if (slotDurationInMilliseconds > 0) {
- currentSlot = addPeriodicSlot();
- circularBuffer.add(currentSlot);
- }
- }
-
- /**
- *
- * @return the least recently inserted slot
- */
- protected T getOldestSlot() {
-
- return circularBuffer.get();
- }
-
- /** Stops the the sliding window. */
- protected void stop() {
- windowingTimer.cancel();
- }
-
- /**
- *
- * @return the collection of slots
- */
- protected Collection<T> getSlots() {
- return circularBuffer;
- }
-
- private class SlotTask extends TimerTask {
-
- @Override
- public void run() {
-
- logger.trace("Starting slot task");
-
- /* Iterate over all instances and put a new slot in the buffer. */
- for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
- logger.trace("pe id: " + entry.getValue().getId());
- @SuppressWarnings("unchecked")
- WindowingPE<T> peInstance = (WindowingPE<T>) entry.getValue();
-
- if (peInstance.circularBuffer == null) {
- peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
- }
- synchronized (peInstance) {
- peInstance.currentSlot = peInstance.addPeriodicSlot();
- peInstance.circularBuffer.add(peInstance.currentSlot);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
index 4a69037..76e60b8 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
@@ -1,36 +1,38 @@
package org.apache.s4.core.windowing;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.s4.base.Event;
import org.apache.s4.core.App;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
import org.apache.s4.core.window.DefaultAggregatingSlot;
-import org.apache.s4.core.window.Slot;
-import org.apache.s4.core.window.WindowingPE;
+import org.apache.s4.core.window.SlotFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class WindowingPE1 extends WindowingPE<Slot<Integer, List<Integer>>> {
+public class WindowingPE1 extends AbstractSlidingWindowPE<DefaultAggregatingSlot<Integer>, Integer, List<Integer>> {
private static Logger logger = LoggerFactory.getLogger(WindowingPE1.class);
AtomicInteger counter = new AtomicInteger();
- public WindowingPE1(App app, int numSlots) {
- super(app, numSlots);
+ public WindowingPE1(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+ SlotFactory<DefaultAggregatingSlot<Integer>> slotFactory) {
+ super(app, slotDuration, timeUnit, numSlots, slotFactory);
}
- @Override
- protected Slot<Integer, List<Integer>> addPeriodicSlot() {
- return new DefaultAggregatingSlot<Integer, List<Integer>>();
+ public WindowingPE1(App app, int numSlots, long slotCapacity,
+ SlotFactory<DefaultAggregatingSlot<Integer>> slotFactory) {
+ super(app, numSlots, slotCapacity, slotFactory);
}
public void onEvent(Event event) {
Integer value = event.get("value", Integer.class);
- currentSlot.addData(value);
+ updateOpenSlot(value);
counter.incrementAndGet();
if (counter.get() % 1000 == 0) {
logger.trace("received value [{}]", event.get("value", Integer.class));
@@ -45,21 +47,21 @@ public class WindowingPE1 extends WindowingPE<Slot<Integer, List<Integer>>> {
@Override
protected void onTime() {
if (counter.get() == WindowingPETest.NB_EVENTS) {
- List<Integer> values = new ArrayList<Integer>();
-
- for (Slot<Integer, List<Integer>> slot : getSlots()) {
- values.addAll(slot.getAggregatedData());
- }
// System.out.println(Arrays.toString(values.toArray(new Integer[] {})));
- WindowingPETest.allValues.addAll(values);
+ WindowingPETest.allValues.addAll(evaluateWindow(getSlots()));
WindowingPETest.signalAllEventsProcessed.countDown();
}
}
- public WindowingPE1(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
- super(app, slotDuration, timeUnit, numSlots);
- // TODO Auto-generated constructor stub
+    @Override
+    protected List<Integer> evaluateWindow(Collection<DefaultAggregatingSlot<Integer>> slots) {
+        List<Integer> values = new ArrayList<Integer>();
+        // flatten the slots handed in by the caller (not getSlots()) into a single list, in iteration order
+        for (DefaultAggregatingSlot<Integer> slot : slots) {
+            values.addAll(slot.getAggregatedData());
+        }
+        return values;
     }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
index c154d32..80ba837 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
@@ -10,8 +10,9 @@ import org.apache.s4.base.EventMessage;
import org.apache.s4.base.KeyFinder;
import org.apache.s4.core.App;
import org.apache.s4.core.Stream;
-import org.apache.s4.core.window.Slot;
-import org.apache.s4.core.window.WindowingPE;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.DefaultAggregatingSlot;
+import org.apache.s4.core.window.DefaultAggregatingSlot.DefaultAggregatingSlotFactory;
import org.apache.s4.fixtures.MockCommModule;
import org.apache.s4.fixtures.MockCoreModule;
import org.junit.Assert;
@@ -73,8 +74,9 @@ public class WindowingPETest {
@Override
protected void onInit() {
- WindowingPE<Slot<Integer, List<Integer>>> wPE1 = createWindowingPE(WindowingPE1.class, 10L,
- TimeUnit.MILLISECONDS, 100000);
+ AbstractSlidingWindowPE<DefaultAggregatingSlot<Integer>, Integer, List<Integer>> wPE1 = createSlidingWindowPE(
+ WindowingPE1.class, 10L, TimeUnit.MILLISECONDS, 100000,
+ new DefaultAggregatingSlotFactory<Integer>());
wPE1.setTimerInterval(10, TimeUnit.MILLISECONDS);
stream1 = createStream(STREAM_NAME, new KeyFinder<Event>() {