You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/24 16:19:48 UTC

[2/4] git commit: updated windowing PE API

updated windowing PE API

Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/995d358e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/995d358e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/995d358e

Branch: refs/heads/piper
Commit: 995d358e1e0c17f3b508d280409e4bca3042ecbf
Parents: e3f312e
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Jul 19 18:23:42 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Jul 19 18:23:42 2012 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/core/App.java      |   12 +-
 .../java/org/apache/s4/core/ProcessingElement.java |   25 ++-
 .../s4/core/window/AbstractSlidingWindowPE.java    |  219 +++++++++++++++
 .../s4/core/window/DefaultAggregatingSlot.java     |   40 ++-
 .../java/org/apache/s4/core/window/OHCLSlot.java   |   61 ++++
 .../main/java/org/apache/s4/core/window/Slot.java  |   16 +-
 .../org/apache/s4/core/window/SlotFactory.java     |   13 +
 .../org/apache/s4/core/window/WindowingPE.java     |  184 ------------
 .../org/apache/s4/core/windowing/WindowingPE1.java |   38 ++--
 .../apache/s4/core/windowing/WindowingPETest.java  |   10 +-
 10 files changed, 383 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 3e3eee8..0d958fd 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,9 @@ import org.apache.s4.base.KeyFinder;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.window.WindowingPE;
+import org.apache.s4.core.App.ClockType;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.SlotFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -400,12 +402,12 @@ public abstract class App {
 
     }
 
-    public <T extends WindowingPE<?>> T createWindowingPE(Class<T> type, long slotDuration, TimeUnit timeUnit,
-            int numSlots) {
+    public <T extends AbstractSlidingWindowPE> T createSlidingWindowPE(Class<T> type, long slotDuration,
+            TimeUnit timeUnit, int numSlots, SlotFactory slotFactory) {
         try {
-            Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class };
+            Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class, SlotFactory.class };
             T pe = type.getDeclaredConstructor(types).newInstance(
-                    new Object[] { this, slotDuration, timeUnit, numSlots });
+                    new Object[] { this, slotDuration, timeUnit, numSlots, slotFactory });
             return pe;
         } catch (Exception e) {
             logger.error("Cannot instantiate pe for class [{}]", type.getName(), e);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index d4e26e9..0f14899 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -15,11 +15,14 @@
  */
 package org.apache.s4.core;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import net.jcip.annotations.ThreadSafe;
@@ -37,6 +40,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * <p>
@@ -115,7 +119,7 @@ public abstract class ProcessingElement implements Cloneable {
     private ProcessingElement pePrototype;
     private boolean haveTriggers = false;
     private long timerIntervalInMilliseconds = 0;
-    private Timer timer;
+    private ScheduledExecutorService timer;
     private boolean isPrototype = true;
     private boolean isThreadSafe = false;
     private String name = null;
@@ -375,13 +379,21 @@ public abstract class ProcessingElement implements Cloneable {
         Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
 
         if (timer != null) {
-            timer.cancel();
+            timer.shutdownNow();
         }
 
         if (interval == 0)
             return this;
 
-        timer = new Timer();
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+                    @Override
+                    public void uncaughtException(Thread t, Throwable e) {
+                        logger.error("Expection from timer thread", e);
+                    }
+                }).setNameFormat("Timer-" + getClass().getSimpleName()).build();
+        timer = Executors.newSingleThreadScheduledExecutor(threadFactory);
         return this;
     }
 
@@ -416,6 +428,7 @@ public abstract class ProcessingElement implements Cloneable {
             if (haveTriggers && isTrigger(event)) {
                 overloadDispatcher.dispatchTrigger(this, event);
             }
+
         }
     }
 
