You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/21 13:17:37 UTC
[36/54] [abbrv] [partial] incubator-quarks git commit: add
"org.apache." prefix to edgent package names
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTestSetup.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTestSetup.java b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTestSetup.java
new file mode 100644
index 0000000..59560e9
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/TopologyTestSetup.java
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Submitter;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.TopologyProvider;
+
+public interface TopologyTestSetup {
+
+ TopologyProvider createTopologyProvider();
+
+ TopologyProvider getTopologyProvider();
+
+ Submitter<Topology, Job> createSubmitter();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/topology/src/test/java/org/apache/edgent/test/topology/services/TestApplications.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/org/apache/edgent/test/topology/services/TestApplications.java b/api/topology/src/test/java/org/apache/edgent/test/topology/services/TestApplications.java
new file mode 100644
index 0000000..0053328
--- /dev/null
+++ b/api/topology/src/test/java/org/apache/edgent/test/topology/services/TestApplications.java
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.topology.services;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.services.TopologyBuilder;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Some dummy test applications that will be part of this
+ * jar including having service provider entries.
+ * Used for testing the application service.
+ *
+ */
+public class TestApplications {
+
+ private static abstract class App implements TopologyBuilder {
+
+ @Override
+ public BiConsumer<Topology, JsonObject> getBuilder() {
+ return (t,c) -> t.strings(getName()).print();
+ }
+ }
+
+ public static class AppOne extends App {
+ @Override
+ public String getName() {
+ return "FirstJarApp";
+ }
+ }
+ public static class AppTwo extends App {
+ @Override
+ public String getName() {
+ return "SecondJarApp";
+ }
+ }
+ public static class AppThree extends App {
+ @Override
+ public String getName() {
+ return "ThirdJarApp";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/InsertionTimeList.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/InsertionTimeList.java b/api/window/src/main/java/edgent/window/InsertionTimeList.java
deleted file mode 100644
index f1a0d84..0000000
--- a/api/window/src/main/java/edgent/window/InsertionTimeList.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-import java.util.AbstractSequentialList;
-import java.util.LinkedList;
-import java.util.ListIterator;
-
-/**
- * A window contents list that maintains insertion time.
- *
- * @param <T> Type of tuples in the list
- */
-public class InsertionTimeList<T> extends AbstractSequentialList<T> {
-
- private final LinkedList<T> tuples = new LinkedList<>();
- private final LinkedList<Long> times = new LinkedList<>();
-
- void evictOlderThan(long evictTime) {
- while(!times.isEmpty() && times.getFirst() <= evictTime){
- remove();
- }
- }
-
- long nextEvictDelay(long timeMs) {
- long firstTupleTime = times.get(0);
- long nextEvictTime = firstTupleTime + timeMs;
-
- long timeToNextEvict = nextEvictTime - System.currentTimeMillis();
-
- return Math.max(0, timeToNextEvict);
- }
-
- @Override
- public ListIterator<T> listIterator(int index) {
- return new TimedListIterator<>(tuples.listIterator(index), times.listIterator(index));
- }
-
- @Override
- public boolean add(T tuple) {
- tuples.add(tuple);
- times.add(System.currentTimeMillis());
- return true;
- }
- @Override
- public void clear() {
- tuples.clear();
- times.clear();
- }
-
- private void remove() {
- tuples.remove();
- times.remove();
- }
-
- @Override
- public int size() {
- return tuples.size();
- }
-
- private static class TimedListIterator<T> implements ListIterator<T> {
-
- private final ListIterator<T> ti;
- private final ListIterator<Long> iti;
-
- TimedListIterator(ListIterator<T> ti, ListIterator<Long> iti) {
- this.ti = ti;
- this.iti = iti;
- }
-
- @Override
- public void add(T tuple) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean hasNext() {
- return ti.hasNext();
- }
-
- @Override
- public boolean hasPrevious() {
- return ti.hasPrevious();
- }
-
- @Override
- public T next() {
- iti.next();
- return ti.next();
- }
-
- @Override
- public int nextIndex() {
- return ti.nextIndex();
- }
-
- @Override
- public T previous() {
- iti.previous();
- return ti.previous();
- }
-
- @Override
- public int previousIndex() {
- return ti.previousIndex();
- }
-
- @Override
- public void remove() {
- ti.remove();
- iti.remove();
- }
-
- @Override
- public void set(T arg0) {
- throw new UnsupportedOperationException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/Partition.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/Partition.java b/api/window/src/main/java/edgent/window/Partition.java
deleted file mode 100644
index ec70672..0000000
--- a/api/window/src/main/java/edgent/window/Partition.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * A partition within a {@code Window}. The contents of the list
- * returned by {@code getContents} is stable when synchronizing
- * on the partition object. For example:
- *
- * <pre>{@code
- * Partition<Integer, Integer, ArrayList<Integer>> part = ...;
- * synchronized(part){
- * List<Integer> = part.getContents();
- * // stable operation on contents of partition
- * }
- * }</pre>
- *
- * @param <T> Type of tuples in the partition.
- * @param <K> Type of the partition's key.
- * @param <L> Type of the list holding the partition's tuples.
- *
- * @see Window
- */
-public interface Partition<T, K, L extends List<T>> extends Serializable{
- /**
- * Offers a tuple to be inserted into the partition.
- * @param tuple Tuple to be offered.
- * @return True if the tuple was inserted into this partition, false if it was rejected.
- */
- boolean insert(T tuple);
-
- /**
- * Invoke the WindowProcessor's processWindow method. A partition processor
- * must be registered prior to invoking process().
- */
- void process();
-
- /**
- * Calls the partition's evictDeterminer.
- */
- void evict();
-
- /**
- * Retrieves the window contents.
- * @return list of partition contents
- */
- L getContents();
-
- /**
- * Return the window in which this partition is contained.
- * @return the partition's window
- */
- Window<T, K, L> getWindow();
-
- /**
- * Returns the key associated with this partition
- * @return The key of the partition.
- */
- K getKey();
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/PartitionImpl.java b/api/window/src/main/java/edgent/window/PartitionImpl.java
deleted file mode 100644
index a535035..0000000
--- a/api/window/src/main/java/edgent/window/PartitionImpl.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.Collections;
-import java.util.List;
-
-import edgent.function.Consumer;
-
-@SuppressWarnings("serial")
-class PartitionImpl<T, K, L extends List<T>> implements Partition<T, K, L> {
- private final L tuples;
- private final List<T> unmodifiableTuples;
- private final Window<T, K, L> window;
- private final K key;
-
- PartitionImpl(Window<T, K, L> window, L tuples, K key){
- this.window = window;
- this.tuples = tuples;
- this.unmodifiableTuples = Collections.unmodifiableList(tuples);
- this.key = key;
- }
-
- @Override
- public synchronized boolean insert(T tuple) {
-
- if (getWindow().getInsertionPolicy().apply(this, tuple)) {
- getWindow().getContentsPolicy().accept(this, tuple);
- this.tuples.add(tuple);
- // Trigger
- getWindow().getTriggerPolicy().accept(this, tuple);
- return true;
- }
-
- return true;
- }
-
- @Override
- public synchronized void process() {
- window.getPartitionProcessor().accept(unmodifiableTuples, key);
- }
-
- @Override
- public synchronized L getContents() {
- return tuples;
- }
-
- @Override
- public Window<T, K, L> getWindow() {
- return window;
- }
-
- @Override
- public K getKey() {
- return key;
- }
-
- @Override
- public synchronized void evict() {
- Consumer<Partition<T, K, L>> evictDeterminer = window.getEvictDeterminer();
- evictDeterminer.accept(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/PartitionedState.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/PartitionedState.java b/api/window/src/main/java/edgent/window/PartitionedState.java
deleted file mode 100644
index efc1054..0000000
--- a/api/window/src/main/java/edgent/window/PartitionedState.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import edgent.function.Supplier;
-
-/**
- * Maintain partitioned state.
- * Abstract class that can be used to maintain state
- * for each keyed partition in a {@link Window}.
- *
- * @param <K> Key type.
- * @param <S> State type.
- */
-public abstract class PartitionedState<K, S> {
-
- private final Supplier<S> initialState;
- private final Map<K, S> states = new HashMap<>();
-
- /**
- * Construct with an initial state function.
- * @param initialState Function used to create the initial state for a key.
- *
- * @see #getState(Object)
- */
- protected PartitionedState(Supplier<S> initialState) {
- this.initialState = initialState;
- }
-
- /**
- * Get the current state for {@code key}.
- * If no state is held then {@code initialState.get()}
- * is called to create the initial state for {@code key}.
- * @param key Partition key.
- * @return State for {@code key}.
- */
- protected synchronized S getState(K key) {
- S state = states.get(key);
- if (state == null)
- states.put(key, state = initialState.get());
- return state;
- }
-
- /**
- * Set the current state for {@code key}.
- * @param key Partition key.
- * @param state State for {@code key}
- * @return Previous state for {@code key}, will be null if no state was held.
- */
- protected synchronized S setState(K key, S state) {
- return states.put(key, state);
- }
- /**
- *
- * @param key Partition key.
- * @return Removed state for {@code key}, will be null if no state was held.
- */
- protected synchronized S removeState(K key) {
- return states.remove(key);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/Policies.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/Policies.java b/api/window/src/main/java/edgent/window/Policies.java
deleted file mode 100644
index cf71950..0000000
--- a/api/window/src/main/java/edgent/window/Policies.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Supplier;
-
-/**
- * Common window policies.
- *
- */
-public class Policies {
-
- /**
- * A policy which schedules a future partition eviction if the partition is empty.
- * This can be used as a contents policy that is scheduling the eviction of
- * the tuple just about to be inserted.
- * @param <T> Tuple Type
- * @param <K> Key type
- * @param <L> List type for the partition contents.
- * @param time The time span in which tuple are permitted in the partition.
- * @param unit The units of time.
- * @return The time-based contents policy.
- */
- public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictIfEmpty(long time, TimeUnit unit){
- return (partition, tuple) -> {
- if(partition.getContents().isEmpty()){
- ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
- ses.schedule(() -> partition.evict(), time, unit);
- }
- };
- }
-
- /**
- * A policy which schedules a future partition eviction on the first insert.
- * This can be used as a contents policy that schedules the eviction of tuples
- * as a batch.
- * @param <T> Tuple Type
- * @param <K> Key type
- * @param <L> List type for the partition contents.
- * @param time The time span in which tuple are permitted in the partition.
- * @param unit The units of time.
- * @return The time-based contents policy.
- */
- @SuppressWarnings("serial")
- public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictOnFirstInsert(long time, TimeUnit unit){
-
- // Can't use lambda since state is required
- return new BiConsumer<Partition<T,K,L>, T>() {
- private Set<Partition<T,K,L>> initialized_partitions = Collections.synchronizedSet(new HashSet<>());
- @Override
- public void accept(Partition<T, K, L> partition, T tuple) {
- if(!initialized_partitions.contains(partition)){
- initialized_partitions.add(partition);
- ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
- ses.schedule(() -> partition.evict(), time, unit);
- }
- }
- };
- }
-
- /**
- * An eviction policy which evicts all tuples that are older than a specified time.
- * If any tuples remain in the partition, it schedules their eviction after
- * an appropriate interval.
- * @param <T> Tuple Type
- * @param <K> Key type
- * @param time The timespan in which tuple are permitted in the partition.
- * @param unit The units of time.
- * @return The time-based eviction policy.
- */
- public static <T, K> Consumer<Partition<T, K, InsertionTimeList<T>> > evictOlderWithProcess(long time, TimeUnit unit){
-
- long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
-
- return (partition) -> {
- ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
- InsertionTimeList<T> tuples = partition.getContents();
- long evictTime = System.currentTimeMillis() - timeMs;
-
- tuples.evictOlderThan(evictTime);
-
- partition.process();
-
- if(!tuples.isEmpty()){
- ses.schedule(() -> partition.evict(), tuples.nextEvictDelay(timeMs), TimeUnit.MILLISECONDS);
- }
- };
- }
-
- /**
- * An eviction policy which processes the window, evicts all tuples, and
- * schedules the next eviction after the appropriate interval.
- * @param <T> Tuple Type
- * @param <K> Key type
- * @param time The timespan in which tuple are permitted in the partition.
- * @param unit The units of time.
- * @return The time-based eviction policy.
- */
- public static <T, K> Consumer<Partition<T, K, List<T>> > evictAllAndScheduleEvictWithProcess(long time, TimeUnit unit){
-
- long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
- return (partition) -> {
- ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
- List<T> tuples = partition.getContents();
-
- partition.process();
- tuples.clear();
-
- ses.schedule(() -> partition.evict(), timeMs, TimeUnit.MILLISECONDS);
- };
- }
-
-
- /**
- * Returns an insertion policy that indicates the tuple
- * is to be inserted into the partition.
- *
- * @param <T> Tuple type
- * @param <K> Key type
- * @param <L> List type for the partition contents.
- *
- * @return An insertion policy that always inserts.
- */
- public static <T, K, L extends List<T>> BiFunction<Partition<T, K, L>, T, Boolean> alwaysInsert(){
- return (partition, tuple) -> true;
- }
-
- /**
- * Returns a count-based contents policy.
- * If, when called, the number of tuples in the partition is
- * greater than equal to {@code count} then {@code partition.evict()}
- * is called.
- * @param <T> Tuple type
- * @param <K> Key type
- * @param <L> List type for the partition contents.
- * @param count the count
- * @return A count-based contents policy.
- */
- public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> countContentsPolicy(final int count){
- return (partition, tuple) -> {
- if (partition.getContents().size() >= count)
- partition.evict();
- };
- }
-
- /**
- * Returns a Consumer representing an evict determiner that evict all tuples
- * from the window.
- * @param <T> Tuple type
- * @param <K> Key type
- * @param <L> List type for the partition contents.
- * @return An evict determiner that evicts all tuples.
- */
- public static <T, K, L extends List<T>> Consumer<Partition<T, K, L> > evictAll(){
- return partition -> partition.getContents().clear();
- }
-
- /**
- * Returns an evict determiner that evicts the oldest tuple.
- * @param <T> Tuple type
- * @param <K> Key type
- * @param <L> List type for the partition contents.
- * @return A evict determiner that evicts the oldest tuple.
- */
- public static <T, K, L extends List<T>> Consumer<Partition<T, K, L> > evictOldest(){
- return partition -> partition.getContents().remove(0);
- }
-
- /**
- * Returns a trigger policy that triggers
- * processing on every insert.
- * @param <T> Tuple type
- * @param <K> Key type
- * @param <L> List type for the partition contents.
- * @return A trigger policy that triggers processing on every insert.
- */
- public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processOnInsert(){
- return (partition, tuple) -> partition.process();
- }
-
- /**
- * Returns a trigger policy that triggers when the size of a partition
- * equals or exceeds a value, and then evicts its contents.
- * @param <T> Tuple type
- * @param <K> Key type
- * @param <L> List type for the partition contents.
- * @param size partition size
- * @return A trigger policy that triggers processing when the size of
- * the partition equals or exceets a value.
- */
- public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processWhenFullAndEvict(final int size){
- return (partition, tuple) -> {
- if(partition.getContents().size() >= size){
- partition.process();
- partition.evict();
- }
- };
- }
-
- /**
- * A {@link BiConsumer} policy which does nothing.
- * @param <T> Tuple type
- * @param <K> Key type
- * @param <L> List type for the partition contents.
- * @return A policy which does nothing.
- */
- public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> doNothing(){
- return (partition, key) -> {};
- }
-
- public static <T> Supplier<InsertionTimeList<T>> insertionTimeList() {
- return () -> new InsertionTimeList<>();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/Window.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/Window.java b/api/window/src/main/java/edgent/window/Window.java
deleted file mode 100644
index 23ad605..0000000
--- a/api/window/src/main/java/edgent/window/Window.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Function;
-
-
-/**
- * Partitioned window of tuples.
- * Conceptually a window maintains a continuously
- * changing subset of tuples on a stream, such as the last ten tuples
- * or tuples that have arrived in the last five minutes.
- * <P>
- * {@code Window} is partitioned by keys obtained
- * from tuples using a key function. Each tuple is
- * inserted into a partition containing all tuples
- * with the same key (using {@code equals()}).
- * Each partition independently maintains the subset of
- * tuples defined by the windows policies.
- * <BR>
- * An unpartitioned is created by using a key function that
- * returns a constant, to force all tuples to be inserted
- * into a single partition. A convenience function
- * {@link edgent.function.Functions#unpartitioned() unpartitioned()} is
- * provided that returns zero as the fixed key.
- * </P>
- * <P>
- * The window's policies are flexible to allow any definition of
- * what tuples each partition will contain, and how the
- * partition is processed.
- * </P>
- * @param <T> type of tuples in the window
- * @param <K> type of the window's key
- * @param <L> type of the list used to contain tuples.
- */
-public interface Window<T, K, L extends List<T>>{
-
- /**
- * Attempts to insert the tuple into its partition.
- * Tuple insertion performs the following actions in order:
- * <OL>
- * <LI>Call {@code K key = getKeyFunction().apply(tuple)} to obtain the partition key.</LI>
- * <LI>Get the partition for {@code key} creating an empty partition if one did not exist for {@code key}. </LI>
- * <LI>Apply the insertion policy, calling {@code getInsertionPolicy().apply(partition, tuple)}. If it returns false then return false from this method,
- * otherwise continue.</LI>
- * <LI>Apply the contents policy, calling {@code getContentsPolicy().apply(partition, tuple)}.
- * This is a pre-insertion action that allows any action. Some policies may request room to be made for
- * the insertion by calling {@link Partition#evict() evict()} which will result in a call to the evict determiner.</LI>
- * <LI>Add {@code tuple} to the contents of the partition.</LI>
- * <LI>Apply the trigger policy, calling {@code getTriggerPolicy().apply(partition, tuple)}.
- * This is a post-insertion that action allows any action. A typical implementation is to call
- * {@link Partition#process() partition.process()} to perform processing of the window.
- * </OL>
- *
- *
- *
- * @param tuple the tuple to insert
- * @return true, if the tuple was successfully inserted. Otherwise, false.
- */
- boolean insert(T tuple);
-
- /**
- * Register a WindowProcessor.
- * @param windowProcessor function to process the window
- */
- void registerPartitionProcessor(BiConsumer<List<T>, K> windowProcessor);
-
- /**
- * Register a ScheduledExecutorService.
- * @param ses the service
- */
- void registerScheduledExecutorService(ScheduledExecutorService ses);
-
- /**
- * Returns the insertion policy of the window.
- * is called
- *
- * @return The insertion policy.
- */
- BiFunction<Partition<T, K, L>, T, Boolean> getInsertionPolicy();
-
- /**
- * Returns the contents policy of the window.
- * The contents policy is invoked before a tuple
- * is inserted into a partition.
- *
- * @return contents policy for this window.
- */
- BiConsumer<Partition<T, K, L>, T> getContentsPolicy();
-
- /**
- * Returns the window's trigger policy.
- * The trigger policy is invoked (triggered) by
- * the insertion of a tuple into a partition.
- *
- * @return trigger policy for this window.
- */
- BiConsumer<Partition<T, K, L>, T> getTriggerPolicy();
-
- /**
- * Returns the partition processor associated with the window.
- * @return partitionProcessor
- */
- BiConsumer<List<T>, K> getPartitionProcessor();
-
- /**
- * Returns the ScheduledExecutorService associated with the window.
- * @return ScheduledExecutorService
- */
- ScheduledExecutorService getScheduledExecutorService();
-
- /**
- * Returns the window's eviction determiner.
- * The evict determiner is responsible for
- * determining which tuples in a window need
- * to be evicted.
- * <BR>
- * Calls to {@link Partition#evict()} result in
- * {@code getEvictDeterminer().accept(partition)} being
- * called.
- * In some cases this may not result in tuples being
- * evicted from the partition.
- * <P>
- * An evict determiner evicts tuples from the partition
- * by removing them from the list returned by
- * {@link Partition#getContents()}.
- *
- * @return evict determiner for this window.
- */
- Consumer<Partition<T, K, L>> getEvictDeterminer();
-
- /**
- * Returns the keyFunction of the window
- * @return The window's keyFunction.
- */
- Function<T, K> getKeyFunction();
-
- /**
- * Retrieves the partitions in the window. The map of partitions
- * is stable when synchronizing on the intrinsic lock of the map,
- * for example:
- * <br>
- * <pre>{@code
- * Map<K, Partitions<U, K, ?>> partitions = window.getPartitions();
- * synchronized(partitions){
- * // operations with partition
- * }
- * }</pre>
- *
- * @return A map of the window's keys and partitions.
- */
- Map<K, Partition<T, K, L>> getPartitions();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/WindowImpl.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/WindowImpl.java b/api/window/src/main/java/edgent/window/WindowImpl.java
deleted file mode 100644
index 18f1d51..0000000
--- a/api/window/src/main/java/edgent/window/WindowImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Function;
-import edgent.function.Supplier;
-
-
-class WindowImpl<T, K, L extends List<T>> implements Window<T, K, L> {
- private final BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy;
- private final BiConsumer<Partition<T, K, L>, T> contentsPolicy;
- private final Consumer<Partition<T, K, L> > evictDeterminer;
- private final BiConsumer<Partition<T, K, L>, T> triggerPolicy;
- private BiConsumer<List<T>, K> partitionProcessor;
-
- private ScheduledExecutorService ses;
-
- protected Supplier<L> listSupplier;
- protected Function<T, K> keyFunction;
-
- private Map<K, Partition<T, K, L> > partitions = new HashMap<K, Partition<T, K, L> >();
-
-
- WindowImpl(BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy, BiConsumer<Partition<T, K, L>, T> contentsPolicy,
- Consumer<Partition<T, K, L> > evictDeterminer, BiConsumer<Partition<T, K, L>, T> triggerPolicy,
- Function<T, K> keyFunction, Supplier<L> listSupplier){
- this.insertionPolicy = insertionPolicy;
- this.contentsPolicy = contentsPolicy;
- this.evictDeterminer = evictDeterminer;
- this.triggerPolicy = triggerPolicy;
- this.keyFunction = keyFunction;
- this.listSupplier = listSupplier;
- }
-
- @Override
- public boolean insert(T tuple) {
- K key = keyFunction.apply(tuple);
- Partition<T, K, L> partition;
-
- synchronized (partitions) {
- partition = partitions.get(key);
- if (partition == null) {
- partition = new PartitionImpl<T, K, L>(this, listSupplier.get(), key);
- partitions.put(key, partition);
- }
- }
-
- return partition.insert(tuple);
- }
-
-
- @Override
- public synchronized void registerPartitionProcessor(BiConsumer<List<T>, K> partitionProcessor){
- this.partitionProcessor = partitionProcessor;
- }
-
- @Override
- public BiConsumer<Partition<T, K, L>, T> getContentsPolicy() {
- return contentsPolicy;
- }
-
- @Override
- public BiConsumer<Partition<T, K, L>, T> getTriggerPolicy() {
- return triggerPolicy;
- }
-
- @Override
- public synchronized BiConsumer<List<T>, K> getPartitionProcessor() {
- return partitionProcessor;
- }
-
- @Override
- public BiFunction<Partition<T, K, L>, T, Boolean> getInsertionPolicy() {
- return insertionPolicy;
- }
-
- @Override
- public Consumer<Partition<T, K, L> > getEvictDeterminer() {
- return evictDeterminer;
- }
-
- @Override
- public Function<T, K> getKeyFunction() {
- return keyFunction;
- }
-
- @Override
- public synchronized void registerScheduledExecutorService(ScheduledExecutorService ses) {
- this.ses = ses;
-
- }
-
- @Override
- public synchronized ScheduledExecutorService getScheduledExecutorService() {
- return this.ses;
- }
-
- @Override
- public Map<K, Partition<T, K, L>> getPartitions() {
- return partitions;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/Windows.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/Windows.java b/api/window/src/main/java/edgent/window/Windows.java
deleted file mode 100644
index 7fdf326..0000000
--- a/api/window/src/main/java/edgent/window/Windows.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.window;
-
-import static edgent.window.Policies.alwaysInsert;
-import static edgent.window.Policies.countContentsPolicy;
-import static edgent.window.Policies.evictOldest;
-import static edgent.window.Policies.processOnInsert;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import edgent.function.BiConsumer;
-import edgent.function.BiFunction;
-import edgent.function.Consumer;
-import edgent.function.Function;
-import edgent.function.Supplier;
-
-/**
- * Factory to create {@code Window} implementations.
- *
- */
-public class Windows {
-
- /**
- * Create a window using the passed in policies.
- *
- * @param <T> Tuple type.
- * @param <K> Key type.
- * @param <L> List type.
- *
- * @param insertionPolicy Policy indicating if a tuple should be inserted
- * into the window.
- * @param contentsPolicy Contents policy called prior to insertion of a tuple.
- * @param evictDeterminer Policy that determines action to take when
- * {@link Partition#evict()} is called.
- * @param triggerPolicy Trigger policy that is invoked after the insertion
- * of a tuple into a partition.
- * @param keyFunction Function that gets the partition key from a tuple.
- * @param listSupplier Supplier function for the {@code List} that holds
- * tuples within a partition.
- * @return Window using the passed in policies.
- */
- public static <T, K, L extends List<T>> Window<T, K, L> window(
- BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy,
- BiConsumer<Partition<T, K, L>, T> contentsPolicy,
- Consumer<Partition<T, K, L> > evictDeterminer,
- BiConsumer<Partition<T, K, L>, T> triggerPolicy,
- Function<T, K> keyFunction,
- Supplier<L> listSupplier){
-
- return new WindowImpl<>(insertionPolicy, contentsPolicy, evictDeterminer, triggerPolicy, keyFunction, listSupplier);
- }
-
- /**
- * Return a window that maintains the last {@code count} tuples inserted
- * with processing triggered on every insert. This provides
- * a continuous processing, where processing is invoked every
- * time the window changes. Since insertion drives eviction
- * there is no need to process on eviction, thus once the window
- * has reached {@code count} tuples, each insertion results in an
- * eviction followed by processing of {@code count} tuples
- * including the tuple just inserted, which is the definition of
- * the window.
- *
- * @param <T> Tuple type.
- * @param <K> Key type.
- *
- * @param count Number of tuple to maintain per partition
- * @param keyFunction Tuple partitioning key function
- * @return window that maintains the last {@code count} tuples on a stream
- */
- public static <T, K> Window<T, K, LinkedList<T>> lastNProcessOnInsert(final int count,
- Function<T, K> keyFunction) {
-
- Window<T, K, LinkedList<T>> window = Windows.window(
- alwaysInsert(),
- countContentsPolicy(count),
- evictOldest(),
- processOnInsert(),
- keyFunction,
- () -> new LinkedList<T>());
-
- return window;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/edgent/window/package-info.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/edgent/window/package-info.java b/api/window/src/main/java/edgent/window/package-info.java
deleted file mode 100644
index f31a8d3..0000000
--- a/api/window/src/main/java/edgent/window/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Window API.
- */
-package edgent.window;
-
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/InsertionTimeList.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/InsertionTimeList.java b/api/window/src/main/java/org/apache/edgent/window/InsertionTimeList.java
new file mode 100644
index 0000000..2f6d37a
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/InsertionTimeList.java
@@ -0,0 +1,134 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+import java.util.AbstractSequentialList;
+import java.util.LinkedList;
+import java.util.ListIterator;
+
+/**
+ * A window contents list that maintains insertion time.
+ *
+ * @param <T> Type of tuples in the list
+ */
+public class InsertionTimeList<T> extends AbstractSequentialList<T> {
+
+ private final LinkedList<T> tuples = new LinkedList<>();
+ private final LinkedList<Long> times = new LinkedList<>();
+
+ void evictOlderThan(long evictTime) {
+ while(!times.isEmpty() && times.getFirst() <= evictTime){
+ remove();
+ }
+ }
+
+ long nextEvictDelay(long timeMs) {
+ long firstTupleTime = times.get(0);
+ long nextEvictTime = firstTupleTime + timeMs;
+
+ long timeToNextEvict = nextEvictTime - System.currentTimeMillis();
+
+ return Math.max(0, timeToNextEvict);
+ }
+
+ @Override
+ public ListIterator<T> listIterator(int index) {
+ return new TimedListIterator<>(tuples.listIterator(index), times.listIterator(index));
+ }
+
+ @Override
+ public boolean add(T tuple) {
+ tuples.add(tuple);
+ times.add(System.currentTimeMillis());
+ return true;
+ }
+ @Override
+ public void clear() {
+ tuples.clear();
+ times.clear();
+ }
+
+ private void remove() {
+ tuples.remove();
+ times.remove();
+ }
+
+ @Override
+ public int size() {
+ return tuples.size();
+ }
+
+ private static class TimedListIterator<T> implements ListIterator<T> {
+
+ private final ListIterator<T> ti;
+ private final ListIterator<Long> iti;
+
+ TimedListIterator(ListIterator<T> ti, ListIterator<Long> iti) {
+ this.ti = ti;
+ this.iti = iti;
+ }
+
+ @Override
+ public void add(T tuple) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return ti.hasNext();
+ }
+
+ @Override
+ public boolean hasPrevious() {
+ return ti.hasPrevious();
+ }
+
+ @Override
+ public T next() {
+ iti.next();
+ return ti.next();
+ }
+
+ @Override
+ public int nextIndex() {
+ return ti.nextIndex();
+ }
+
+ @Override
+ public T previous() {
+ iti.previous();
+ return ti.previous();
+ }
+
+ @Override
+ public int previousIndex() {
+ return ti.previousIndex();
+ }
+
+ @Override
+ public void remove() {
+ ti.remove();
+ iti.remove();
+ }
+
+ @Override
+ public void set(T arg0) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/Partition.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/Partition.java b/api/window/src/main/java/org/apache/edgent/window/Partition.java
new file mode 100644
index 0000000..1efc646
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/Partition.java
@@ -0,0 +1,79 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A partition within a {@code Window}. The contents of the list
+ * returned by {@code getContents} is stable when synchronizing
+ * on the partition object. For example:
+ *
+ * <pre>{@code
+ * Partition<Integer, Integer, ArrayList<Integer>> part = ...;
+ * synchronized(part){
+ * List<Integer> = part.getContents();
+ * // stable operation on contents of partition
+ * }
+ * }</pre>
+ *
+ * @param <T> Type of tuples in the partition.
+ * @param <K> Type of the partition's key.
+ * @param <L> Type of the list holding the partition's tuples.
+ *
+ * @see Window
+ */
+public interface Partition<T, K, L extends List<T>> extends Serializable{
+ /**
+ * Offers a tuple to be inserted into the partition.
+ * @param tuple Tuple to be offered.
+ * @return True if the tuple was inserted into this partition, false if it was rejected.
+ */
+ boolean insert(T tuple);
+
+ /**
+ * Invoke the WindowProcessor's processWindow method. A partition processor
+ * must be registered prior to invoking process().
+ */
+ void process();
+
+ /**
+ * Calls the partition's evictDeterminer.
+ */
+ void evict();
+
+ /**
+ * Retrieves the window contents.
+ * @return list of partition contents
+ */
+ L getContents();
+
+ /**
+ * Return the window in which this partition is contained.
+ * @return the partition's window
+ */
+ Window<T, K, L> getWindow();
+
+ /**
+ * Returns the key associated with this partition
+ * @return The key of the partition.
+ */
+ K getKey();
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/PartitionImpl.java b/api/window/src/main/java/org/apache/edgent/window/PartitionImpl.java
new file mode 100644
index 0000000..c58410b
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/PartitionImpl.java
@@ -0,0 +1,79 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.edgent.function.Consumer;
+
+@SuppressWarnings("serial")
+class PartitionImpl<T, K, L extends List<T>> implements Partition<T, K, L> {
+ private final L tuples;
+ private final List<T> unmodifiableTuples;
+ private final Window<T, K, L> window;
+ private final K key;
+
+ PartitionImpl(Window<T, K, L> window, L tuples, K key){
+ this.window = window;
+ this.tuples = tuples;
+ this.unmodifiableTuples = Collections.unmodifiableList(tuples);
+ this.key = key;
+ }
+
+ @Override
+ public synchronized boolean insert(T tuple) {
+
+ if (getWindow().getInsertionPolicy().apply(this, tuple)) {
+ getWindow().getContentsPolicy().accept(this, tuple);
+ this.tuples.add(tuple);
+ // Trigger
+ getWindow().getTriggerPolicy().accept(this, tuple);
+ return true;
+ }
+
+ return true;
+ }
+
+ @Override
+ public synchronized void process() {
+ window.getPartitionProcessor().accept(unmodifiableTuples, key);
+ }
+
+ @Override
+ public synchronized L getContents() {
+ return tuples;
+ }
+
+ @Override
+ public Window<T, K, L> getWindow() {
+ return window;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public synchronized void evict() {
+ Consumer<Partition<T, K, L>> evictDeterminer = window.getEvictDeterminer();
+ evictDeterminer.accept(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/PartitionedState.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/PartitionedState.java b/api/window/src/main/java/org/apache/edgent/window/PartitionedState.java
new file mode 100644
index 0000000..2fd1b43
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/PartitionedState.java
@@ -0,0 +1,80 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Maintain partitioned state.
+ * Abstract class that can be used to maintain state
+ * for each keyed partition in a {@link Window}.
+ *
+ * @param <K> Key type.
+ * @param <S> State type.
+ */
+public abstract class PartitionedState<K, S> {
+
+ private final Supplier<S> initialState;
+ private final Map<K, S> states = new HashMap<>();
+
+ /**
+ * Construct with an initial state function.
+ * @param initialState Function used to create the initial state for a key.
+ *
+ * @see #getState(Object)
+ */
+ protected PartitionedState(Supplier<S> initialState) {
+ this.initialState = initialState;
+ }
+
+ /**
+ * Get the current state for {@code key}.
+ * If no state is held then {@code initialState.get()}
+ * is called to create the initial state for {@code key}.
+ * @param key Partition key.
+ * @return State for {@code key}.
+ */
+ protected synchronized S getState(K key) {
+ S state = states.get(key);
+ if (state == null)
+ states.put(key, state = initialState.get());
+ return state;
+ }
+
+ /**
+ * Set the current state for {@code key}.
+ * @param key Partition key.
+ * @param state State for {@code key}
+ * @return Previous state for {@code key}, will be null if no state was held.
+ */
+ protected synchronized S setState(K key, S state) {
+ return states.put(key, state);
+ }
+ /**
+ *
+ * @param key Partition key.
+ * @return Removed state for {@code key}, will be null if no state was held.
+ */
+ protected synchronized S removeState(K key) {
+ return states.remove(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/Policies.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/Policies.java b/api/window/src/main/java/org/apache/edgent/window/Policies.java
new file mode 100644
index 0000000..924ad32
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/Policies.java
@@ -0,0 +1,240 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Common window policies.
+ *
+ */
+public class Policies {
+
+ /**
+ * A policy which schedules a future partition eviction if the partition is empty.
+ * This can be used as a contents policy that is scheduling the eviction of
+ * the tuple just about to be inserted.
+ * @param <T> Tuple Type
+ * @param <K> Key type
+ * @param <L> List type for the partition contents.
+ * @param time The time span in which tuple are permitted in the partition.
+ * @param unit The units of time.
+ * @return The time-based contents policy.
+ */
+ public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictIfEmpty(long time, TimeUnit unit){
+ return (partition, tuple) -> {
+ if(partition.getContents().isEmpty()){
+ ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+ ses.schedule(() -> partition.evict(), time, unit);
+ }
+ };
+ }
+
+ /**
+ * A policy which schedules a future partition eviction on the first insert.
+ * This can be used as a contents policy that schedules the eviction of tuples
+ * as a batch.
+ * @param <T> Tuple Type
+ * @param <K> Key type
+ * @param <L> List type for the partition contents.
+ * @param time The time span in which tuple are permitted in the partition.
+ * @param unit The units of time.
+ * @return The time-based contents policy.
+ */
+ @SuppressWarnings("serial")
+ public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> scheduleEvictOnFirstInsert(long time, TimeUnit unit){
+
+ // Can't use lambda since state is required
+ return new BiConsumer<Partition<T,K,L>, T>() {
+ private Set<Partition<T,K,L>> initialized_partitions = Collections.synchronizedSet(new HashSet<>());
+ @Override
+ public void accept(Partition<T, K, L> partition, T tuple) {
+ if(!initialized_partitions.contains(partition)){
+ initialized_partitions.add(partition);
+ ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+ ses.schedule(() -> partition.evict(), time, unit);
+ }
+ }
+ };
+ }
+
+ /**
+ * An eviction policy which evicts all tuples that are older than a specified time.
+ * If any tuples remain in the partition, it schedules their eviction after
+ * an appropriate interval.
+ * @param <T> Tuple Type
+ * @param <K> Key type
+ * @param time The timespan in which tuple are permitted in the partition.
+ * @param unit The units of time.
+ * @return The time-based eviction policy.
+ */
+ public static <T, K> Consumer<Partition<T, K, InsertionTimeList<T>> > evictOlderWithProcess(long time, TimeUnit unit){
+
+ long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
+
+ return (partition) -> {
+ ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+ InsertionTimeList<T> tuples = partition.getContents();
+ long evictTime = System.currentTimeMillis() - timeMs;
+
+ tuples.evictOlderThan(evictTime);
+
+ partition.process();
+
+ if(!tuples.isEmpty()){
+ ses.schedule(() -> partition.evict(), tuples.nextEvictDelay(timeMs), TimeUnit.MILLISECONDS);
+ }
+ };
+ }
+
+ /**
+ * An eviction policy which processes the window, evicts all tuples, and
+ * schedules the next eviction after the appropriate interval.
+ * @param <T> Tuple Type
+ * @param <K> Key type
+ * @param time The timespan in which tuple are permitted in the partition.
+ * @param unit The units of time.
+ * @return The time-based eviction policy.
+ */
+ public static <T, K> Consumer<Partition<T, K, List<T>> > evictAllAndScheduleEvictWithProcess(long time, TimeUnit unit){
+
+ long timeMs = TimeUnit.MILLISECONDS.convert(time, unit);
+ return (partition) -> {
+ ScheduledExecutorService ses = partition.getWindow().getScheduledExecutorService();
+ List<T> tuples = partition.getContents();
+
+ partition.process();
+ tuples.clear();
+
+ ses.schedule(() -> partition.evict(), timeMs, TimeUnit.MILLISECONDS);
+ };
+ }
+
+
+ /**
+ * Returns an insertion policy that indicates the tuple
+ * is to be inserted into the partition.
+ *
+ * @param <T> Tuple type
+ * @param <K> Key type
+ * @param <L> List type for the partition contents.
+ *
+ * @return An insertion policy that always inserts.
+ */
+ public static <T, K, L extends List<T>> BiFunction<Partition<T, K, L>, T, Boolean> alwaysInsert(){
+ return (partition, tuple) -> true;
+ }
+
+ /**
+ * Returns a count-based contents policy.
+ * If, when called, the number of tuples in the partition is
+ * greater than equal to {@code count} then {@code partition.evict()}
+ * is called.
+ * @param <T> Tuple type
+ * @param <K> Key type
+ * @param <L> List type for the partition contents.
+ * @param count the count
+ * @return A count-based contents policy.
+ */
+ public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> countContentsPolicy(final int count){
+ return (partition, tuple) -> {
+ if (partition.getContents().size() >= count)
+ partition.evict();
+ };
+ }
+
+ /**
+ * Returns a Consumer representing an evict determiner that evict all tuples
+ * from the window.
+ * @param <T> Tuple type
+ * @param <K> Key type
+ * @param <L> List type for the partition contents.
+ * @return An evict determiner that evicts all tuples.
+ */
+ public static <T, K, L extends List<T>> Consumer<Partition<T, K, L> > evictAll(){
+ return partition -> partition.getContents().clear();
+ }
+
+ /**
+ * Returns an evict determiner that evicts the oldest tuple.
+ * @param <T> Tuple type
+ * @param <K> Key type
+ * @param <L> List type for the partition contents.
+ * @return A evict determiner that evicts the oldest tuple.
+ */
+ public static <T, K, L extends List<T>> Consumer<Partition<T, K, L> > evictOldest(){
+ return partition -> partition.getContents().remove(0);
+ }
+
+ /**
+ * Returns a trigger policy that triggers
+ * processing on every insert.
+ * @param <T> Tuple type
+ * @param <K> Key type
+ * @param <L> List type for the partition contents.
+ * @return A trigger policy that triggers processing on every insert.
+ */
+ public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processOnInsert(){
+ return (partition, tuple) -> partition.process();
+ }
+
+ /**
+ * Returns a trigger policy that triggers when the size of a partition
+ * equals or exceeds a value, and then evicts its contents.
+ * @param <T> Tuple type
+ * @param <K> Key type
+ * @param <L> List type for the partition contents.
+ * @param size partition size
+ * @return A trigger policy that triggers processing when the size of
+ * the partition equals or exceets a value.
+ */
+ public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> processWhenFullAndEvict(final int size){
+ return (partition, tuple) -> {
+ if(partition.getContents().size() >= size){
+ partition.process();
+ partition.evict();
+ }
+ };
+ }
+
+ /**
+ * A {@link BiConsumer} policy which does nothing.
+ * @param <T> Tuple type
+ * @param <K> Key type
+ * @param <L> List type for the partition contents.
+ * @return A policy which does nothing.
+ */
+ public static <T, K, L extends List<T>> BiConsumer<Partition<T, K, L>, T> doNothing(){
+ return (partition, key) -> {};
+ }
+
+ public static <T> Supplier<InsertionTimeList<T>> insertionTimeList() {
+ return () -> new InsertionTimeList<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/Window.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/Window.java b/api/window/src/main/java/org/apache/edgent/window/Window.java
new file mode 100644
index 0000000..93f6eea
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/Window.java
@@ -0,0 +1,177 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+
+
+/**
+ * Partitioned window of tuples.
+ * Conceptually a window maintains a continuously
+ * changing subset of tuples on a stream, such as the last ten tuples
+ * or tuples that have arrived in the last five minutes.
+ * <P>
+ * {@code Window} is partitioned by keys obtained
+ * from tuples using a key function. Each tuple is
+ * inserted into a partition containing all tuples
+ * with the same key (using {@code equals()}).
+ * Each partition independently maintains the subset of
+ * tuples defined by the windows policies.
+ * <BR>
+ * An unpartitioned is created by using a key function that
+ * returns a constant, to force all tuples to be inserted
+ * into a single partition. A convenience function
+ * {@link org.apache.edgent.function.Functions#unpartitioned() unpartitioned()} is
+ * provided that returns zero as the fixed key.
+ * </P>
+ * <P>
+ * The window's policies are flexible to allow any definition of
+ * what tuples each partition will contain, and how the
+ * partition is processed.
+ * </P>
+ * @param <T> type of tuples in the window
+ * @param <K> type of the window's key
+ * @param <L> type of the list used to contain tuples.
+ */
+public interface Window<T, K, L extends List<T>>{
+
+ /**
+ * Attempts to insert the tuple into its partition.
+ * Tuple insertion performs the following actions in order:
+ * <OL>
+ * <LI>Call {@code K key = getKeyFunction().apply(tuple)} to obtain the partition key.</LI>
+ * <LI>Get the partition for {@code key} creating an empty partition if one did not exist for {@code key}. </LI>
+ * <LI>Apply the insertion policy, calling {@code getInsertionPolicy().apply(partition, tuple)}. If it returns false then return false from this method,
+ * otherwise continue.</LI>
+ * <LI>Apply the contents policy, calling {@code getContentsPolicy().apply(partition, tuple)}.
+ * This is a pre-insertion action that allows any action. Some policies may request room to be made for
+ * the insertion by calling {@link Partition#evict() evict()} which will result in a call to the evict determiner.</LI>
+ * <LI>Add {@code tuple} to the contents of the partition.</LI>
+ * <LI>Apply the trigger policy, calling {@code getTriggerPolicy().apply(partition, tuple)}.
+ * This is a post-insertion that action allows any action. A typical implementation is to call
+ * {@link Partition#process() partition.process()} to perform processing of the window.
+ * </OL>
+ *
+ *
+ *
+ * @param tuple the tuple to insert
+ * @return true, if the tuple was successfully inserted. Otherwise, false.
+ */
+ boolean insert(T tuple);
+
+ /**
+ * Register a WindowProcessor.
+ * @param windowProcessor function to process the window
+ */
+ void registerPartitionProcessor(BiConsumer<List<T>, K> windowProcessor);
+
+ /**
+ * Register a ScheduledExecutorService.
+ * @param ses the service
+ */
+ void registerScheduledExecutorService(ScheduledExecutorService ses);
+
+ /**
+ * Returns the insertion policy of the window.
+ * is called
+ *
+ * @return The insertion policy.
+ */
+ BiFunction<Partition<T, K, L>, T, Boolean> getInsertionPolicy();
+
+ /**
+ * Returns the contents policy of the window.
+ * The contents policy is invoked before a tuple
+ * is inserted into a partition.
+ *
+ * @return contents policy for this window.
+ */
+ BiConsumer<Partition<T, K, L>, T> getContentsPolicy();
+
+ /**
+ * Returns the window's trigger policy.
+ * The trigger policy is invoked (triggered) by
+ * the insertion of a tuple into a partition.
+ *
+ * @return trigger policy for this window.
+ */
+ BiConsumer<Partition<T, K, L>, T> getTriggerPolicy();
+
+ /**
+ * Returns the partition processor associated with the window.
+ * @return partitionProcessor
+ */
+ BiConsumer<List<T>, K> getPartitionProcessor();
+
+ /**
+ * Returns the ScheduledExecutorService associated with the window.
+ * @return ScheduledExecutorService
+ */
+ ScheduledExecutorService getScheduledExecutorService();
+
+ /**
+ * Returns the window's eviction determiner.
+ * The evict determiner is responsible for
+ * determining which tuples in a window need
+ * to be evicted.
+ * <BR>
+ * Calls to {@link Partition#evict()} result in
+ * {@code getEvictDeterminer().accept(partition)} being
+ * called.
+ * In some cases this may not result in tuples being
+ * evicted from the partition.
+ * <P>
+ * An evict determiner evicts tuples from the partition
+ * by removing them from the list returned by
+ * {@link Partition#getContents()}.
+ *
+ * @return evict determiner for this window.
+ */
+ Consumer<Partition<T, K, L>> getEvictDeterminer();
+
+ /**
+ * Returns the keyFunction of the window
+ * @return The window's keyFunction.
+ */
+ Function<T, K> getKeyFunction();
+
+ /**
+ * Retrieves the partitions in the window. The map of partitions
+ * is stable when synchronizing on the intrinsic lock of the map,
+ * for example:
+ * <br>
+ * <pre>{@code
+ * Map<K, Partitions<U, K, ?>> partitions = window.getPartitions();
+ * synchronized(partitions){
+ * // operations with partition
+ * }
+ * }</pre>
+ *
+ * @return A map of the window's keys and partitions.
+ */
+ Map<K, Partition<T, K, L>> getPartitions();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/WindowImpl.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/WindowImpl.java b/api/window/src/main/java/org/apache/edgent/window/WindowImpl.java
new file mode 100644
index 0000000..6651952
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/WindowImpl.java
@@ -0,0 +1,127 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+
+
+class WindowImpl<T, K, L extends List<T>> implements Window<T, K, L> {
+ private final BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy;
+ private final BiConsumer<Partition<T, K, L>, T> contentsPolicy;
+ private final Consumer<Partition<T, K, L> > evictDeterminer;
+ private final BiConsumer<Partition<T, K, L>, T> triggerPolicy;
+ private BiConsumer<List<T>, K> partitionProcessor;
+
+ private ScheduledExecutorService ses;
+
+ protected Supplier<L> listSupplier;
+ protected Function<T, K> keyFunction;
+
+ private Map<K, Partition<T, K, L> > partitions = new HashMap<K, Partition<T, K, L> >();
+
+
+ WindowImpl(BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy, BiConsumer<Partition<T, K, L>, T> contentsPolicy,
+ Consumer<Partition<T, K, L> > evictDeterminer, BiConsumer<Partition<T, K, L>, T> triggerPolicy,
+ Function<T, K> keyFunction, Supplier<L> listSupplier){
+ this.insertionPolicy = insertionPolicy;
+ this.contentsPolicy = contentsPolicy;
+ this.evictDeterminer = evictDeterminer;
+ this.triggerPolicy = triggerPolicy;
+ this.keyFunction = keyFunction;
+ this.listSupplier = listSupplier;
+ }
+
+ @Override
+ public boolean insert(T tuple) {
+ K key = keyFunction.apply(tuple);
+ Partition<T, K, L> partition;
+
+ synchronized (partitions) {
+ partition = partitions.get(key);
+ if (partition == null) {
+ partition = new PartitionImpl<T, K, L>(this, listSupplier.get(), key);
+ partitions.put(key, partition);
+ }
+ }
+
+ return partition.insert(tuple);
+ }
+
+
+ @Override
+ public synchronized void registerPartitionProcessor(BiConsumer<List<T>, K> partitionProcessor){
+ this.partitionProcessor = partitionProcessor;
+ }
+
+ @Override
+ public BiConsumer<Partition<T, K, L>, T> getContentsPolicy() {
+ return contentsPolicy;
+ }
+
+ @Override
+ public BiConsumer<Partition<T, K, L>, T> getTriggerPolicy() {
+ return triggerPolicy;
+ }
+
+ @Override
+ public synchronized BiConsumer<List<T>, K> getPartitionProcessor() {
+ return partitionProcessor;
+ }
+
+ @Override
+ public BiFunction<Partition<T, K, L>, T, Boolean> getInsertionPolicy() {
+ return insertionPolicy;
+ }
+
+ @Override
+ public Consumer<Partition<T, K, L> > getEvictDeterminer() {
+ return evictDeterminer;
+ }
+
+ @Override
+ public Function<T, K> getKeyFunction() {
+ return keyFunction;
+ }
+
+ @Override
+ public synchronized void registerScheduledExecutorService(ScheduledExecutorService ses) {
+ this.ses = ses;
+
+ }
+
+ @Override
+ public synchronized ScheduledExecutorService getScheduledExecutorService() {
+ return this.ses;
+ }
+
+ @Override
+ public Map<K, Partition<T, K, L>> getPartitions() {
+ return partitions;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/Windows.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/Windows.java b/api/window/src/main/java/org/apache/edgent/window/Windows.java
new file mode 100644
index 0000000..da0a217
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/Windows.java
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.window;
+
+import static org.apache.edgent.window.Policies.alwaysInsert;
+import static org.apache.edgent.window.Policies.countContentsPolicy;
+import static org.apache.edgent.window.Policies.evictOldest;
+import static org.apache.edgent.window.Policies.processOnInsert;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.edgent.function.BiConsumer;
+import org.apache.edgent.function.BiFunction;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+
+/**
+ * Factory to create {@code Window} implementations.
+ *
+ */
+public class Windows {
+
+ /**
+ * Create a window using the passed in policies.
+ *
+ * @param <T> Tuple type.
+ * @param <K> Key type.
+ * @param <L> List type.
+ *
+ * @param insertionPolicy Policy indicating if a tuple should be inserted
+ * into the window.
+ * @param contentsPolicy Contents policy called prior to insertion of a tuple.
+ * @param evictDeterminer Policy that determines action to take when
+ * {@link Partition#evict()} is called.
+ * @param triggerPolicy Trigger policy that is invoked after the insertion
+ * of a tuple into a partition.
+ * @param keyFunction Function that gets the partition key from a tuple.
+ * @param listSupplier Supplier function for the {@code List} that holds
+ * tuples within a partition.
+ * @return Window using the passed in policies.
+ */
+ public static <T, K, L extends List<T>> Window<T, K, L> window(
+ BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy,
+ BiConsumer<Partition<T, K, L>, T> contentsPolicy,
+ Consumer<Partition<T, K, L> > evictDeterminer,
+ BiConsumer<Partition<T, K, L>, T> triggerPolicy,
+ Function<T, K> keyFunction,
+ Supplier<L> listSupplier){
+
+ return new WindowImpl<>(insertionPolicy, contentsPolicy, evictDeterminer, triggerPolicy, keyFunction, listSupplier);
+ }
+
+ /**
+ * Return a window that maintains the last {@code count} tuples inserted
+ * with processing triggered on every insert. This provides
+ * a continuous processing, where processing is invoked every
+ * time the window changes. Since insertion drives eviction
+ * there is no need to process on eviction, thus once the window
+ * has reached {@code count} tuples, each insertion results in an
+ * eviction followed by processing of {@code count} tuples
+ * including the tuple just inserted, which is the definition of
+ * the window.
+ *
+ * @param <T> Tuple type.
+ * @param <K> Key type.
+ *
+ * @param count Number of tuple to maintain per partition
+ * @param keyFunction Tuple partitioning key function
+ * @return window that maintains the last {@code count} tuples on a stream
+ */
+ public static <T, K> Window<T, K, LinkedList<T>> lastNProcessOnInsert(final int count,
+ Function<T, K> keyFunction) {
+
+ Window<T, K, LinkedList<T>> window = Windows.window(
+ alwaysInsert(),
+ countContentsPolicy(count),
+ evictOldest(),
+ processOnInsert(),
+ keyFunction,
+ () -> new LinkedList<T>());
+
+ return window;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/api/window/src/main/java/org/apache/edgent/window/package-info.java
----------------------------------------------------------------------
diff --git a/api/window/src/main/java/org/apache/edgent/window/package-info.java b/api/window/src/main/java/org/apache/edgent/window/package-info.java
new file mode 100644
index 0000000..20ee62e
--- /dev/null
+++ b/api/window/src/main/java/org/apache/edgent/window/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Window API.
+ */
+package org.apache.edgent.window;
+