You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:19 UTC
[14/34] incubator-flink git commit: [streaming] New windowing API
merge and cleanup + several minor fixes
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
deleted file mode 100755
index 0135b34..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-
-public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
- private static final long serialVersionUID = 1L;
- protected long startTime;
- protected long nextRecordTime;
- protected TimeStamp<OUT> timestamp;
- protected StreamWindow window;
-
- public WindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
- long slideInterval, TimeStamp<OUT> timestamp) {
- super(reduceFunction, windowSize, slideInterval);
- this.timestamp = timestamp;
- this.startTime = timestamp.getStartTime();
- }
-
- @Override
- public void open(Configuration config) throws Exception {
- super.open(config);
- this.window = new StreamWindow();
- this.batch = this.window;
- if (timestamp instanceof DefaultTimeStamp) {
- (new TimeCheck()).start();
- }
- }
-
- protected class StreamWindow extends StreamBatch {
-
- private static final long serialVersionUID = 1L;
-
- public StreamWindow() {
- super();
-
- }
-
- @Override
- public void reduceToBuffer(OUT nextValue) throws Exception {
-
- checkWindowEnd(timestamp.getTimestamp(nextValue));
-
- if (currentValue != null) {
- currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
- } else {
- currentValue = nextValue;
- }
- }
-
- protected synchronized void checkWindowEnd(long timeStamp) {
- nextRecordTime = timeStamp;
-
- while (miniBatchEnd()) {
- addToBuffer();
- if (batchEnd()) {
- reduceBatch();
- }
- }
- }
-
- @Override
- public void reduceBatch() {
- reduce(this);
- }
-
- @Override
- protected boolean miniBatchEnd() {
- if (nextRecordTime < startTime + granularity) {
- return false;
- } else {
- startTime += granularity;
- return true;
- }
- }
-
- @Override
- public boolean batchEnd() {
- if (minibatchCounter == numberOfBatches) {
- minibatchCounter -= batchPerSlide;
- return true;
- }
- return false;
- }
-
- }
-
- private class TimeCheck extends Thread {
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(slideSize);
- } catch (InterruptedException e) {
- }
- if (isRunning) {
- window.checkWindowEnd(System.currentTimeMillis());
- } else {
- break;
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
new file mode 100644
index 0000000..2752fd1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+public class WindowingGroupInvokable<IN, OUT> extends WindowingInvokable<IN, OUT> {
+
+ private static final long serialVersionUID = 1L;
+ GroupReduceFunction<IN, OUT> reducer;
+
+ public WindowingGroupInvokable(GroupReduceFunction<IN, OUT> userFunction,
+ LinkedList<TriggerPolicy<IN>> triggerPolicies,
+ LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+ super(userFunction, triggerPolicies, evictionPolicies);
+ this.reducer = userFunction;
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
+ reducer.reduce(buffer, collector);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
index 29975f4..d0855e6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
@@ -17,8 +17,12 @@
package org.apache.flink.streaming.api.invokable.operator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
@@ -28,12 +32,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, String[]>> {
+public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
/**
* Auto-generated serial version UID
@@ -47,9 +46,8 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
private LinkedList<Thread> activePolicyTreads = new LinkedList<Thread>();
- private LinkedList<IN> buffer = new LinkedList<IN>();
+ protected LinkedList<IN> buffer = new LinkedList<IN>();
private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
- private ReduceFunction<IN> reducer;
/**
* This constructor created a windowing invokable using trigger and eviction
@@ -64,12 +62,10 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
* A list of {@link EvictionPolicy}s and/or
* {@link ActiveEvictionPolicy}s
*/
- public WindowingInvokable(ReduceFunction<IN> userFunction,
- LinkedList<TriggerPolicy<IN>> triggerPolicies,
+ public WindowingInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>> triggerPolicies,
LinkedList<EvictionPolicy<IN>> evictionPolicies) {
super(userFunction);
- this.reducer = userFunction;
this.triggerPolicies = triggerPolicies;
this.evictionPolicies = evictionPolicies;
@@ -369,28 +365,4 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
}
}
- @Override
- protected void callUserFunction() throws Exception {
- Iterator<IN> reducedIterator = buffer.iterator();
- IN reduced = null;
-
- while (reducedIterator.hasNext() && reduced == null) {
- reduced = reducedIterator.next();
- }
-
- while (reducedIterator.hasNext()) {
- IN next = reducedIterator.next();
- if (next != null) {
- reduced = reducer.reduce(reduced, next);
- }
- }
- if (reduced != null) {
- String[] tmp = new String[currentTriggerPolicies.size()];
- for (int i = 0; i < tmp.length; i++) {
- tmp[i] = currentTriggerPolicies.get(i).toString();
- }
- collector.collect(new Tuple2<IN, String[]>(reduced, tmp));
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
new file mode 100644
index 0000000..89f98e0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+public class WindowingReduceInvokable<IN> extends WindowingInvokable<IN, IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ ReduceFunction<IN> reducer;
+
+ public WindowingReduceInvokable(ReduceFunction<IN> userFunction,
+ LinkedList<TriggerPolicy<IN>> triggerPolicies,
+ LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+ super(userFunction, triggerPolicies, evictionPolicies);
+ this.reducer = userFunction;
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
+ Iterator<IN> reducedIterator = buffer.iterator();
+ IN reduced = null;
+
+ while (reducedIterator.hasNext() && reduced == null) {
+ reduced = reducedIterator.next();
+ }
+
+ while (reducedIterator.hasNext()) {
+ IN next = reducedIterator.next();
+ if (next != null) {
+ reduced = reducer.reduce(reduced, next);
+ }
+ }
+ if (reduced != null) {
+ collector.collect(reduced);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
index b74e952..6e045f6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
@@ -23,21 +23,24 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
/**
- * Represents a count based trigger or eviction policy.
- * Use the {@link Count#of(int)} to get an instance.
+ * Represents a count based trigger or eviction policy. Use the
+ * {@link Count#of(int)} to get an instance.
*/
@SuppressWarnings("rawtypes")
public class Count implements WindowingHelper {
private int count;
+ private int deleteOnEviction = 1;
+ private int startValue = CountTriggerPolicy.DEFAULT_START_VALUE;
/**
* Specifies on which element a trigger or an eviction should happen (based
* on the count of the elements).
*
- * This constructor does exactly the same as {@link Count#of(int)}.
+ * This constructor does exactly the same as {@link Count#of(int)}.
*
- * @param count the number of elements to count before trigger/evict
+ * @param count
+ * the number of elements to count before trigger/evict
*/
public Count(int count) {
this.count = count;
@@ -45,12 +48,22 @@ public class Count implements WindowingHelper {
@Override
public EvictionPolicy<?> toEvict() {
- return new CountEvictionPolicy(count);
+ return new CountEvictionPolicy(count, deleteOnEviction);
}
@Override
public TriggerPolicy<?> toTrigger() {
- return new CountTriggerPolicy(count);
+ return new CountTriggerPolicy(count, startValue);
+ }
+
+ public Count withDelete(int deleteOnEviction) {
+ this.deleteOnEviction = deleteOnEviction;
+ return this;
+ }
+
+ public Count startingAt(int startValue) {
+ this.startValue = startValue;
+ return this;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 37f2902..0b73150 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.api.windowing.helper;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
/**
@@ -39,7 +40,9 @@ public class Time<DATA> implements WindowingHelper<DATA> {
private int timeVal;
private TimeUnit granularity;
- private Extractor<Long, DATA> timeToData;
+ private Extractor<Long, DATA> longToDATAExtractor;
+ private TimeStamp<DATA> timeStamp;
+ private long delay;
/**
* Creates a helper representing a trigger which triggers every given
@@ -47,50 +50,29 @@ public class Time<DATA> implements WindowingHelper<DATA> {
*
* @param timeVal
* The number of time units
- * @param granularity
+ * @param timeUnit
* The unit of time such as minute or millisecond. Note that
* the smallest possible granularity is milliseconds. Any smaller
* time unit might cause an error at runtime due to conversion
* problems.
- * @param timeToData
- * This policy creates fake elements to not miss windows in case
- * no element arrived within the duration of the window. This
- * extractor should wrap a long into such an element of type
- * DATA.
*/
- public Time(int timeVal, TimeUnit granularity, Extractor<Long, DATA> timeToData) {
+ private Time(int timeVal, TimeUnit timeUnit) {
this.timeVal = timeVal;
- this.granularity = granularity;
- this.timeToData = timeToData;
- }
-
- /**
- * Creates an helper representing a trigger which triggers every given
- * timeVal or an eviction which evicts all elements older than timeVal.
- *
- * The default granularity for timeVal used in this method is seconds.
- *
- * @param timeVal
- * The number of time units measured in seconds.
- * @param timeToData
- * This policy creates fake elements to not miss windows in case
- * no element arrived within the duration of the window. This
- * extractor should wrap a long into such an element of type
- * DATA.
- */
- public Time(int timeVal, Extractor<Long, DATA> timeToData) {
- this(timeVal, TimeUnit.SECONDS, timeToData);
+ this.granularity = timeUnit;
+ this.longToDATAExtractor = new NullExtractor<DATA>();
+ this.timeStamp = new DefaultTimeStamp<DATA>();
+ this.delay = 0;
}
@Override
public EvictionPolicy<DATA> toEvict() {
- return new TimeEvictionPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>());
+ return new TimeEvictionPolicy<DATA>(granularityInMillis(), timeStamp);
}
@Override
public TriggerPolicy<DATA> toTrigger() {
- return new TimeTriggerPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>(),
- timeToData);
+ return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay,
+ longToDATAExtractor);
}
/**
@@ -104,22 +86,39 @@ public class Time<DATA> implements WindowingHelper<DATA> {
* the smallest possible granularity is milliseconds. Any smaller
* time unit might cause an error at runtime due to conversion
* problems.
- * @param timeToData
- * This policy creates fake elements to not miss windows in case
- * no element arrived within the duration of the window. This
- * extractor should wrap a long into such an element of type
- * DATA.
* @return a helper representing a trigger which triggers every given
* timeVal or an eviction which evicts all elements older than
* timeVal.
*/
- public static <DATA> Time<DATA> of(int timeVal, TimeUnit granularity,
- Extractor<Long, DATA> timeToData) {
- return new Time<DATA>(timeVal, granularity, timeToData);
+ public static <DATA> Time<DATA> of(int timeVal, TimeUnit granularity) {
+ return new Time<DATA>(timeVal, granularity);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <R> Time<R> withTimeStamp(TimeStamp<R> timeStamp, Extractor<Long, R> extractor) {
+ this.timeStamp = (TimeStamp<DATA>) timeStamp;
+ this.longToDATAExtractor = (Extractor<Long, DATA>) extractor;
+ return (Time<R>) this;
+ }
+
+ public Time<DATA> withDelay(long delay) {
+ this.delay = delay;
+ return this;
}
private long granularityInMillis() {
return this.granularity.toMillis(this.timeVal);
}
+ public static class NullExtractor<T> implements Extractor<Long, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T extract(Long in) {
+ return null;
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index d5ae932..7a3c75a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -30,7 +30,7 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
*/
private static final long serialVersionUID = -6357200688886103968L;
- private static final int DEFAULT_START_VALUE = 0;
+ public static final int DEFAULT_START_VALUE = 0;
private int counter;
private int max;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index d9acc84..b212293 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -87,10 +87,10 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
// delete and count expired tuples
int counter = 0;
+ long threshold = timestamp.getTimestamp(datapoint) - granularity;
while (!buffer.isEmpty()) {
- if (timestamp.getTimestamp(buffer.getFirst()) < timestamp.getTimestamp(datapoint)
- - granularity) {
+ if (timestamp.getTimestamp(buffer.getFirst()) < threshold) {
buffer.removeFirst();
counter++;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index ff93ac9..1dd713f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -41,9 +41,11 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
*/
private static final long serialVersionUID = -5122753802440196719L;
- private long startTime;
- private long granularity;
- private TimeStamp<DATA> timestamp;
+ protected long startTime;
+ protected long granularity;
+ protected TimeStamp<DATA> timestamp;
+ protected long delay;
+
private Extractor<Long, DATA> longToDATAExtractor;
/**
@@ -54,8 +56,8 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* example, the policy will trigger at every second point in time.
*
* @param granularity
- * The granularity of the trigger. If this value is set to 2 the
- * policy will trigger at every second time point
+ * The granularity of the trigger. If this value is set to x the
+ * policy will trigger at every x-th time point
* @param timestamp
* The {@link TimeStamp} to measure the time with. This can be
* either user defined or provided by the API.
@@ -72,10 +74,10 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
/**
* This is mostly the same as
- * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In addition
- * to granularity and timestamp a delay can be specified for the first
- * trigger. If the start time given by the timestamp is x, the delay is y,
- * and the granularity is z, the first trigger will happen at x+y+z.
+ * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In
+ * addition to granularity and timestamp a delay can be specified for the
+ * first trigger. If the start time given by the timestamp is x, the delay
+ * is y, and the granularity is z, the first trigger will happen at x+y+z.
*
* @param granularity
* The granularity of the trigger. If this value is set to 2 the
@@ -98,21 +100,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
this.startTime = timestamp.getStartTime() + delay;
this.timestamp = timestamp;
this.granularity = granularity;
+ this.delay = delay;
this.longToDATAExtractor = timeWrapper;
- }
- @Override
- public synchronized boolean notifyTrigger(DATA datapoint) {
- long recordTime = timestamp.getTimestamp(datapoint);
- // start time is included, but end time is excluded: >=
- if (recordTime >= startTime + granularity) {
- if (granularity != 0) {
- startTime = recordTime - ((recordTime - startTime) % granularity);
- }
- return true;
- } else {
- return false;
- }
}
/**
@@ -126,7 +116,8 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
// check if there is more than one window border missed
// use > here. In case >= would fit, the regular call will do the job.
while (timestamp.getTimestamp(datapoint) > startTime + granularity) {
- fakeElements.add(longToDATAExtractor.extract(startTime += granularity));
+ startTime += granularity;
+ fakeElements.add(longToDATAExtractor.extract(startTime));
}
return (DATA[]) fakeElements.toArray();
}
@@ -162,7 +153,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
if (System.currentTimeMillis() >= startTime + granularity) {
startTime += granularity;
- callback.sendFakeElement(longToDATAExtractor.extract(startTime += granularity));
+ callback.sendFakeElement(longToDATAExtractor.extract(startTime));
}
}
@@ -192,8 +183,22 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
}
@Override
+ public synchronized boolean notifyTrigger(DATA datapoint) {
+ long recordTime = timestamp.getTimestamp(datapoint);
+ // start time is included, but end time is excluded: >=
+ if (recordTime >= startTime + granularity) {
+ if (granularity != 0) {
+ startTime = recordTime - ((recordTime - startTime) % granularity);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
public TimeTriggerPolicy<DATA> clone() {
- return new TimeTriggerPolicy<DATA>(granularity, timestamp, 0, longToDATAExtractor);
+ return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay, longToDATAExtractor);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index d65436c..c494d5f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -17,13 +17,25 @@
package org.apache.flink.streaming.api;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.util.Collector;
import org.junit.Test;
-public class PrintTest{
+public class PrintTest implements Serializable {
private static final long MEMORYSIZE = 32;
@@ -44,12 +56,52 @@ public class PrintTest{
return true;
}
}
-
+
@Test
public void test() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
- env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
+
+ List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String, Integer>>();
+
+ env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9)
+ .window(Time.of(2, TimeUnit.MILLISECONDS).withTimeStamp(new TimeStamp<Integer>() {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+
+ return value;
+ }
+
+ @Override
+ public long getStartTime() {
+ return 1;
+ }
+ }, new Extractor<Long, Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer extract(Long in) {
+ return in.intValue();
+ }
+ })).every(Count.of(2)).reduceGroup(new GroupReduceFunction<Integer, String>() {
+
+ @Override
+ public void reduce(Iterable<Integer> values, Collector<String> out)
+ throws Exception {
+ String o = "|";
+ for (Integer v : values) {
+ o = o + v + "|";
+ }
+ out.collect(o);
+ }
+ }).print();
env.executeTest(MEMORYSIZE);
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
deleted file mode 100644
index f157a08..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class BatchGroupReduceTest {
-
- public static final class MySlidingBatchReduce implements GroupReduceFunction<Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
- for (Integer value : values) {
- out.collect(value.toString());
- }
- out.collect(END_OF_BATCH);
- }
- }
-
- private final static String END_OF_BATCH = "end of batch";
- private final static int SLIDING_BATCH_SIZE = 3;
- private final static int SLIDE_SIZE = 2;
-
- @Test
- public void slidingBatchReduceTest() {
- BatchGroupReduceInvokable<Integer, String> invokable = new BatchGroupReduceInvokable<Integer, String>(
- new MySlidingBatchReduce(), SLIDING_BATCH_SIZE, SLIDE_SIZE);
-
- List<String> expected = Arrays.asList("1", "2", "3", END_OF_BATCH, "3", "4", "5",
- END_OF_BATCH, "5", "6", "7", END_OF_BATCH);
- List<String> actual = MockInvokable.createAndExecute(invokable,
- Arrays.asList(1, 2, 3, 4, 5, 6, 7));
-
- assertEquals(expected, actual);
- }
-
- public static final class MyBatchReduce implements GroupReduceFunction<Double, Double> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<Double> values, Collector<Double> out) throws Exception {
-
- Double sum = 0.;
- Double count = 0.;
- for (Double value : values) {
- sum += value;
- count++;
- }
- if (count > 0) {
- out.collect(Double.valueOf(sum / count));
- }
- }
- }
-
- private static final int BATCH_SIZE = 5;
-
- @Test
- public void nonSlidingBatchReduceTest() {
- List<Double> inputs = new ArrayList<Double>();
- for (Double i = 1.; i <= 100; i++) {
- inputs.add(i);
- }
-
- BatchGroupReduceInvokable<Double, Double> invokable = new BatchGroupReduceInvokable<Double, Double>(
- new MyBatchReduce(), BATCH_SIZE, BATCH_SIZE);
-
- List<Double> avgs = MockInvokable.createAndExecute(invokable, inputs);
-
- for (int i = 0; i < avgs.size(); i++) {
- assertEquals(3.0 + i * BATCH_SIZE, avgs.get(i), 0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
deleted file mode 100755
index b154f7f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.junit.Test;
-
-public class BatchReduceTest {
-
- @Test
- public void BatchReduceInvokableTest() {
-
- List<Integer> inputs = new ArrayList<Integer>();
- for (Integer i = 1; i <= 10; i++) {
- inputs.add(i);
- }
- BatchReduceInvokable<Integer> invokable = new BatchReduceInvokable<Integer>(
- new ReduceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
- }, 3, 2);
-
- List<Integer> expected = new ArrayList<Integer>();
- expected.add(6);
- expected.add(12);
- expected.add(18);
- expected.add(24);
- expected.add(19);
- assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs));
-
- List<Integer> inputs2 = new ArrayList<Integer>();
- inputs2.add(1);
- inputs2.add(2);
- inputs2.add(-1);
- inputs2.add(-3);
- inputs2.add(-4);
-
- BatchReduceInvokable<Integer> invokable2 = new BatchReduceInvokable<Integer>(
- new ReduceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- if (value1 <= value2) {
- return value1;
- } else {
- return value2;
- }
- }
- }, 2, 3);
-
- List<Integer> expected2 = new ArrayList<Integer>();
- expected2.add(1);
- expected2.add(-4);
-
- assertEquals(expected2, MockInvokable.createAndExecute(invokable2, inputs2));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
deleted file mode 100755
index db2b8cf..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.ObjectKeySelector;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class GroupedBatchGroupReduceTest {
-
- public static final class MySlidingBatchReduce1 implements GroupReduceFunction<Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
- for (Integer value : values) {
- out.collect(value.toString());
- }
- out.collect(END_OF_GROUP);
- }
- }
-
- public static final class MySlidingBatchReduce2 extends
- RichGroupReduceFunction<Tuple2<Integer, String>, String> {
- private static final long serialVersionUID = 1L;
-
- String openString;
-
- @Override
- public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<String> out)
- throws Exception {
- out.collect(openString);
- for (Tuple2<Integer, String> value : values) {
- out.collect(value.f0.toString());
- }
- out.collect(END_OF_GROUP);
- }
-
- @Override
- public void open(Configuration c) {
- openString = "open";
- }
- }
-
- private final static String END_OF_GROUP = "end of group";
-
- @SuppressWarnings("unchecked")
- @Test
- public void slidingBatchGroupReduceTest() {
- @SuppressWarnings("rawtypes")
- GroupedBatchGroupReduceInvokable<Integer, String> invokable1 = new GroupedBatchGroupReduceInvokable<Integer, String>(
- new MySlidingBatchReduce1(), 2, 2, new ObjectKeySelector());
-
- List<String> expected = Arrays.asList("1", "1", END_OF_GROUP, "3", "3", END_OF_GROUP, "2",
- END_OF_GROUP);
- List<String> actual = MockInvokable.createAndExecute(invokable1,
- Arrays.asList(1, 1, 2, 3, 3));
-
- assertEquals(expected, actual);
-
- GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String> invokable2 = new GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String>(
- new MySlidingBatchReduce2(), 2, 2, new TupleKeySelector<Tuple2<Integer, String>>(1));
-
- expected = Arrays.asList("open", "1", "2", END_OF_GROUP, "open", "3", "3", END_OF_GROUP,
- "open", "4", END_OF_GROUP);
- actual = MockInvokable.createAndExecute(invokable2, Arrays.asList(
- new Tuple2<Integer, String>(1, "a"), new Tuple2<Integer, String>(2, "a"),
- new Tuple2<Integer, String>(3, "b"), new Tuple2<Integer, String>(3, "b"),
- new Tuple2<Integer, String>(4, "a")));
-
- assertEquals(expected, actual);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
deleted file mode 100755
index 783119c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.ObjectKeySelector;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
-import org.junit.Test;
-
-public class GroupedBatchReduceTest {
-
- @Test
- public void BatchReduceInvokableTest() {
-
- List<Integer> inputs = new ArrayList<Integer>();
- inputs.add(1);
- inputs.add(1);
- inputs.add(5);
- inputs.add(5);
- inputs.add(5);
- inputs.add(1);
- inputs.add(1);
- inputs.add(5);
- inputs.add(1);
- inputs.add(5);
-
- List<Integer> expected = new ArrayList<Integer>();
- expected.add(15);
- expected.add(3);
- expected.add(3);
- expected.add(15);
-
- GroupedBatchReduceInvokable<Integer> invokable = new GroupedBatchReduceInvokable<Integer>(
- new ReduceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
- }, 3, 2, new ObjectKeySelector<Integer>());
-
- List<Integer> actual = MockInvokable.createAndExecute(invokable, inputs);
- assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
- assertEquals(expected.size(), actual.size());
-
- List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
- inputs2.add(new Tuple2<Integer, String>(1, "a"));
- inputs2.add(new Tuple2<Integer, String>(0, "b"));
- inputs2.add(new Tuple2<Integer, String>(2, "a"));
- inputs2.add(new Tuple2<Integer, String>(-1, "a"));
- inputs2.add(new Tuple2<Integer, String>(-2, "a"));
- inputs2.add(new Tuple2<Integer, String>(10, "a"));
- inputs2.add(new Tuple2<Integer, String>(2, "b"));
- inputs2.add(new Tuple2<Integer, String>(1, "a"));
-
- List<Tuple2<Integer, String>> expected2 = new ArrayList<Tuple2<Integer, String>>();
- expected2.add(new Tuple2<Integer, String>(-1, "a"));
- expected2.add(new Tuple2<Integer, String>(-2, "a"));
- expected2.add(new Tuple2<Integer, String>(0, "b"));
-
- GroupedBatchReduceInvokable<Tuple2<Integer, String>> invokable2 = new GroupedBatchReduceInvokable<Tuple2<Integer, String>>(
- new ReduceFunction<Tuple2<Integer, String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1,
- Tuple2<Integer, String> value2) throws Exception {
- if (value1.f0 <= value2.f0) {
- return value1;
- } else {
- return value2;
- }
- }
- }, 3, 3, new TupleKeySelector<Tuple2<Integer, String>>(1));
-
- List<Tuple2<Integer, String>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
-
- assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
- new HashSet<Tuple2<Integer, String>>(actual2));
- assertEquals(expected2.size(), actual2.size());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
deleted file mode 100644
index e93647f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class GroupedWindowGroupReduceInvokableTest {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void windowReduceTest() {
- List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
- inputs2.add(new Tuple2<String, Integer>("a", 1));
- inputs2.add(new Tuple2<String, Integer>("a", 2));
- inputs2.add(new Tuple2<String, Integer>("b", 2));
- inputs2.add(new Tuple2<String, Integer>("b", 2));
- inputs2.add(new Tuple2<String, Integer>("b", 5));
- inputs2.add(new Tuple2<String, Integer>("a", 7));
- inputs2.add(new Tuple2<String, Integer>("b", 9));
- inputs2.add(new Tuple2<String, Integer>("b", 10));
- // 1,2-4,5-7,8-10
-
- List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
- expected2.add(new Tuple2<String, Integer>("a", 3));
- expected2.add(new Tuple2<String, Integer>("b", 4));
- expected2.add(new Tuple2<String, Integer>("b", 5));
- expected2.add(new Tuple2<String, Integer>("a", 7));
- expected2.add(new Tuple2<String, Integer>("b", 10));
-
- GroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>> invokable2 = new GroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>>(
- new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<Tuple2<String, Integer>> values,
- Collector<Tuple2<String, Integer>> out) throws Exception {
- Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>("", 0);
-
- for (@SuppressWarnings("unused")
- Tuple2<String, Integer> value : values) {
- }
-
- for (Tuple2<String, Integer> value : values) {
- outTuple.f0 = value.f0;
- outTuple.f1 += value.f1;
- }
- out.collect(outTuple);
- }
- }, 2, 3, new TupleKeySelector( 0),
- new TimeStamp<Tuple2<String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Tuple2<String, Integer> value) {
- return value.f1;
- }
-
- @Override
- public long getStartTime() {
- return 1;
- }
- });
-
- List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
-
- assertEquals(new HashSet<Tuple2<String, Integer>>(expected2),
- new HashSet<Tuple2<String, Integer>>(actual2));
- assertEquals(expected2.size(), actual2.size());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
index 93848db..ebfed05 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.api.invokable.operator;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.HashSet;
@@ -30,12 +30,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.util.MockInvokable;
@@ -78,7 +78,7 @@ public class GroupedWindowingInvokableTest {
LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>();
- GroupedWindowingInvokable<Integer> invokable = new GroupedWindowingInvokable<Integer>(
+ GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
new ReduceFunction<Integer>() {
private static final long serialVersionUID = 1L;
@@ -95,11 +95,11 @@ public class GroupedWindowingInvokableTest {
}
}, triggers, evictions, centralTriggers);
- List<Tuple2<Integer, String[]>> result = MockInvokable.createAndExecute(invokable, inputs);
+ List<Integer> result = MockInvokable.createAndExecute(invokable, inputs);
List<Integer> actual = new LinkedList<Integer>();
- for (Tuple2<Integer, String[]> current : result) {
- actual.add(current.f0);
+ for (Integer current : result) {
+ actual.add(current);
}
assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
@@ -137,7 +137,7 @@ public class GroupedWindowingInvokableTest {
LinkedList<TriggerPolicy<Tuple2<Integer, String>>> centralTriggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
- GroupedWindowingInvokable<Tuple2<Integer, String>> invokable2 = new GroupedWindowingInvokable<Tuple2<Integer, String>>(
+ GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable2 = new GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
new ReduceFunction<Tuple2<Integer, String>>() {
private static final long serialVersionUID = 1L;
@@ -153,12 +153,11 @@ public class GroupedWindowingInvokableTest {
}, new TupleKeySelector<Tuple2<Integer, String>>(1), triggers, evictions,
centralTriggers);
- List<Tuple2<Tuple2<Integer, String>, String[]>> result = MockInvokable.createAndExecute(
- invokable2, inputs2);
+ List<Tuple2<Integer, String>> result = MockInvokable.createAndExecute(invokable2, inputs2);
List<Tuple2<Integer, String>> actual2 = new LinkedList<Tuple2<Integer, String>>();
- for (Tuple2<Tuple2<Integer, String>, String[]> current : result) {
- actual2.add(current.f0);
+ for (Tuple2<Integer, String> current : result) {
+ actual2.add(current);
}
assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
@@ -266,14 +265,13 @@ public class GroupedWindowingInvokableTest {
LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
- GroupedWindowingInvokable<Tuple2<Integer, String>> invokable = new GroupedWindowingInvokable<Tuple2<Integer, String>>(
+ GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
myReduceFunction, new TupleKeySelector<Tuple2<Integer, String>>(1),
distributedTriggers, evictions, triggers);
ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>();
- for (Tuple2<Tuple2<Integer, String>, String[]> t : MockInvokable.createAndExecute(
- invokable, inputs)) {
- result.add(t.f0);
+ for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
+ result.add(t);
}
assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
@@ -330,7 +328,7 @@ public class GroupedWindowingInvokableTest {
}
};
- GroupedWindowingInvokable<Integer> invokable = new GroupedWindowingInvokable<Integer>(
+ GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
myReduceFunction, new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@@ -341,8 +339,8 @@ public class GroupedWindowingInvokableTest {
}, distributedTriggers, evictions, triggers);
ArrayList<Integer> result = new ArrayList<Integer>();
- for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
- result.add(t.f0);
+ for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ result.add(t);
}
assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));
@@ -350,8 +348,8 @@ public class GroupedWindowingInvokableTest {
}
/**
- * Test for combination of centralized trigger and
- * distributed trigger at the same time
+ * Test for combination of centralized trigger and distributed trigger at
+ * the same time
*/
@Test
public void testGroupedWindowingInvokableCentralAndDistrTrigger() {
@@ -406,7 +404,7 @@ public class GroupedWindowingInvokableTest {
}
};
- GroupedWindowingInvokable<Integer> invokable = new GroupedWindowingInvokable<Integer>(
+ GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
myReduceFunction, new KeySelector<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@@ -417,8 +415,8 @@ public class GroupedWindowingInvokableTest {
}, distributedTriggers, evictions, triggers);
ArrayList<Integer> result = new ArrayList<Integer>();
- for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
- result.add(t.f0);
+ for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ result.add(t);
}
assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
deleted file mode 100644
index 3e3179a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.util.Collector;
-import org.junit.Before;
-import org.junit.Test;
-
-public class WindowGroupReduceInvokableTest {
-
- public static final class MySlidingWindowReduce implements GroupReduceFunction<Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
- for (Integer value : values) {
- out.collect(value.toString());
- }
- out.collect(EOW);
- }
- }
-
- public static final class MyTimestamp implements TimeStamp<Integer> {
- private static final long serialVersionUID = 1L;
-
- private Iterator<Long> timestamps;
- private long start;
-
- public MyTimestamp(List<Long> timestamps) {
- this.timestamps = timestamps.iterator();
- this.start = timestamps.get(0);
- }
-
- @Override
- public long getTimestamp(Integer value) {
- long ts = timestamps.next();
- return ts;
- }
-
- @Override
- public long getStartTime() {
- return start;
- }
- }
-
- private final static String EOW = "|";
-
- private static List<WindowGroupReduceInvokable<Integer, String>> invokables = new ArrayList<WindowGroupReduceInvokable<Integer, String>>();
- private static List<List<String>> expectedResults = new ArrayList<List<String>>();
-
- @Before
- public void before() {
- long windowSize = 3;
- long slideSize = 2;
- List<Long> timestamps = Arrays.asList(101L, 102L, 103L, 104L, 105L, 106L, 107L, 108L, 109L,
- 110L);
- expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", EOW, "5", "6", "7",
- EOW, "7", "8", "9", EOW, "9", "10", EOW));
- invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
- windowSize, slideSize, new MyTimestamp(timestamps)));
-
- windowSize = 10;
- slideSize = 5;
- timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
- expectedResults.add(Arrays.asList("1", "2", EOW, "3", "4", "5", "6", EOW, "3",
- "4", "5", "6", EOW, "7", EOW, "7",
- "8", "9", EOW, "8", "9", "10", EOW));
- invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
- windowSize, slideSize, new MyTimestamp(timestamps)));
-
- windowSize = 10;
- slideSize = 4;
- timestamps = Arrays.asList(101L, 103L, 110L, 112L, 113L, 114L, 120L, 121L, 125L, 130L);
- expectedResults.add(Arrays.asList("1", "2","3" ,EOW, "3", "4", "5","6", EOW, "3", "4", "5", "6",
- EOW, "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "8","9",
- "10", EOW));
- invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
- windowSize, slideSize, new MyTimestamp(timestamps)));
- }
-
- @Test
- public void slidingBatchReduceTest() {
- List<List<String>> actualResults = new ArrayList<List<String>>();
-
- for (WindowGroupReduceInvokable<Integer, String> invokable : invokables) {
- List<String> result = MockInvokable.createAndExecute(invokable,
- Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- actualResults.add(result);
- }
-
- Iterator<List<String>> actualResult = actualResults.iterator();
-
- for (List<String> expectedResult : expectedResults) {
- assertEquals(expectedResult, actualResult.next());
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
deleted file mode 100755
index 5d10eff..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
-import org.junit.Test;
-
-public class WindowReduceInvokableTest {
-
- @Test
- public void windowReduceTest() {
-
- List<Integer> inputs = new ArrayList<Integer>();
- inputs.add(1);
- inputs.add(2);
- inputs.add(2);
- inputs.add(3);
- inputs.add(4);
- inputs.add(5);
- inputs.add(10);
- inputs.add(11);
- inputs.add(11);
- // 1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11
- // 12-12-5-10-32
-
- List<Integer> expected = new ArrayList<Integer>();
- expected.add(12);
- expected.add(12);
- expected.add(5);
- expected.add(10);
- expected.add(32);
-
- WindowReduceInvokable<Integer> invokable = new WindowReduceInvokable<Integer>(
- new ReduceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
- }, 4, 2, new TimeStamp<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
-
- @Override
- public long getStartTime() {
- return 1;
- }
- });
-
- assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs));
-
- List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
- inputs2.add(new Tuple2<String, Integer>("a", 1));
- inputs2.add(new Tuple2<String, Integer>("a", 2));
- inputs2.add(new Tuple2<String, Integer>("b", 2));
- inputs2.add(new Tuple2<String, Integer>("b", 2));
- inputs2.add(new Tuple2<String, Integer>("b", 5));
- inputs2.add(new Tuple2<String, Integer>("a", 7));
- inputs2.add(new Tuple2<String, Integer>("b", 9));
- inputs2.add(new Tuple2<String, Integer>("b", 10));
-
- List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
- expected2.add(new Tuple2<String, Integer>("a", 3));
- expected2.add(new Tuple2<String, Integer>("b", 4));
- expected2.add(new Tuple2<String, Integer>("b", 5));
- expected2.add(new Tuple2<String, Integer>("a", 7));
- expected2.add(new Tuple2<String, Integer>("b", 10));
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- GroupedWindowReduceInvokable<Tuple2<String, Integer>> invokable2 = new GroupedWindowReduceInvokable<Tuple2<String, Integer>>(
- new ReduceFunction<Tuple2<String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
- Tuple2<String, Integer> value2) throws Exception {
- return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
- }
- }, 2, 3, new TupleKeySelector(0), new TimeStamp<Tuple2<String, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Tuple2<String, Integer> value) {
- return value.f1;
- }
-
- @Override
- public long getStartTime() {
- return 1;
- }
- });
-
- List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
- assertEquals(new HashSet<Tuple2<String, Integer>>(expected2),
- new HashSet<Tuple2<String, Integer>>(actual2));
- assertEquals(expected2.size(), actual2.size());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
index 79df1c5..8e96a7c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
@@ -17,21 +17,20 @@
package org.apache.flink.streaming.api.invokable.operator;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.util.MockInvokable;
import org.junit.Test;
@@ -92,6 +91,7 @@ public class WindowingInvokableTest {
// trigger after 4, then every 2)
triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L,
new Extractor<Long, Integer>() {
+
private static final long serialVersionUID = 1L;
@Override
@@ -103,12 +103,12 @@ public class WindowingInvokableTest {
// Always delete all elements older then 4
evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));
- WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
- triggers, evictions);
+ WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
+ myReduceFunction, triggers, evictions);
ArrayList<Integer> result = new ArrayList<Integer>();
- for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
- result.add(t.f0);
+ for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ result.add(t);
}
assertEquals(expected, result);
@@ -147,8 +147,8 @@ public class WindowingInvokableTest {
// time after the 3rd element
evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
- WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
- triggers, evictions);
+ WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
+ myReduceFunction, triggers, evictions);
List<Integer> expected = new ArrayList<Integer>();
expected.add(6);
@@ -157,8 +157,8 @@ public class WindowingInvokableTest {
expected.add(24);
expected.add(19);
List<Integer> result = new ArrayList<Integer>();
- for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
- result.add(t.f0);
+ for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ result.add(t);
}
assertEquals(expected, result);
@@ -200,16 +200,16 @@ public class WindowingInvokableTest {
// time after on the 5th element
evictions.add(new CountEvictionPolicy<Integer>(3, 3, -1));
- WindowingInvokable<Integer> invokable2 = new WindowingInvokable<Integer>(myReduceFunction,
- triggers, evictions);
+ WindowingInvokable<Integer, Integer> invokable2 = new WindowingReduceInvokable<Integer>(
+ myReduceFunction, triggers, evictions);
List<Integer> expected2 = new ArrayList<Integer>();
expected2.add(1);
expected2.add(-4);
result = new ArrayList<Integer>();
- for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable2, inputs2)) {
- result.add(t.f0);
+ for (Integer t : MockInvokable.createAndExecute(invokable2, inputs2)) {
+ result.add(t);
}
assertEquals(expected2, result);
@@ -223,8 +223,8 @@ public class WindowingInvokableTest {
triggers.add(new CountTriggerPolicy<Integer>(3));
LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
- evictions.add(new CountEvictionPolicy<Integer>(2,2));
- evictions.add(new CountEvictionPolicy<Integer>(3,3));
+ evictions.add(new CountEvictionPolicy<Integer>(2, 2));
+ evictions.add(new CountEvictionPolicy<Integer>(3, 3));
List<Integer> inputs = new ArrayList<Integer>();
for (Integer i = 1; i <= 10; i++) {
@@ -258,12 +258,12 @@ public class WindowingInvokableTest {
}
};
- WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
- triggers, evictions);
+ WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
+ myReduceFunction, triggers, evictions);
ArrayList<Integer> result = new ArrayList<Integer>();
- for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
- result.add(t.f0);
+ for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ result.add(t);
}
assertEquals(expected, result);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
index 839564d..34e292a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -17,12 +17,15 @@
package org.apache.flink.streaming.api.windowing.policy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.helper.Time;
import org.junit.Test;
-import static org.junit.Assert.*;
-
public class TimeTriggerPolicyTest {
@Test
@@ -49,15 +52,8 @@ public class TimeTriggerPolicyTest {
// test different granularity
for (long granularity = 0; granularity < 31; granularity++) {
// create policy
- TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp,
- new Extractor<Long, Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer extract(Long in) {
- return in.intValue();
- }
- });
+ TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
+ timeStamp, new Time.NullExtractor<Integer>());
// remember window border
// Remark: This might NOT work in case the timeStamp uses
@@ -88,7 +84,7 @@ public class TimeTriggerPolicyTest {
@Test
public void timeTriggerPreNotifyTest() {
// create some test data
- Integer[] times = { 1, 3, 20, 26};
+ Integer[] times = { 1, 3, 20, 26 };
// create a timestamp
@SuppressWarnings("serial")
@@ -107,8 +103,9 @@ public class TimeTriggerPolicyTest {
};
// create policy
- TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp,
- new Extractor<Long, Integer>() {
+ TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
+ timeStamp, new Extractor<Long, Integer>() {
+
private static final long serialVersionUID = 1L;
@Override
@@ -117,16 +114,16 @@ public class TimeTriggerPolicyTest {
}
});
- //expected result
- Integer[][] result={{},{},{5,10,15},{25}};
-
- //call policy
- for (int i=0;i<times.length;i++){
- arrayEqualityCheck(result[i],policy.preNotifyTrigger(times[i]));
+ // expected result
+ Integer[][] result = { {}, {}, { 5, 10, 15 }, { 25 } };
+
+ // call policy
+ for (int i = 0; i < times.length; i++) {
+ arrayEqualityCheck(result[i], policy.preNotifyTrigger(times[i]));
policy.notifyTrigger(times[i]);
}
}
-
+
private void arrayEqualityCheck(Object[] array1, Object[] array2) {
assertEquals("The result arrays must have the same length", array1.length, array2.length);
for (int i = 0; i < array1.length; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 2def142..375c86d 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -17,11 +17,14 @@
package org.apache.flink.streaming.examples.ml;
+import java.util.concurrent.TimeUnit;
+
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.util.Collector;
/**
@@ -58,7 +61,8 @@ public class IncrementalLearningSkeleton {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// build new model on every second of new data
- DataStream<Double[]> model = env.addSource(new TrainingDataSource()).window(5000)
+ DataStream<Double[]> model = env.addSource(new TrainingDataSource())
+ .window(Time.of(5000, TimeUnit.MILLISECONDS))
.reduceGroup(new PartialModelBuilder());
// use partial model for prediction
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
deleted file mode 100644
index c0e768b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.util.Collector;
-
-/**
- * A minimal example as introduction to the policy based windowing
- */
-public class BasicExample {
-
- private static final int PARALLELISM = 1;
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(PARALLELISM);
-
- // This reduce function does a String concat.
- ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
-
- /**
- * Auto generates version ID
- */
- private static final long serialVersionUID = 1L;
-
- @Override
- public String reduce(String value1, String value2) throws Exception {
- return value1 + "|" + value2;
- }
-
- };
-
- DataStream<Tuple2<String, String[]>> stream = env.addSource(new BasicSource())
- .window(Count.of(5)).every(Count.of(2)).reduce(reduceFunction);
-
- stream.print();
-
- env.execute();
- }
-
- public static class BasicSource implements SourceFunction<String> {
-
- private static final long serialVersionUID = 1L;
- String str = new String("streaming");
-
- @Override
- public void invoke(Collector<String> out) throws Exception {
- // continuous emit
- while (true) {
- out.collect(str);
- }
- }
- }
-
-}