@@ -467,7 +480,7 @@ public abstract class ProcessingElement implements Cloneable {
 
         /* Close resources in prototype. */
         if (timer != null) {
-            timer.cancel();
+            timer.shutdownNow();
             logger.info("Timer stopped.");
         }
 
@@ -504,7 +517,7 @@ public abstract class ProcessingElement implements Cloneable {
 
         /* Start timer. */
         if (timer != null) {
-            timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
+            timer.scheduleAtFixedRate(new OnTimeTask(), 0, timerIntervalInMilliseconds, TimeUnit.MILLISECONDS);
             logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
                     this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
new file mode 100644
index 0000000..f26ede6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/AbstractSlidingWindowPE.java
@@ -0,0 +1,219 @@
+package org.apache.s4.core.window;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections15.buffer.CircularFifoBuffer;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
+ * slot. Each slot represents a segment of time or a fixed number of events. Slots are consecutive in time or events.
+ * 
+ * Users are expected to provide a factory for creating new slots, and a method to perform a global computation on the
+ * current window.
+ * 
+ * Slots are automatically added.
+ * 
+ * When using time-based slots, use this implementation only if you expect most slots to have values; it is not
+ * efficient for sparse event streams.
+ * 
+ * @param <T>
+ *            type of the slot implementation used for this window
+ * 
+ * @param <U>
+ *            type of the values added to the window slots
+ */
+public abstract class AbstractSlidingWindowPE<T extends Slot<U>, U, V> extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractSlidingWindowPE.class);
+
+    final private int numSlots;
+    private CircularFifoBuffer<T> circularBuffer;
+    final private ScheduledExecutorService windowingTimerService;
+    final private long slotDurationInMilliseconds;
+
+    private T openSlot;
+    private final SlotFactory<T> slotFactory;
+
+    private long slotCapacity = 0;
+    private int eventCount = 0;
+
+    /**
+     * 
+     * Constructor for event-based slots. For such windows, new slots are added through {@link #addSlot()} (see
+     * {@link #onTrigger(Event)}).
+     * 
+     * @param app
+     *            the application
+     * @param numSlots
+     *            the number of slots to be stored
+     */
+    public AbstractSlidingWindowPE(App app, int numSlots, long slotCapacity, SlotFactory<T> slotFactory) {
+        this(app, 0L, null, numSlots, slotFactory, slotCapacity);
+    }
+
+    /**
+     * Constructor for time-based slots. A new slot is created from the slot factory at every slot duration.
+     * 
+     * @param app
+     *            the application
+     * @param slotDuration
+     *            the slot duration in timeUnit
+     * @param timeUnit
+     *            the unit of time
+     * @param numSlots
+     *            the number of slots to be stored
+     */
+    public AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+            SlotFactory<T> slotFactory) {
+        this(app, slotDuration, timeUnit, numSlots, slotFactory, 0);
+
+    }
+
+    private AbstractSlidingWindowPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+            SlotFactory<T> slotFactory, long slotCapacity) {
+        super(app);
+        this.numSlots = numSlots;
+        this.slotFactory = slotFactory;
+        this.slotCapacity = slotCapacity;
+        if (slotDuration > 0l) {
+            slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
+            ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+                    .setNameFormat("SlidingWindow-" + getClass().getSimpleName()).build();
+            windowingTimerService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+
+        } else {
+            slotDurationInMilliseconds = 0;
+            windowingTimerService = null;
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    /**
+     * For count-based windows, we use a trigger that adds a new slot when the current one reaches its maximum capacity.
+     */
+    public final void onTrigger(Event event) {
+        if (windowingTimerService == null) {
+            if (eventCount % slotCapacity == 0) {
+                addSlot();
+            }
+        }
+    }
+
+    @Override
+    protected void initPEPrototypeInternal() {
+        super.initPEPrototypeInternal();
+        windowingTimerService.scheduleAtFixedRate(new SlotTask(), slotDurationInMilliseconds,
+                slotDurationInMilliseconds, TimeUnit.MILLISECONDS);
+        logger.trace("TIMER: " + slotDurationInMilliseconds);
+
+    }
+
+    /**
+     * User provided function that evaluates the whole content of the window. It must iterate across all slots. Current
+     * slots are passed as a parameter and the PE instance is expected to be locked so that iteration over the slots is
+     * safe.
+     * 
+     * @return
+     */
+    abstract protected V evaluateWindow(Collection<T> slots);
+
+    /**
+     * Close the open slot and add a new one to the sliding window. Use it when the window is not periodic.
+     * 
+     * Calling it on a periodic window only logs an error.
+     */
+    protected final void addSlot() {
+
+        if (windowingTimerService != null) {
+            logger.error("Calling method addSlot() in a periodic window is not allowed.");
+            return;
+        }
+        addNewSlot((AbstractSlidingWindowPE<T, U, V>) this);
+    }
+
+    protected void onCreate() {
+        eventCount = 0;
+        circularBuffer = new CircularFifoBuffer<T>(numSlots);
+        if (slotDurationInMilliseconds > 0) {
+            openSlot = slotFactory.createSlot();
+            circularBuffer.add(openSlot);
+        }
+    }
+
+    protected void updateOpenSlot(U data) {
+        openSlot.update(data);
+    }
+
+    /**
+     * 
+     * @return the least recently inserted slot
+     */
+    protected T getOldestSlot() {
+
+        return circularBuffer.get();
+    }
+
+    /** Stops the sliding window. */
+    protected void stop() {
+        windowingTimerService.shutdownNow();
+    }
+
+    /**
+     * 
+     * @return the collection of slots
+     */
+    protected Collection<T> getSlots() {
+        return circularBuffer;
+    }
+
+    protected T getOpenSlot() {
+        return openSlot;
+    }
+
+    private class SlotTask extends TimerTask {
+
+        @Override
+        public void run() {
+
+            logger.trace("Starting slot task");
+
+            /* Iterate over all instances and put a new slot in the buffer. */
+            for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
+                logger.trace("pe id: " + entry.getValue().getId());
+                @SuppressWarnings("unchecked")
+                AbstractSlidingWindowPE<T, U, V> peInstance = (AbstractSlidingWindowPE<T, U, V>) entry.getValue();
+
+                if (peInstance.circularBuffer == null) {
+                    peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
+                }
+                addNewSlot(peInstance);
+            }
+        }
+    }
+
+    private void addNewSlot(AbstractSlidingWindowPE<T, U, V> peInstance) {
+        synchronized (peInstance) {
+            peInstance.openSlot.close();
+            peInstance.openSlot = slotFactory.createSlot();
+            peInstance.circularBuffer.add(peInstance.openSlot);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
index 84bfac7..61af0e6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/DefaultAggregatingSlot.java
@@ -1,31 +1,51 @@
 package org.apache.s4.core.window;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * Window slot that keeps all data elements as a list.
- *
+ * 
  * @param <T>
  *            Type of slot elements
  */
-public class DefaultAggregatingSlot<T, U> implements Slot<T, U> {
+public class DefaultAggregatingSlot<T> implements Slot<T> {
 
     List<T> data = null;
+    boolean open = true;
 
     @Override
-    public void addData(T datum) {
-        if (data == null) {
-            data = new ArrayList<T>();
+    public void update(T datum) {
+        if (open) {
+            if (data == null) {
+                data = new ArrayList<T>();
+            }
+            data.add(datum);
         }
-        data.add(datum);
-
     }
 
     @Override
-    public U getAggregatedData() {
-        return (U) ((data == null) ? Collections.emptyList() : data);
+    public void close() {
+        open = false;
+        if (data == null) {
+            data = ImmutableList.of();
+        } else {
+            data = ImmutableList.copyOf(data);
+        }
     }
 
+    public List<T> getAggregatedData() {
+        return data == null ? (List<T>) ImmutableList.of() : data;
+    }
+
+    public static class DefaultAggregatingSlotFactory<T> implements SlotFactory<DefaultAggregatingSlot<T>> {
+
+        @Override
+        public DefaultAggregatingSlot<T> createSlot() {
+            return new DefaultAggregatingSlot<T>();
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
new file mode 100644
index 0000000..17410d3
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/OHCLSlot.java
@@ -0,0 +1,61 @@
+package org.apache.s4.core.window;
+
+public class OHCLSlot implements Slot<Double> {
+
+    double open = -1;
+    double high = -1;
+    double low = -1;
+    double close = -1;
+    long ticks = 0;
+    boolean isOpen;
+
+    @Override
+    public void update(Double data) {
+        if (isOpen) {
+            if (open == -1) {
+                open = low = high = close = data;
+            } else if (data > high) {
+                high = data;
+            } else if (data < low) {
+                low = data;
+            }
+            close = data;
+            ticks++;
+        }
+    }
+
+    @Override
+    public void close() {
+        isOpen = false;
+    }
+
+    double getOpen() {
+        return open;
+    }
+
+    double getClose() {
+        return close;
+    }
+
+    double getHigh() {
+        return high;
+    }
+
+    double getLow() {
+        return low;
+    }
+
+    long getTicksCount() {
+        return ticks;
+    }
+
+    public static class OHCLSlotFactory implements SlotFactory<OHCLSlot> {
+
+        @Override
+        public OHCLSlot createSlot() {
+            return new OHCLSlot();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
index 579d72a..839639f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/Slot.java
@@ -1,23 +1,23 @@
 package org.apache.s4.core.window;
 
 /**
- * A convenience window slot, that aggregates elements of type <T> into elements of type <U>.
- *
+ * A convenience window slot, that aggregates elements of type <T>.
+ * 
+ * Users must add suitable getter methods to retrieve aggregated data.
+ * 
  * @param <T>
  *            elements to aggregate
- * @param <U>
- *            aggregated elements (can be a list, or can be a result of some processing on the elements)
  */
-public interface Slot<T, U> {
+public interface Slot<T> {
 
     /**
      * Add a single data element
      */
-    void addData(T data);
+    void update(T data);
 
     /**
-     * Retrieve all stored elements s
+     * Compute aggregated data from the data gathered so far, then put the slot and its data in an immutable state.
      */
-    U getAggregatedData();
+    void close();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
new file mode 100644
index 0000000..d40bc92
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/SlotFactory.java
@@ -0,0 +1,13 @@
+package org.apache.s4.core.window;
+
+/**
+ * Defines a factory for window slots.
+ * 
+ * @param <T>
+ *            slot class or interface that is produced
+ */
+public interface SlotFactory<T> {
+
+    T createSlot();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
deleted file mode 100644
index a2e45e2..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/window/WindowingPE.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright (c) 2011 The S4 Project, http://s4.io.
- * All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.core.window;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections15.buffer.CircularFifoBuffer;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract ProcessingElement that can store historical values using a sliding window. Each set of values is called a
- * slot. The concrete class must implement a class (the slot class) where values are stored. Each slot represents a
- * segment of time or a fixed number of events. Slots are consecutive in time or events. The slot object cannot be null.
- *
- * WHen using time-based slots, use this implementation only if you expect most slots to have values, it is not
- * efficient for sparse event streams.
- */
-public abstract class WindowingPE<T> extends ProcessingElement {
-
-    private static final Logger logger = LoggerFactory.getLogger(WindowingPE.class);
-
-    final private int numSlots;
-    private CircularFifoBuffer<T> circularBuffer;
-    final private Timer windowingTimer;
-    final private long slotDurationInMilliseconds;
-
-    protected T currentSlot;
-
-    /**
-     * Constructor for time-based slots. The abstract method {@link #addPeriodicSlot()} is called periodically.
-     *
-     * @param app
-     *            the application
-     * @param slotDuration
-     *            the slot duration in timeUnit
-     * @param timeUnit
-     *            the unit of time
-     * @param numSlots
-     *            the number of slots to be stored
-     */
-    public WindowingPE(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
-        super(app);
-        this.numSlots = numSlots;
-
-        if (slotDuration > 0l) {
-            slotDurationInMilliseconds = TimeUnit.MILLISECONDS.convert(slotDuration, timeUnit);
-            windowingTimer = new Timer();
-
-        } else {
-            slotDurationInMilliseconds = 0;
-            windowingTimer = null;
-        }
-    }
-
-    @Override
-    protected void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void initPEPrototypeInternal() {
-        super.initPEPrototypeInternal();
-        windowingTimer.schedule(new SlotTask(), slotDurationInMilliseconds, slotDurationInMilliseconds);
-        logger.trace("TIMER: " + slotDurationInMilliseconds);
-
-    }
-
-    /**
-     *
-     * Constructor for the event-based slot. The abstract method {@link #addPeriodicSlot()} must be called by the
-     * concrete class.
-     *
-     * @param app
-     *            the application
-     * @param numSlots
-     *            the number of slots to be stored
-     */
-    public WindowingPE(App app, int numSlots) {
-        this(app, 0l, null, numSlots);
-    }
-
-    /**
-     * This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
-     * implement the logic required to create a slot. For example, compute statistics from aggregations and get
-     * variables ready for the new slot.
-     *
-     * If the implementation class doesn't use periodic slots, this method will never be called. Use
-     * {@link #addSlot(Object)} instead.
-     *
-     * @return the slot object
-     */
-    abstract protected T addPeriodicSlot();
-
-    /**
-     * Add an object to the sliding window. Use it when the window is not periodic. For periodic slots use
-     * {@link #addPeriodicSlot()} instead.
-     *
-     * @param slot
-     */
-    protected void addSlot(T slot) {
-
-        if (windowingTimer != null) {
-            logger.error("Calling method addSlot() in a periodic window is not allowed.");
-            return;
-        }
-        circularBuffer.add(slot);
-    }
-
-    protected void onCreate() {
-        circularBuffer = new CircularFifoBuffer<T>(numSlots);
-        if (slotDurationInMilliseconds > 0) {
-            currentSlot = addPeriodicSlot();
-            circularBuffer.add(currentSlot);
-        }
-    }
-
-    /**
-     *
-     * @return the least recently inserted slot
-     */
-    protected T getOldestSlot() {
-
-        return circularBuffer.get();
-    }
-
-    /** Stops the the sliding window. */
-    protected void stop() {
-        windowingTimer.cancel();
-    }
-
-    /**
-     *
-     * @return the collection of slots
-     */
-    protected Collection<T> getSlots() {
-        return circularBuffer;
-    }
-
-    private class SlotTask extends TimerTask {
-
-        @Override
-        public void run() {
-
-            logger.trace("Starting slot task");
-
-            /* Iterate over all instances and put a new slot in the buffer. */
-            for (Map.Entry<String, ProcessingElement> entry : getPEInstances().entrySet()) {
-                logger.trace("pe id: " + entry.getValue().getId());
-                @SuppressWarnings("unchecked")
-                WindowingPE<T> peInstance = (WindowingPE<T>) entry.getValue();
-
-                if (peInstance.circularBuffer == null) {
-                    peInstance.circularBuffer = new CircularFifoBuffer<T>(numSlots);
-                }
-                synchronized (peInstance) {
-                    peInstance.currentSlot = peInstance.addPeriodicSlot();
-                    peInstance.circularBuffer.add(peInstance.currentSlot);
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
index 4a69037..76e60b8 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
@@ -1,36 +1,38 @@
 package org.apache.s4.core.windowing;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
 import org.apache.s4.core.window.DefaultAggregatingSlot;
-import org.apache.s4.core.window.Slot;
-import org.apache.s4.core.window.WindowingPE;
+import org.apache.s4.core.window.SlotFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class WindowingPE1 extends WindowingPE<Slot<Integer, List<Integer>>> {
+public class WindowingPE1 extends AbstractSlidingWindowPE<DefaultAggregatingSlot<Integer>, Integer, List<Integer>> {
 
     private static Logger logger = LoggerFactory.getLogger(WindowingPE1.class);
     AtomicInteger counter = new AtomicInteger();
 
-    public WindowingPE1(App app, int numSlots) {
-        super(app, numSlots);
+    public WindowingPE1(App app, long slotDuration, TimeUnit timeUnit, int numSlots,
+            SlotFactory<DefaultAggregatingSlot<Integer>> slotFactory) {
+        super(app, slotDuration, timeUnit, numSlots, slotFactory);
     }
 
-    @Override
-    protected Slot<Integer, List<Integer>> addPeriodicSlot() {
-        return new DefaultAggregatingSlot<Integer, List<Integer>>();
+    public WindowingPE1(App app, int numSlots, long slotCapacity,
+            SlotFactory<DefaultAggregatingSlot<Integer>> slotFactory) {
+        super(app, numSlots, slotCapacity, slotFactory);
     }
 
     public void onEvent(Event event) {
 
         Integer value = event.get("value", Integer.class);
-        currentSlot.addData(value);
+        updateOpenSlot(value);
         counter.incrementAndGet();
         if (counter.get() % 1000 == 0) {
             logger.trace("received value [{}]", event.get("value", Integer.class));
@@ -45,21 +47,21 @@ public class WindowingPE1 extends WindowingPE<Slot<Integer, List<Integer>>> {
     @Override
     protected void onTime() {
         if (counter.get() == WindowingPETest.NB_EVENTS) {
-            List<Integer> values = new ArrayList<Integer>();
-
-            for (Slot<Integer, List<Integer>> slot : getSlots()) {
-                values.addAll(slot.getAggregatedData());
-            }
             // System.out.println(Arrays.toString(values.toArray(new Integer[] {})));
-            WindowingPETest.allValues.addAll(values);
+            WindowingPETest.allValues.addAll(evaluateWindow(getSlots()));
             WindowingPETest.signalAllEventsProcessed.countDown();
         }
 
     }
 
-    public WindowingPE1(App app, long slotDuration, TimeUnit timeUnit, int numSlots) {
-        super(app, slotDuration, timeUnit, numSlots);
-        // TODO Auto-generated constructor stub
+    @Override
+    protected List<Integer> evaluateWindow(Collection<DefaultAggregatingSlot<Integer>> slots) {
+        List<Integer> values = new ArrayList<Integer>();
+
+        for (DefaultAggregatingSlot<Integer> slot : getSlots()) {
+            values.addAll(slot.getAggregatedData());
+        }
+        return values;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/995d358e/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
index c154d32..80ba837 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
@@ -10,8 +10,9 @@ import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
 import org.apache.s4.core.Stream;
-import org.apache.s4.core.window.Slot;
-import org.apache.s4.core.window.WindowingPE;
+import org.apache.s4.core.window.AbstractSlidingWindowPE;
+import org.apache.s4.core.window.DefaultAggregatingSlot;
+import org.apache.s4.core.window.DefaultAggregatingSlot.DefaultAggregatingSlotFactory;
 import org.apache.s4.fixtures.MockCommModule;
 import org.apache.s4.fixtures.MockCoreModule;
 import org.junit.Assert;
@@ -73,8 +74,9 @@ public class WindowingPETest {
 
         @Override
         protected void onInit() {
-            WindowingPE<Slot<Integer, List<Integer>>> wPE1 = createWindowingPE(WindowingPE1.class, 10L,
-                    TimeUnit.MILLISECONDS, 100000);
+            AbstractSlidingWindowPE<DefaultAggregatingSlot<Integer>, Integer, List<Integer>> wPE1 = createSlidingWindowPE(
+                    WindowingPE1.class, 10L, TimeUnit.MILLISECONDS, 100000,
+                    new DefaultAggregatingSlotFactory<Integer>());
             wPE1.setTimerInterval(10, TimeUnit.MILLISECONDS);
             stream1 = createStream(STREAM_NAME, new KeyFinder<Event>() {