You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:37 UTC

[36/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTestSetup.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTestSetup.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTestSetup.java
new file mode 100644
index 0000000..59560e9
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTestSetup.java
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Submitter;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyProvider;
+
+public interface TopologyTestSetup {
+
+    TopologyProvider createTopologyProvider();
+
+    TopologyProvider getTopologyProvider();
+
+    Submitter<Topology, Job> createSubmitter();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/services/TestApplications.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/services/TestApplications.java b/api/topology/src/test/java/org/apache/edgent/test/topology/services/TestApplications.java
new file mode 100644
index 0000000..0053328
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/services/TestApplications.java
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology.services;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.services.TopologyBuilder;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Some dummy test applications that will be part of this
+ * jar including having service provider entries.
+ * Used for testing the application service.
+ *
+ */
+public class TestApplications {
+        
+    private static abstract class App implements TopologyBuilder {
+
+        @Override
+        public BiConsumer<Topology, JsonObject> getBuilder() {
+            return (t,c) -> t.strings(getName()).print();
+        }     
+    }
+    
+    public static class AppOne extends App {
+        @Override
+        public String getName() {
+            return "FirstJarApp";
+        }
+    }
+    public static class AppTwo extends App {
+        @Override
+        public String getName() {
+            return "SecondJarApp";
+        }
+    }
+    public static class AppThree extends App {
+        @Override
+        public String getName() {
+            return "ThirdJarApp";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/InsertionTimeList.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/InsertionTimeList.java b/api/window/src/main/java/edgent/window/InsertionTimeList.java
deleted file mode 100644
index f1a0d84..0000000
--- a/api/window/src/main/java/edgent/window/InsertionTimeList.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-import java.util.AbstractSequentialList;
-import java.util.LinkedList;
-import java.util.ListIterator;
-
-/**
- * A window contents list that maintains insertion time.
- *
- * @param <T> Type of tuples in the list
- */
-public class InsertionTimeList<T> extends AbstractSequentialList<T> {
-    
-    private final LinkedList<T> tuples = new LinkedList<>();
-    private final LinkedList<Long> times = new LinkedList<>();
-    
-    void evictOlderThan(long evictTime) {
-        while(!times.isEmpty() && times.getFirst() <= evictTime){
-            remove();
-        }
-    }
-    
-    long nextEvictDelay(long timeMs) {
-        long firstTupleTime = times.get(0);
-        long nextEvictTime = firstTupleTime + timeMs;
-        
-        long timeToNextEvict = nextEvictTime - System.currentTimeMillis();
-        
-        return Math.max(0, timeToNextEvict);
-    }
-    
-    @Override
-    public ListIterator<T> listIterator(int index) {
-         return new TimedListIterator<>(tuples.listIterator(index), times.listIterator(index));
-    }
-    
-    @Override
-    public boolean add(T tuple) {
-        tuples.add(tuple);
-        times.add(System.currentTimeMillis());
-        return true;
-    }
-    @Override
-    public void clear() {
-        tuples.clear();
-        times.clear();
-    }
-    
-    private void remove() {
-        tuples.remove();
-        times.remove();
-    }
-
-    @Override
-    public int size() {
-         return tuples.size();
-    }
-    
-    private static class TimedListIterator<T> implements ListIterator<T> {
-        
-        private final ListIterator<T> ti;
-        private final ListIterator<Long> iti;
-        
-        TimedListIterator(ListIterator<T> ti, ListIterator<Long> iti) {
-            this.ti = ti;
-            this.iti = iti;
-        }     
-
-        @Override
-        public void add(T tuple) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return ti.hasNext();
-        }
-
-        @Override
-        public boolean hasPrevious() {
-            return ti.hasPrevious();
-        }
-
-        @Override
-        public T next() {
-            iti.next();
-            return ti.next();
-        }
-
-        @Override
-        public int nextIndex() {
-            return ti.nextIndex();
-        }
-
-        @Override
-        public T previous() {
-            iti.previous();
-            return ti.previous();
-        }
-
-        @Override
-        public int previousIndex() {
-            return ti.previousIndex();
-        }
-
-        @Override
-        public void remove() {
-            ti.remove();
-            iti.remove();            
-        }
-
-        @Override
-        public void set(T arg0) {
-            throw new UnsupportedOperationException();
-        }       
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/Partition.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/Partition.java b/api/window/src/main/java/edgent/window/Partition.java
deleted file mode 100644
index ec70672..0000000
--- a/api/window/src/main/java/edgent/window/Partition.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * A partition within a {@code Window}. The contents of the list
- * returned by {@code getContents} is stable when synchronizing 
- * on the partition object. For example:
- * 
- * <pre>{@code
- * Partition<Integer, Integer, ArrayList<Integer>> part = ...;
- * synchronized(part){
- *  List<Integer> contents = part.getContents();
- *  // stable operation on contents of partition
- * }
- * }</pre>
- *
- * @param <T> Type of tuples in the partition.
- * @param <K> Type of the partition's key.
- * @param <L> Type of the list holding the partition's tuples.
- * 
- * @see Window
- */
-public interface Partition<T, K, L extends List<T>> extends Serializable{    
-    /**
-     * Offers a tuple to be inserted into the partition.
-     * @param tuple Tuple to be offered.
-     * @return True if the tuple was inserted into this partition, false if it was rejected.
-     */
-    boolean insert(T tuple);
-        
-    /**
-     * Invoke the WindowProcessor's processWindow method. A partition processor
-     * must be registered prior to invoking process().
-     */
-    void process();
-    
-    /** 
-     * Calls the partition's evictDeterminer.
-     */
-    void evict();
-    
-    /**
-     * Retrieves the window contents.
-     * @return list of partition contents
-     */
-    L getContents();
-    
-    /**
-     * Return the window in which this partition is contained.
-     * @return the partition's window
-     */
-    Window<T, K, L> getWindow();
-    
-    /**
-     * Returns the key associated with this partition
-     * @return The key of the partition.
-     */
-    K getKey();
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/PartitionImpl.java b/api/window/src/main/java/edgent/window/PartitionImpl.java
deleted file mode 100644
index a535035..0000000
--- a/api/window/src/main/java/edgent/window/PartitionImpl.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-
-@SuppressWarnings("serial")
-class PartitionImpl<T, K, L extends List<T>> implements Partition<T, K, L> {
-    private final L tuples;
-    private final List<T> unmodifiableTuples;
-    private final Window<T, K, L> window;
-    private final K key;
-    
-    PartitionImpl(Window<T, K, L> window, L tuples, K key){
-        this.window = window;
-        this.tuples = tuples;
-        this.unmodifiableTuples = Collections.unmodifiableList(tuples);
-        this.key = key;
-    }
-
-    @Override
-    public synchronized boolean insert(T tuple) {
-        // Offer the tuple to the window's insertion policy; only accepted tuples are stored.
-        if (getWindow().getInsertionPolicy().apply(this, tuple)) {
-            getWindow().getContentsPolicy().accept(this, tuple);
-            this.tuples.add(tuple);
-            // Trigger
-            getWindow().getTriggerPolicy().accept(this, tuple);
-            return true;
-        }
-        // Rejected by the insertion policy: report false, as Partition.insert documents.
-        return false;
-    }
-    
-    @Override
-    public synchronized void process() {
-        window.getPartitionProcessor().accept(unmodifiableTuples, key);
-    }
-
-    @Override
-    public synchronized L getContents() {
-        return tuples;
-    }
-
-    @Override
-    public Window<T, K, L> getWindow() {
-        return window;
-    }
-    
-    @Override
-    public K getKey() {
-        return key;
-    }
-
-    @Override
-    public synchronized void evict() {
-        Consumer<Partition<T, K, L>> evictDeterminer = window.getEvictDeterminer();
-        evictDeterminer.accept(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/PartitionedState.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/PartitionedState.java b/api/window/src/main/java/edgent/window/PartitionedState.java
deleted file mode 100644
index efc1054..0000000
--- a/api/window/src/main/java/edgent/window/PartitionedState.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import edgent.function.Supplier;
-
-/**
- * Maintain partitioned state.
- * Abstract class that can be used to maintain state 
- * for each keyed partition in a {@link Window}.
- *
- * @param <K> Key type.
- * @param <S> State type.
- */
-public abstract class PartitionedState<K, S> {
-
-    private final Supplier<S> initialState;
-    private final Map<K, S> states = new HashMap<>();
-
-    /**
-     * Construct with an initial state function.
-     * @param initialState Function used to create the initial state for a key.
-     * 
-     * @see #getState(Object)
-     */
-    protected PartitionedState(Supplier<S> initialState) {
-        this.initialState = initialState;
-    }
-
-    /**
-     * Get the current state for {@code key}.
-     * If no state is held then {@code initialState.get()}
-     * is called to create the initial state for {@code key}.
-     * @param key Partition key.
-     * @return State for {@code key}.
-     */
-    protected synchronized S getState(K key) {
-        S state = states.get(key);
-        if (state == null)
-            states.put(key, state = initialState.get());
-        return state;
-    }
-    
-    /**
-     * Set the current state for {@code key}.
-     * @param key Partition key.
-     * @param state State for {@code key}
-     * @return Previous state for {@code key}, will be null if no state was held.
-     */
-    protected synchronized S setState(K key, S state) {
-        return states.put(key, state);
-    }
-    /**
-     * Remove the current state for {@code key}.
-     * @param key Partition key.
-     * @return Removed state for {@code key}, will be null if no state was held.
-     */
-    protected synchronized S removeState(K key) {
-        return states.remove(key);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/Policies.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/Policies.java b/api/window/src/main/java/edgent/window/Policies.java
deleted file mode 100644
index cf71950..0000000
--- a/api/window/src/main/java/edgent/window/Policies.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-
-/**
- * Common window policies.
- *
- */
-public class Policies {
-    
-    /**
-     * A policy which schedules a future partition eviction if the partition is empty.
-     * This can be used as a contents policy that is scheduling the eviction of
-     * the tuple just about to be inserted.
-     * @param <T> Tuple Type
-     * @param <K> Key type
-     * @param <L> List type for the partition contents.
-     * @param time The time span in which tuples are permitted in the partition.
-     * @param unit The units of time.
-     * @return The time-based contents policy.
-     */
-    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictIfEmpty(long time, TimeUnit unit){
-        return (partition, tuple) -> {          
-            if(partition.getContents().isEmpty()){
-                ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
-                ses.schedule(() -> partition.evict(), time, unit);
-            }
-        };
-    }
-    
-    /**
-     * A policy which schedules a future partition eviction on the first insert.
-     * This can be used as a contents policy that schedules the eviction of tuples
-     * as a batch.
-     * @param <T> Tuple Type
-     * @param <K> Key type
-     * @param <L> List type for the partition contents.
-     * @param time The time span in which tuples are permitted in the partition.
-     * @param unit The units of time.
-     * @return The time-based contents policy.
-     */
-    @SuppressWarnings("serial")
-    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictOnFirstInsert(long time, TimeUnit unit){
-        
-        // Can't use lambda since state is required
-        return new BiConsumer<Partition<T,K,L>, T>() {
-            private Set<Partition<T,K,L>> initialized_partitions = Collections.synchronizedSet(new HashSet<>());
-            @Override
-            public void accept(Partition<T, K, L> partition, T tuple) {
-                // add() returns true only when the partition was not yet recorded: one call tests and records it.
-                if(initialized_partitions.add(partition)){
-                    ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
-                    ses.schedule(() -> partition.evict(), time, unit);
-                }
-            }
-        };
-    }
-    
-    /**
-     * An eviction policy which evicts all tuples that are older than a specified time.
-     * If any tuples remain in the partition, it schedules their eviction after
-     * an appropriate interval.
-     * @param <T> Tuple Type
-     * @param <K> Key type
-     * @param time The timespan in which tuples are permitted in the partition.
-     * @param unit The units of time.
-     * @return The time-based eviction policy.
-     */ 
-    public static <T, K> Consumer<Partition<T, K, InsertionTimeList<T>> > evictOlderWithProcess(long time, TimeUnit unit){
-        
-        long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
-
-        return (partition) -> {
-            ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
-            InsertionTimeList<T> tuples = partition.getContents();
-            long evictTime = System.currentTimeMillis() - timeMs;
-            
-            tuples.evictOlderThan(evictTime);
-
-            partition.process();
-            
-            if(!tuples.isEmpty()){
-                ses.schedule(() -> partition.evict(), tuples.nextEvictDelay(timeMs), TimeUnit.MILLISECONDS);
-            }
-        };
-    }
-    
-    /**
-     * An eviction policy which processes the window, evicts all tuples, and 
-     * schedules the next eviction after the appropriate interval.
-     * @param <T> Tuple Type
-     * @param <K> Key type
-     * @param time The timespan in which tuples are permitted in the partition.
-     * @param unit The units of time.
-     * @return The time-based eviction policy.
-     */ 
-    public static <T, K> Consumer<Partition<T, K, List<T>> > evictAllAndScheduleEvictWithProcess(long time, TimeUnit unit){
-        
-        long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
-        return (partition) -> {
-            ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
-            List<T> tuples = partition.getContents(); 
-
-            partition.process();
-            tuples.clear();
-                        
-            ses.schedule(() -> partition.evict(), timeMs, TimeUnit.MILLISECONDS);       
-        };
-    }
-    
-    
-    /**
-     * Returns an insertion policy that indicates the tuple
-     * is to be inserted into the partition.
-     * 
-     * @param <T> Tuple type
-     * @param <K> Key type
-     * @param <L> List type for the partition contents.
-     * 
-     * @return An insertion policy that always inserts.
-     */
-    public static <T, K, L extends List<T>> BiFunction<Partition<T, K, L>, T, Boolean> alwaysInsert(){
-        return (partition, tuple) -> true;
-    }
-    
-    /**
-     * Returns a count-based contents policy.
-     * If, when called, the number of tuples in the partition is
-     * greater than or equal to {@code count} then {@code partition.evict()}
-     * is called.
-     * @param <T> Tuple type
-     * @param <K> Key type
-     * @param <L> List type for the partition contents.
-     * @param count the count
-     * @return A count-based contents policy.
-     */
-    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> countContentsPolicy(final int count){
-        return (partition, tuple) -> {
-            if (partition.getContents().size() >= count)
-                partition.evict();
-        };
-    }
-    
-    /**
-     * Returns a Consumer representing an evict determiner that evicts all tuples
-     * from the window.
-     * @param <T> Tuple type
-     * @param <K> Key type
-     * @param <L> List type for the partition contents.
-     * @return An evict determiner that evicts all tuples.
-     */
-    public static <T, K, L extends List<T>> Consumer<Partition<T, K, L> > evictAll(){
-        return partition -> partition.getContents().clear();
-    }
-    
-    /**
-     * Returns an evict determiner that evicts the oldest tuple.
-     * @param <T> Tuple type
-     * @param <K> Key type
-     * @param <L> List type for the partition contents.
-     * @return An evict determiner that evicts the oldest tuple.
-     */
-    public static <T, K, L extends List<T>> Consumer<Partition<T, K, L> > evictOldest(){
-        return partition -> partition.getContents().remove(0);
-    }
-    
-    /**
-     * Returns a trigger policy that triggers
-     * processing on every insert.
-     * @param <T> Tuple type
-     * @param <K> Key type
-     * @param <L> List type for the partition contents.
-     * @return A trigger policy that triggers processing on every insert.
-     */ 
-    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processOnInsert(){
-        return (partition, tuple) -> partition.process();
-    }
-    
-    /**
-     * Returns a trigger policy that triggers when the size of a partition
-     * equals or exceeds a value, and then evicts its contents.
-     * @param <T> Tuple type
-     * @param <K> Key type
-     * @param <L> List type for the partition contents.
-     * @param size partition size
-     * @return A trigger policy that triggers processing when the size of 
-     * the partition equals or exceeds a value.
-     */ 
-    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processWhenFullAndEvict(final int size){
-        return (partition, tuple) -> {
-            if(partition.getContents().size() >= size){
-                partition.process();
-                partition.evict();
-            }
-        };
-    }
-    
-    /**
-     * A {@link BiConsumer} policy which does nothing.
-     * @param <T> Tuple type
-     * @param <K> Key type
-     * @param <L> List type for the partition contents.
-     * @return A policy which does nothing.
-     */
-    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> doNothing(){
-        return (partition, key) -> {};
-    }
-    
-    public static <T> Supplier<InsertionTimeList<T>> insertionTimeList() {
-        return () -> new InsertionTimeList<>();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/Window.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/Window.java b/api/window/src/main/java/edgent/window/Window.java
deleted file mode 100644
index 23ad605..0000000
--- a/api/window/src/main/java/edgent/window/Window.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Function;
-
-
-/**
- * Partitioned window of tuples.
- * Conceptually a window maintains a continuously
- * changing subset of tuples on a stream, such as the last ten tuples
- * or tuples that have arrived in the last five minutes.
- * <P>
- * {@code Window} is partitioned by keys obtained
- * from tuples using a key function. Each tuple is
- * inserted into a partition containing all tuples
- * with the same key (using {@code equals()}).
- * Each partition independently maintains the subset of
- * tuples defined by the windows policies.
- * <BR>
- * An unpartitioned is created by using a key function that
- * returns a constant, to force all tuples to be inserted
- * into a single partition. A convenience function
- * {@link edgent.function.Functions#unpartitioned() unpartitioned()} is
- * provided that returns zero as the fixed key.
- * </P>   
- * <P>
- * The window's policies are flexible to allow any definition of
- * what tuples each partition will contain, and how the
- * partition is processed.
- * </P>
- * @param <T> type of tuples in the window
- * @param <K> type of the window's key
- * @param <L> type of the list used to contain tuples.
- */
-public interface Window<T, K, L extends List<T>>{
-    
-    /**
-     * Attempts to insert the tuple into its partition.
-     * Tuple insertion performs the following actions in order:
-     * <OL>
-     * <LI>Call {@code K key = getKeyFunction().apply(tuple)} to obtain the partition key.</LI>
-     * <LI>Get the partition for {@code key} creating an empty partition if one did not exist for {@code key}. </LI>
-     * <LI>Apply the insertion policy, calling {@code getInsertionPolicy().apply(partition, tuple)}. If it returns false then return false from this method,
-     * otherwise continue.</LI>
-     * <LI>Apply the contents policy, calling {@code getContentsPolicy().apply(partition, tuple)}.
-     * This is a pre-insertion action that allows any action. Some policies may request room to be made for
-     * the insertion by calling {@link Partition#evict() evict()} which will result in a call to the evict determiner.</LI>
-     * <LI>Add {@code tuple} to the contents of the partition.</LI>
-     * <LI>Apply the trigger policy, calling {@code getTriggerPolicy().apply(partition, tuple)}.
-     * This is a post-insertion that action allows any action. A typical implementation is to call
-     * {@link Partition#process() partition.process()} to perform processing of the window.
-     * </OL>
-     * 
-     * 
-     * 
-     * @param tuple the tuple to insert
-     * @return true, if the tuple was successfully inserted. Otherwise, false.
-     */
-    boolean insert(T tuple);
-    
-    /**
-     * Register a WindowProcessor.
-     * @param windowProcessor function to process the window
-     */
-    void registerPartitionProcessor(BiConsumer<List<T>, K> windowProcessor);
-    
-    /**
-     * Register a ScheduledExecutorService.
-     * @param ses the service
-     */
-    void registerScheduledExecutorService(ScheduledExecutorService ses);
-    
-    /**
-     * Returns the insertion policy of the window.
-     *  is called
-     * 
-     * @return The insertion policy.
-     */
-    BiFunction<Partition<T, K, L>, T, Boolean> getInsertionPolicy();
-    
-    /**
-     * Returns the contents policy of the window.
-     * The contents policy is invoked before a tuple
-     * is inserted into a partition.
-     * 
-     * @return contents policy for this window.
-     */
-    BiConsumer<Partition<T, K, L>, T> getContentsPolicy();
-
-    /**
-     * Returns the window's trigger policy.
-     * The trigger policy is invoked (triggered) by
-     * the insertion of a tuple into a partition.
-     * 
-     * @return trigger policy for this window.
-     */
-    BiConsumer<Partition<T, K, L>, T> getTriggerPolicy();
-
-    /**
-     * Returns the partition processor associated with the window.
-     * @return partitionProcessor
-     */
-    BiConsumer<List<T>, K> getPartitionProcessor();
-    
-    /**
-     * Returns the ScheduledExecutorService associated with the window.
-     * @return ScheduledExecutorService
-     */
-    ScheduledExecutorService getScheduledExecutorService();
-    
-    /**
-     * Returns the window's eviction determiner.
-     * The evict determiner is responsible for
-     * determining which tuples in a window need
-     * to be evicted.
-     * <BR>
-     * Calls to {@link Partition#evict()} result in
-     * {@code getEvictDeterminer().accept(partition)} being
-     * called.
-     * In some cases this may not result in tuples being
-     * evicted from the partition.
-     * <P>
-     * An evict determiner evicts tuples from the partition
-     * by removing them from the list returned by
-     * {@link Partition#getContents()}.
-     * 
-     * @return evict determiner for this window.
-     */
-    Consumer<Partition<T, K, L>> getEvictDeterminer();
-    
-    /**
-     * Returns the keyFunction of the window
-     * @return The window's keyFunction.
-     */
-    Function<T, K> getKeyFunction();
-
-    /**
-     * Retrieves the partitions in the window. The map of partitions
-     * is stable when synchronizing on the intrinsic lock of the map,
-     * for example:
-     * <br>
-     * <pre>{@code
-     * Map<K, Partitions<U, K, ?>> partitions = window.getPartitions();
-     * synchronized(partitions){
-     *  // operations with partition
-     * }
-     * }</pre>
-     * 
-     * @return A map of the window's keys and partitions.
-     */
-    Map<K, Partition<T, K, L>> getPartitions();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/WindowImpl.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/WindowImpl.java b/api/window/src/main/java/edgent/window/WindowImpl.java
deleted file mode 100644
index 18f1d51..0000000
--- a/api/window/src/main/java/edgent/window/WindowImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Function;
-import edgent.function.Supplier;
-
-
-class WindowImpl<T, K, L extends List<T>> implements Window<T, K, L> {
-    private final BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy;
-    private final BiConsumer<Partition<T, K, L>, T> contentsPolicy;
-    private final Consumer<Partition<T, K, L> > evictDeterminer;
-    private final BiConsumer<Partition<T, K, L>, T> triggerPolicy;
-    private BiConsumer<List<T>, K> partitionProcessor;
-    
-    private ScheduledExecutorService ses;
-    
-    protected Supplier<L> listSupplier;
-    protected Function<T, K> keyFunction;
-    
-    private Map<K, Partition<T, K, L> > partitions = new HashMap<K, Partition<T, K, L> >();
-    
-    
-    WindowImpl(BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy, BiConsumer<Partition<T, K, L>, T> contentsPolicy,
-            Consumer<Partition<T, K, L> > evictDeterminer, BiConsumer<Partition<T, K, L>, T> triggerPolicy,
-            Function<T, K> keyFunction, Supplier<L> listSupplier){
-        this.insertionPolicy = insertionPolicy;
-        this.contentsPolicy = contentsPolicy;
-        this.evictDeterminer = evictDeterminer;
-        this.triggerPolicy = triggerPolicy;
-        this.keyFunction = keyFunction;
-        this.listSupplier = listSupplier;
-    }
-
-    @Override
-    public boolean insert(T tuple) {
-        K key = keyFunction.apply(tuple);
-        Partition<T, K, L> partition;
-        
-        synchronized (partitions) {
-            partition = partitions.get(key);
-            if (partition == null) {
-                partition = new PartitionImpl<T, K, L>(this, listSupplier.get(), key);
-                partitions.put(key, partition);
-            }
-        }
-        
-        return partition.insert(tuple);      
-    }
-
-   
-    @Override
-    public synchronized void registerPartitionProcessor(BiConsumer<List<T>, K> partitionProcessor){
-            this.partitionProcessor = partitionProcessor;
-    }
-
-    @Override
-    public BiConsumer<Partition<T, K, L>, T> getContentsPolicy() {
-        return contentsPolicy;
-    }
-
-    @Override
-    public BiConsumer<Partition<T, K, L>, T> getTriggerPolicy() {
-        return triggerPolicy;
-    }
-
-    @Override
-    public synchronized BiConsumer<List<T>, K> getPartitionProcessor() {
-            return partitionProcessor;    
-    }
-
-    @Override
-    public BiFunction<Partition<T, K, L>, T, Boolean> getInsertionPolicy() {
-        return insertionPolicy;
-    }
-
-    @Override
-    public Consumer<Partition<T, K, L> > getEvictDeterminer() {
-        return evictDeterminer;
-    }
-
-    @Override
-    public Function<T, K> getKeyFunction() {
-        return keyFunction;
-    }
-
-    @Override
-    public synchronized void registerScheduledExecutorService(ScheduledExecutorService ses) {
-        this.ses = ses;
-        
-    }
-
-    @Override
-    public synchronized ScheduledExecutorService getScheduledExecutorService() {
-        return this.ses;
-    }
-
-    @Override
-    public Map<K, Partition<T, K, L>> getPartitions() {
-        return partitions;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/Windows.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/Windows.java b/api/window/src/main/java/edgent/window/Windows.java
deleted file mode 100644
index 7fdf326..0000000
--- a/api/window/src/main/java/edgent/window/Windows.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import static edgent.window.Policies.alwaysInsert;
-import static edgent.window.Policies.countContentsPolicy;
-import static edgent.window.Policies.evictOldest;
-import static edgent.window.Policies.processOnInsert;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Function;
-import edgent.function.Supplier;
-
-/**
- * Factory to create {@code Window} implementations.
- *
- */
-public class Windows {  
-    
-    /**
-     * Create a window using the passed in policies.
-     *
-     * @param <T> Tuple type.
-     * @param <K> Key type.
-     * @param <L> List type.
-     *
-     * @param insertionPolicy Policy indicating if a tuple should be inserted
-     * into the window.
-     * @param contentsPolicy Contents policy called prior to insertion of a tuple.
-     * @param evictDeterminer Policy that determines action to take when
-     * {@link Partition#evict()} is called.
-     * @param triggerPolicy Trigger policy that is invoked after the insertion
-     * of a tuple into a partition.
-     * @param keyFunction Function that gets the partition key from a tuple.
-     * @param listSupplier Supplier function for the {@code List} that holds
-     * tuples within a partition.
-     * @return Window using the passed in policies.
-     */
-    public static  <T, K, L extends List<T>> Window<T, K, L> window(
-            BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy,
-            BiConsumer<Partition<T, K, L>, T> contentsPolicy,
-            Consumer<Partition<T, K, L> > evictDeterminer,
-            BiConsumer<Partition<T, K, L>, T> triggerPolicy,
-            Function<T, K> keyFunction,
-            Supplier<L> listSupplier){
-        
-        return new WindowImpl<>(insertionPolicy, contentsPolicy, evictDeterminer, triggerPolicy, keyFunction, listSupplier);
-    }
-    
-    /**
-     * Return a window that maintains the last {@code count} tuples inserted
-     * with processing triggered on every insert. This provides 
-     * a continuous processing, where processing is invoked every
-     * time the window changes. Since insertion drives eviction
-     * there is no need to process on eviction, thus once the window
-     * has reached {@code count} tuples, each insertion results in an
-     * eviction followed by processing of {@code count} tuples
-     * including the tuple just inserted, which is the definition of
-     * the window.
-     * 
-     * @param <T> Tuple type.
-     * @param <K> Key type.
-     * 
-     * @param count Number of tuple to maintain per partition
-     * @param keyFunction Tuple partitioning key function
-     * @return window that maintains the last {@code count} tuples on a stream
-     */
-    public static <T, K> Window<T, K, LinkedList<T>> lastNProcessOnInsert(final int count,
-            Function<T, K> keyFunction) {
-
-        Window<T, K, LinkedList<T>> window = Windows.window(
-                alwaysInsert(),
-                countContentsPolicy(count), 
-                evictOldest(), 
-                processOnInsert(), 
-                keyFunction, 
-                () -> new LinkedList<T>());
-
-        return window;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/package-info.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/package-info.java b/api/window/src/main/java/edgent/window/package-info.java
deleted file mode 100644
index f31a8d3..0000000
--- a/api/window/src/main/java/edgent/window/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Window API.
- */
-package edgent.window;
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/InsertionTimeList.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/InsertionTimeList.java b/api/window/src/main/java/org/apache/edgent/window/InsertionTimeList.java
new file mode 100644
index 0000000..2f6d37a
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/InsertionTimeList.java
@@ -0,0 +1,171 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+import java.util.AbstractSequentialList;
+import java.util.LinkedList;
+import java.util.ListIterator;
+
+/**
+ * List of window tuples that also records, for every tuple, the
+ * wall-clock time ({@link System#currentTimeMillis()}) at which it
+ * was added.
+ * <p>
+ * Tuples and their insertion times are held in two parallel lists
+ * that are always modified together.
+ *
+ * @param <T> Type of tuples in the list
+ */
+public class InsertionTimeList<T> extends AbstractSequentialList<T> {
+
+    private final LinkedList<T> tuples = new LinkedList<>();
+    private final LinkedList<Long> times = new LinkedList<>();
+
+    /**
+     * Removes tuples from the head of the list for as long as the
+     * head's insertion time is at or before {@code evictTime}.
+     *
+     * @param evictTime cut-off time in milliseconds
+     */
+    void evictOlderThan(long evictTime) {
+        for (;;) {
+            if (times.isEmpty() || times.getFirst() > evictTime) {
+                return;
+            }
+            removeOldest();
+        }
+    }
+
+    /**
+     * Computes how long to wait until the first tuple has been in the
+     * list for {@code timeMs} milliseconds. The list must not be empty.
+     *
+     * @param timeMs time span, in milliseconds, measured from the first tuple's insertion
+     * @return remaining delay in milliseconds, never negative
+     */
+    long nextEvictDelay(long timeMs) {
+        long oldestInsertTime = times.get(0);
+        long remaining = (oldestInsertTime + timeMs) - System.currentTimeMillis();
+        return remaining > 0 ? remaining : 0;
+    }
+
+    /**
+     * Returns an iterator that walks the tuples and their insertion
+     * times in lock step, starting at {@code index}.
+     */
+    @Override
+    public ListIterator<T> listIterator(int index) {
+        ListIterator<T> tupleCursor = tuples.listIterator(index);
+        ListIterator<Long> timeCursor = times.listIterator(index);
+        return new TimedListIterator<>(tupleCursor, timeCursor);
+    }
+
+    /**
+     * Appends {@code tuple} and records the current time as its insertion time.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean add(T tuple) {
+        tuples.add(tuple);
+        times.add(System.currentTimeMillis());
+        return true;
+    }
+
+    /** Removes every tuple together with its insertion time. */
+    @Override
+    public void clear() {
+        tuples.clear();
+        times.clear();
+    }
+
+    @Override
+    public int size() {
+        return tuples.size();
+    }
+
+    /** Drops the first tuple and its insertion time. */
+    private void removeOldest() {
+        tuples.remove();
+        times.remove();
+    }
+
+    /**
+     * Iterator over the tuples that keeps a second iterator over the
+     * insertion times at the same position. Removal is supported;
+     * {@code add} and {@code set} are not.
+     */
+    private static class TimedListIterator<T> implements ListIterator<T> {
+
+        private final ListIterator<T> tupleCursor;
+        private final ListIterator<Long> timeCursor;
+
+        TimedListIterator(ListIterator<T> tupleCursor, ListIterator<Long> timeCursor) {
+            this.tupleCursor = tupleCursor;
+            this.timeCursor = timeCursor;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return tupleCursor.hasNext();
+        }
+
+        @Override
+        public T next() {
+            timeCursor.next();
+            return tupleCursor.next();
+        }
+
+        @Override
+        public int nextIndex() {
+            return tupleCursor.nextIndex();
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return tupleCursor.hasPrevious();
+        }
+
+        @Override
+        public T previous() {
+            timeCursor.previous();
+            return tupleCursor.previous();
+        }
+
+        @Override
+        public int previousIndex() {
+            return tupleCursor.previousIndex();
+        }
+
+        @Override
+        public void remove() {
+            tupleCursor.remove();
+            timeCursor.remove();
+        }
+
+        @Override
+        public void add(T tuple) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void set(T arg0) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/Partition.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/Partition.java b/api/window/src/main/java/org/apache/edgent/window/Partition.java
new file mode 100644
index 0000000..1efc646
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/Partition.java
@@ -0,0 +1,79 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A partition within a {@code Window}. The contents of the list
+ * returned by {@code getContents} are stable while synchronizing
+ * on the partition object. For example:
+ * 
+ * <pre>{@code
+ * Partition<Integer, Integer, ArrayList<Integer>> part = ...;
+ * synchronized(part){
+ *  List<Integer> contents = part.getContents();
+ *  // stable operation on contents of partition
+ * }
+ * }</pre>
+ *
+ * @param <T> Type of tuples in the partition.
+ * @param <K> Type of the partition's key.
+ * @param <L> Type of the list holding the partition's tuples.
+ * 
+ * @see Window
+ */
+public interface Partition<T, K, L extends List<T>> extends Serializable{    
+    /**
+     * Offers a tuple to be inserted into the partition.
+     * @param tuple Tuple to be offered.
+     * @return True if the tuple was inserted into this partition, false if it was rejected.
+     */
+    boolean insert(T tuple);
+        
+    /**
+     * Invoke the window's partition processor on this partition's contents.
+     * A partition processor must be registered prior to invoking process().
+     */
+    void process();
+    
+    /** 
+     * Calls the window's evict determiner for this partition.
+     */
+    void evict();
+    
+    /**
+     * Retrieves the partition contents.
+     * @return list of partition contents
+     */
+    L getContents();
+    
+    /**
+     * Return the window in which this partition is contained.
+     * @return the partition's window
+     */
+    Window<T, K, L> getWindow();
+    
+    /**
+     * Returns the key associated with this partition
+     * @return The key of the partition.
+     */
+    K getKey();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/PartitionImpl.java b/api/window/src/main/java/org/apache/edgent/window/PartitionImpl.java
new file mode 100644
index 0000000..c58410b
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/PartitionImpl.java
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+
+/**
+ * {@link Partition} implementation that stores its tuples in the list
+ * passed to the constructor and obtains every policy from its
+ * owning {@link Window}.
+ * <p>
+ * {@code insert}, {@code process}, {@code getContents} and {@code evict}
+ * are {@code synchronized} on this partition object.
+ *
+ * @param <T> Type of tuples in the partition.
+ * @param <K> Type of the partition's key.
+ * @param <L> Type of the list holding the partition's tuples.
+ */
+@SuppressWarnings("serial")
+class PartitionImpl<T, K, L extends List<T>> implements Partition<T, K, L> {
+    private final L tuples;
+    // Read-only view of tuples; this is what the partition processor receives.
+    private final List<T> unmodifiableTuples;
+    private final Window<T, K, L> window;
+    private final K key;
+
+    /**
+     * Creates a partition of {@code window} for {@code key}.
+     *
+     * @param window the window this partition belongs to
+     * @param tuples the list that holds this partition's tuples
+     * @param key the partition key
+     */
+    PartitionImpl(Window<T, K, L> window, L tuples, K key){
+        this.window = window;
+        this.tuples = tuples;
+        this.unmodifiableTuples = Collections.unmodifiableList(tuples);
+        this.key = key;
+    }
+
+    /**
+     * Offers {@code tuple} to this partition.
+     * <p>
+     * When the window's insertion policy accepts the tuple, the contents
+     * policy is invoked, the tuple is added to the list and the trigger
+     * policy is invoked, in that order.
+     *
+     * @param tuple tuple to be offered
+     * @return {@code true} if the tuple was inserted, {@code false} if the
+     *         insertion policy rejected it
+     */
+    @Override
+    public synchronized boolean insert(T tuple) {
+        if (getWindow().getInsertionPolicy().apply(this, tuple)) {
+            // Pre-insertion hook.
+            getWindow().getContentsPolicy().accept(this, tuple);
+            this.tuples.add(tuple);
+            // Trigger
+            getWindow().getTriggerPolicy().accept(this, tuple);
+            return true;
+        }
+
+        // Rejected by the insertion policy: nothing was inserted, so say so.
+        return false;
+    }
+
+    /**
+     * Passes an unmodifiable view of the contents, together with the key,
+     * to the window's partition processor.
+     */
+    @Override
+    public synchronized void process() {
+        window.getPartitionProcessor().accept(unmodifiableTuples, key);
+    }
+
+    /** Returns the list passed to the constructor, not a copy. */
+    @Override
+    public synchronized L getContents() {
+        return tuples;
+    }
+
+    @Override
+    public Window<T, K, L> getWindow() {
+        return window;
+    }
+
+    @Override
+    public K getKey() {
+        return key;
+    }
+
+    /** Hands this partition to the window's evict determiner. */
+    @Override
+    public synchronized void evict() {
+        Consumer<Partition<T, K, L>> evictDeterminer = window.getEvictDeterminer();
+        evictDeterminer.accept(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/PartitionedState.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/PartitionedState.java b/api/window/src/main/java/org/apache/edgent/window/PartitionedState.java
new file mode 100644
index 0000000..2fd1b43
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/PartitionedState.java
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Base class for keeping one state object per partition key.
+ * Intended for maintaining state for each keyed partition
+ * in a {@link Window}. All access to the per-key states goes
+ * through {@code synchronized} methods.
+ *
+ * @param <K> Key type.
+ * @param <S> State type.
+ */
+public abstract class PartitionedState<K, S> {
+
+    private final Supplier<S> initialState;
+    private final Map<K, S> states = new HashMap<>();
+
+    /**
+     * Creates an instance whose per-key states are created on demand.
+     * @param initialState Function used to create the initial state for a key.
+     *
+     * @see #getState(Object)
+     */
+    protected PartitionedState(Supplier<S> initialState) {
+        this.initialState = initialState;
+    }
+
+    /**
+     * Look up the state for {@code key}, creating it first if needed.
+     * When nothing is stored for {@code key}, the result of
+     * {@code initialState.get()} is stored and returned.
+     * @param key Partition key.
+     * @return State for {@code key}.
+     */
+    protected synchronized S getState(K key) {
+        S current = states.get(key);
+        if (current == null) {
+            current = initialState.get();
+            states.put(key, current);
+        }
+        return current;
+    }
+
+    /**
+     * Replace the state held for {@code key}.
+     * @param key Partition key.
+     * @param state State for {@code key}
+     * @return Previous state for {@code key}, will be null if no state was held.
+     */
+    protected synchronized S setState(K key, S state) {
+        S previous = states.put(key, state);
+        return previous;
+    }
+
+    /**
+     * Discard the state held for {@code key}.
+     * @param key Partition key.
+     * @return Removed state for {@code key}, will be null if no state was held.
+     */
+    protected synchronized S removeState(K key) {
+        S removed = states.remove(key);
+        return removed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/Policies.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/Policies.java b/api/window/src/main/java/org/apache/edgent/window/Policies.java
new file mode 100644
index 0000000..924ad32
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/Policies.java
@@ -0,0 +1,240 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Common window policies.
+ *
+ */
+public class Policies {
+    
+    /**
+     * A policy which schedules a future partition eviction if the partition is empty.
+     * This can be used as a contents policy that is scheduling the eviction of
+     * the tuple just about to be inserted.
+     * @param <T> Tuple Type
+     * @param <K> Key type
+     * @param <L> List type for the partition contents.
+     * @param time The time span in which tuples are permitted in the partition.
+     * @param unit The units of time.
+     * @return The time-based contents policy.
+     */
+    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictIfEmpty(long time, TimeUnit unit){
+        return (partition, tuple) -> {          
+            if(partition.getContents().isEmpty()){
+                ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+                ses.schedule(() -> partition.evict(), time, unit);
+            }
+        };
+    }
+    
+    /**
+     * A policy which schedules a future partition eviction on the first insert.
+     * This can be used as a contents policy that schedules the eviction of tuples
+     * as a batch.
+     * @param <T> Tuple Type
+     * @param <K> Key type
+     * @param <L> List type for the partition contents.
+     * @param time The time span in which tuples are permitted in the partition.
+     * @param unit The units of time.
+     * @return The time-based contents policy.
+     */
+    @SuppressWarnings("serial")
+    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictOnFirstInsert(long time, TimeUnit unit){
+        
+        // Can't use lambda since state is required
+        return new BiConsumer<Partition<T,K,L>, T>() {
+            private Set<Partition<T,K,L>> initialized_partitions = Collections.synchronizedSet(new HashSet<>());
+            @Override
+            public void accept(Partition<T, K, L> partition, T tuple) {
+                if(!initialized_partitions.contains(partition)){
+                    initialized_partitions.add(partition);
+                    ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+                    ses.schedule(() -> partition.evict(), time, unit);
+                }    
+            }
+        };
+    }
+    
+    /**
+     * An eviction policy which evicts all tuples that are older than a specified time.
+     * If any tuples remain in the partition, it schedules their eviction after
+     * an appropriate interval.
+     * @param <T> Tuple Type
+     * @param <K> Key type
+     * @param time The time span in which tuples are permitted in the partition.
+     * @param unit The units of time.
+     * @return The time-based eviction policy.
+     */ 
+    public static <T, K> Consumer<Partition<T, K, InsertionTimeList<T>> > evictOlderWithProcess(long time, TimeUnit unit){
+        
+        long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
+
+        return (partition) -> {
+            ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+            InsertionTimeList<T> tuples = partition.getContents();
+            long evictTime = System.currentTimeMillis() - timeMs;
+            
+            tuples.evictOlderThan(evictTime);
+
+            partition.process();
+            
+            if(!tuples.isEmpty()){
+                ses.schedule(() -> partition.evict(), tuples.nextEvictDelay(timeMs), TimeUnit.MILLISECONDS);
+            }
+        };
+    }
+    
+    /**
+     * An eviction policy which processes the window, evicts all tuples, and 
+     * schedules the next eviction after the appropriate interval.
+     * @param <T> Tuple Type
+     * @param <K> Key type
+     * @param time The time span in which tuples are permitted in the partition.
+     * @param unit The units of time.
+     * @return The time-based eviction policy.
+     */ 
+    public static <T, K> Consumer<Partition<T, K, List<T>> > evictAllAndScheduleEvictWithProcess(long time, TimeUnit unit){
+        
+        long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
+        return (partition) -> {
+            ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+            List<T> tuples = partition.getContents(); 
+
+            partition.process();
+            tuples.clear();
+                        
+            ses.schedule(() -> partition.evict(), timeMs, TimeUnit.MILLISECONDS);       
+        };
+    }
+    
+    
+    /**
+     * Returns an insertion policy that indicates the tuple
+     * is to be inserted into the partition.
+     * 
+     * @param <T> Tuple type
+     * @param <K> Key type
+     * @param <L> List type for the partition contents.
+     * 
+     * @return An insertion policy that always inserts.
+     */
+    public static <T, K, L extends List<T>> BiFunction<Partition<T, K, L>, T, Boolean> alwaysInsert(){
+        return (partition, tuple) -> true;
+    }
+    
+    /**
+     * Returns a count-based contents policy.
+     * If, when called, the number of tuples in the partition is
+     * greater than or equal to {@code count} then {@code partition.evict()}
+     * is called.
+     * @param <T> Tuple type
+     * @param <K> Key type
+     * @param <L> List type for the partition contents.
+     * @param count the count
+     * @return A count-based contents policy.
+     */
+    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> countContentsPolicy(final int count){
+        return (partition, tuple) -> {
+            if (partition.getContents().size() >= count)
+                partition.evict();
+        };
+    }
+    
+    /**
+     * Returns a Consumer representing an evict determiner that evicts all tuples
+     * from the window.
+     * @param <T> Tuple type
+     * @param <K> Key type
+     * @param <L> List type for the partition contents.
+     * @return An evict determiner that evicts all tuples.
+     */
+    public static <T, K, L extends List<T>> Consumer<Partition<T, K, L> > evictAll(){
+        return partition -> partition.getContents().clear();
+    }
+    
+    /**
+     * Returns an evict determiner that evicts the oldest tuple.
+     * @param <T> Tuple type
+     * @param <K> Key type
+     * @param <L> List type for the partition contents.
+     * @return An evict determiner that evicts the oldest tuple.
+     */
+    public static <T, K, L extends List<T>> Consumer<Partition<T, K, L> > evictOldest(){
+        return partition -> partition.getContents().remove(0);
+    }
+    
+    /**
+     * Returns a trigger policy that triggers
+     * processing on every insert.
+     * @param <T> Tuple type
+     * @param <K> Key type
+     * @param <L> List type for the partition contents.
+     * @return A trigger policy that triggers processing on every insert.
+     */ 
+    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processOnInsert(){
+        return (partition, tuple) -> partition.process();
+    }
+    
+    /**
+     * Returns a trigger policy that triggers when the size of a partition
+     * equals or exceeds a value, and then evicts its contents.
+     * @param <T> Tuple type
+     * @param <K> Key type
+     * @param <L> List type for the partition contents.
+     * @param size partition size
+     * @return A trigger policy that triggers processing when the size of 
+     * the partition equals or exceeds a value.
+     */ 
+    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processWhenFullAndEvict(final int size){
+        return (partition, tuple) -> {
+            if(partition.getContents().size() >= size){
+                partition.process();
+                partition.evict();
+            }
+        };
+    }
+    
+    /**
+     * A {@link BiConsumer} policy which does nothing.
+     * @param <T> Tuple type
+     * @param <K> Key type
+     * @param <L> List type for the partition contents.
+     * @return A policy which does nothing.
+     */
+    public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> doNothing(){
+        return (partition, key) -> {};
+    }
+    
+    public static <T> Supplier<InsertionTimeList<T>> insertionTimeList() {
+        return () -> new InsertionTimeList<>();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/Window.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/Window.java b/api/window/src/main/java/org/apache/edgent/window/Window.java
new file mode 100644
index 0000000..93f6eea
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/Window.java
@@ -0,0 +1,177 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+
+
+/**
+ * Partitioned window of tuples.
+ * Conceptually a window maintains a continuously
+ * changing subset of tuples on a stream, such as the last ten tuples
+ * or tuples that have arrived in the last five minutes.
+ * <P>
+ * {@code Window} is partitioned by keys obtained
+ * from tuples using a key function. Each tuple is
+ * inserted into a partition containing all tuples
+ * with the same key (using {@code equals()}).
+ * Each partition independently maintains the subset of
+ * tuples defined by the window's policies.
+ * <BR>
+ * An unpartitioned window is created by using a key function that
+ * returns a constant, to force all tuples to be inserted
+ * into a single partition. A convenience function
+ * {@link org.apache.edgent.function.Functions#unpartitioned() unpartitioned()} is
+ * provided that returns zero as the fixed key.
+ * </P>   
+ * <P>
+ * The window's policies are flexible to allow any definition of
+ * what tuples each partition will contain, and how the
+ * partition is processed.
+ * </P>
+ * @param <T> type of tuples in the window
+ * @param <K> type of the window's key
+ * @param <L> type of the list used to contain tuples.
+ */
+public interface Window<T, K, L extends List<T>>{
+    
+    /**
+     * Attempts to insert the tuple into its partition.
+     * Tuple insertion performs the following actions in order:
+     * <OL>
+     * <LI>Call {@code K key = getKeyFunction().apply(tuple)} to obtain the partition key.</LI>
+     * <LI>Get the partition for {@code key} creating an empty partition if one did not exist for {@code key}. </LI>
+     * <LI>Apply the insertion policy, calling {@code getInsertionPolicy().apply(partition, tuple)}. If it returns false then return false from this method,
+     * otherwise continue.</LI>
+     * <LI>Apply the contents policy, calling {@code getContentsPolicy().accept(partition, tuple)}.
+     * This is a pre-insertion action that allows any action. Some policies may request room to be made for
+     * the insertion by calling {@link Partition#evict() evict()} which will result in a call to the evict determiner.</LI>
+     * <LI>Add {@code tuple} to the contents of the partition.</LI>
+     * <LI>Apply the trigger policy, calling {@code getTriggerPolicy().accept(partition, tuple)}.
+     * This is a post-insertion action that allows any action. A typical implementation is to call
+     * {@link Partition#process() partition.process()} to perform processing of the window.</LI>
+     * </OL>
+     * 
+     * 
+     * 
+     * @param tuple the tuple to insert
+     * @return true, if the tuple was successfully inserted. Otherwise, false.
+     */
+    boolean insert(T tuple);
+    
+    /**
+     * Register a partition processor.
+     * @param windowProcessor function to process the window
+     */
+    void registerPartitionProcessor(BiConsumer<List<T>, K> windowProcessor);
+    
+    /**
+     * Register a ScheduledExecutorService.
+     * @param ses the service
+     */
+    void registerScheduledExecutorService(ScheduledExecutorService ses);
+    
+    /**
+     * Returns the insertion policy of the window.
+     * The insertion policy decides whether a tuple is inserted into a partition.
+     * 
+     * @return The insertion policy.
+     */
+    BiFunction<Partition<T, K, L>, T, Boolean> getInsertionPolicy();
+    
+    /**
+     * Returns the contents policy of the window.
+     * The contents policy is invoked before a tuple
+     * is inserted into a partition.
+     * 
+     * @return contents policy for this window.
+     */
+    BiConsumer<Partition<T, K, L>, T> getContentsPolicy();
+
+    /**
+     * Returns the window's trigger policy.
+     * The trigger policy is invoked (triggered) by
+     * the insertion of a tuple into a partition.
+     * 
+     * @return trigger policy for this window.
+     */
+    BiConsumer<Partition<T, K, L>, T> getTriggerPolicy();
+
+    /**
+     * Returns the partition processor associated with the window.
+     * @return partitionProcessor
+     */
+    BiConsumer<List<T>, K> getPartitionProcessor();
+    
+    /**
+     * Returns the ScheduledExecutorService associated with the window.
+     * @return ScheduledExecutorService
+     */
+    ScheduledExecutorService getScheduledExecutorService();
+    
+    /**
+     * Returns the window's eviction determiner.
+     * The evict determiner is responsible for
+     * determining which tuples in a window need
+     * to be evicted.
+     * <BR>
+     * Calls to {@link Partition#evict()} result in
+     * {@code getEvictDeterminer().accept(partition)} being
+     * called.
+     * In some cases this may not result in tuples being
+     * evicted from the partition.
+     * <P>
+     * An evict determiner evicts tuples from the partition
+     * by removing them from the list returned by
+     * {@link Partition#getContents()}.
+     * 
+     * @return evict determiner for this window.
+     */
+    Consumer<Partition<T, K, L>> getEvictDeterminer();
+    
+    /**
+     * Returns the keyFunction of the window
+     * @return The window's keyFunction.
+     */
+    Function<T, K> getKeyFunction();
+
+    /**
+     * Retrieves the partitions in the window. The map of partitions
+     * is stable when synchronizing on the intrinsic lock of the map,
+     * for example:
+     * <br>
+     * <pre>{@code
+     * Map<K, Partition<T, K, L>> partitions = window.getPartitions();
+     * synchronized(partitions){
+     *  // operations with partition
+     * }
+     * }</pre>
+     * 
+     * @return A map of the window's keys and partitions.
+     */
+    Map<K, Partition<T, K, L>> getPartitions();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/WindowImpl.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/WindowImpl.java b/api/window/src/main/java/org/apache/edgent/window/WindowImpl.java
new file mode 100644
index 0000000..6651952
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/WindowImpl.java
@@ -0,0 +1,127 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+
+
+class WindowImpl<T, K, L extends List<T>> implements Window<T, K, L> {
+    private final BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy;
+    private final BiConsumer<Partition<T, K, L>, T> contentsPolicy;
+    private final Consumer<Partition<T, K, L> > evictDeterminer;
+    private final BiConsumer<Partition<T, K, L>, T> triggerPolicy;
+    private BiConsumer<List<T>, K> partitionProcessor;
+    
+    private ScheduledExecutorService ses;
+    
+    protected Supplier<L> listSupplier;
+    protected Function<T, K> keyFunction;
+    
+    private Map<K, Partition<T, K, L> > partitions = new HashMap<K, Partition<T, K, L> >();
+    
+    
+    WindowImpl(BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy, BiConsumer<Partition<T, K, L>, T> contentsPolicy,
+            Consumer<Partition<T, K, L> > evictDeterminer, BiConsumer<Partition<T, K, L>, T> triggerPolicy,
+            Function<T, K> keyFunction, Supplier<L> listSupplier){
+        this.insertionPolicy = insertionPolicy;
+        this.contentsPolicy = contentsPolicy;
+        this.evictDeterminer = evictDeterminer;
+        this.triggerPolicy = triggerPolicy;
+        this.keyFunction = keyFunction;
+        this.listSupplier = listSupplier;
+    }
+
+    @Override
+    public boolean insert(T tuple) {
+        K key = keyFunction.apply(tuple);
+        Partition<T, K, L> partition;
+        
+        synchronized (partitions) {
+            partition = partitions.get(key);
+            if (partition == null) {
+                partition = new PartitionImpl<T, K, L>(this, listSupplier.get(), key);
+                partitions.put(key, partition);
+            }
+        }
+        
+        return partition.insert(tuple);      
+    }
+
+   
+    @Override
+    public synchronized void registerPartitionProcessor(BiConsumer<List<T>, K> partitionProcessor){
+            this.partitionProcessor = partitionProcessor;
+    }
+
+    @Override
+    public BiConsumer<Partition<T, K, L>, T> getContentsPolicy() {
+        return contentsPolicy;
+    }
+
+    @Override
+    public BiConsumer<Partition<T, K, L>, T> getTriggerPolicy() {
+        return triggerPolicy;
+    }
+
+    @Override
+    public synchronized BiConsumer<List<T>, K> getPartitionProcessor() {
+            return partitionProcessor;    
+    }
+
+    @Override
+    public BiFunction<Partition<T, K, L>, T, Boolean> getInsertionPolicy() {
+        return insertionPolicy;
+    }
+
+    @Override
+    public Consumer<Partition<T, K, L> > getEvictDeterminer() {
+        return evictDeterminer;
+    }
+
+    @Override
+    public Function<T, K> getKeyFunction() {
+        return keyFunction;
+    }
+
+    @Override
+    public synchronized void registerScheduledExecutorService(ScheduledExecutorService ses) {
+        this.ses = ses;
+        
+    }
+
+    @Override
+    public synchronized ScheduledExecutorService getScheduledExecutorService() {
+        return this.ses;
+    }
+
+    @Override
+    public Map<K, Partition<T, K, L>> getPartitions() {
+        return partitions;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/Windows.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/Windows.java b/api/window/src/main/java/org/apache/edgent/window/Windows.java
new file mode 100644
index 0000000..da0a217
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/Windows.java
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import static org.apache.edgent.window.Policies.alwaysInsert;
+import static org.apache.edgent.window.Policies.countContentsPolicy;
+import static org.apache.edgent.window.Policies.evictOldest;
+import static org.apache.edgent.window.Policies.processOnInsert;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Factory to create {@code Window} implementations.
+ *
+ */
+public class Windows {  
+    
+    /**
+     * Create a window using the passed in policies.
+     *
+     * @param <T> Tuple type.
+     * @param <K> Key type.
+     * @param <L> List type.
+     *
+     * @param insertionPolicy Policy indicating if a tuple should be inserted
+     * into the window.
+     * @param contentsPolicy Contents policy called prior to insertion of a tuple.
+     * @param evictDeterminer Policy that determines action to take when
+     * {@link Partition#evict()} is called.
+     * @param triggerPolicy Trigger policy that is invoked after the insertion
+     * of a tuple into a partition.
+     * @param keyFunction Function that gets the partition key from a tuple.
+     * @param listSupplier Supplier function for the {@code List} that holds
+     * tuples within a partition.
+     * @return Window using the passed in policies.
+     */
+    public static  <T, K, L extends List<T>> Window<T, K, L> window(
+            BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy,
+            BiConsumer<Partition<T, K, L>, T> contentsPolicy,
+            Consumer<Partition<T, K, L> > evictDeterminer,
+            BiConsumer<Partition<T, K, L>, T> triggerPolicy,
+            Function<T, K> keyFunction,
+            Supplier<L> listSupplier){
+        
+        return new WindowImpl<>(insertionPolicy, contentsPolicy, evictDeterminer, triggerPolicy, keyFunction, listSupplier);
+    }
+    
+    /**
+     * Return a window that maintains the last {@code count} tuples inserted
+     * with processing triggered on every insert. This provides 
+     * a continuous processing, where processing is invoked every
+     * time the window changes. Since insertion drives eviction
+     * there is no need to process on eviction, thus once the window
+     * has reached {@code count} tuples, each insertion results in an
+     * eviction followed by processing of {@code count} tuples
+     * including the tuple just inserted, which is the definition of
+     * the window.
+     * 
+     * @param <T> Tuple type.
+     * @param <K> Key type.
+     * 
+     * @param count Number of tuples to maintain per partition
+     * @param keyFunction Tuple partitioning key function
+     * @return window that maintains the last {@code count} tuples on a stream
+     */
+    public static <T, K> Window<T, K, LinkedList<T>> lastNProcessOnInsert(final int count,
+            Function<T, K> keyFunction) {
+
+        Window<T, K, LinkedList<T>> window = Windows.window(
+                alwaysInsert(),
+                countContentsPolicy(count), 
+                evictOldest(), 
+                processOnInsert(), 
+                keyFunction, 
+                () -> new LinkedList<T>());
+
+        return window;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/package-info.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/package-info.java b/api/window/src/main/java/org/apache/edgent/window/package-info.java
new file mode 100644
index 0000000..20ee62e
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Window API.
+ */
+package org.apache.edgent.window;
+