You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:24 UTC
[19/34] incubator-flink git commit: [streaming] Extractor requirement
removed for custom timestamp windowing
[streaming] Extractor requirement removed for custom timestamp windowing
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/55ee64f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/55ee64f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/55ee64f6
Branch: refs/heads/master
Commit: 55ee64f6c9824d51922fe4a8fb1435030925b110
Parents: f96ba06
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Nov 24 00:32:52 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:09 2014 +0100
----------------------------------------------------------------------
.../operator/GroupedWindowingInvokable.java | 11 +++--
.../invokable/operator/WindowingInvokable.java | 10 ++---
.../streaming/api/windowing/helper/Time.java | 8 +---
.../windowing/policy/ActiveEvictionPolicy.java | 2 +-
.../policy/ActiveEvictionPolicyWrapper.java | 5 ++-
.../windowing/policy/ActiveTriggerCallback.java | 11 ++---
.../windowing/policy/ActiveTriggerPolicy.java | 16 +++----
.../windowing/policy/TimeEvictionPolicy.java | 46 ++++++++++++++------
.../api/windowing/policy/TimeTriggerPolicy.java | 42 ++++++++----------
.../apache/flink/streaming/api/PrintTest.java | 8 ----
.../operator/GroupedWindowingInvokableTest.java | 13 +-----
.../operator/WindowingInvokableTest.java | 13 +-----
.../windowing/policy/TimeTriggerPolicyTest.java | 16 ++-----
13 files changed, 84 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
index e957b8c..c341e56 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
@@ -138,8 +138,7 @@ public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
* If only one element is contained a group, this element itself
* is returned as aggregated result.)
*/
- public GroupedWindowingInvokable(Function userFunction,
- KeySelector<IN, ?> keySelector,
+ public GroupedWindowingInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
@@ -174,8 +173,8 @@ public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
// Run the precalls for central active triggers
for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
- IN[] result = trigger.preNotifyTrigger(reuse.getObject());
- for (IN in : result) {
+ Object[] result = trigger.preNotifyTrigger(reuse.getObject());
+ for (Object in : result) {
for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(in, trigger);
}
@@ -305,7 +304,7 @@ public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
*
* @see ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
*/
- private class WindowingCallback implements ActiveTriggerCallback<IN> {
+ private class WindowingCallback implements ActiveTriggerCallback {
private ActiveTriggerPolicy<IN> policy;
public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
@@ -313,7 +312,7 @@ public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
}
@Override
- public void sendFakeElement(IN datapoint) {
+ public void sendFakeElement(Object datapoint) {
for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(datapoint, policy);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
index d0855e6..95a999c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
@@ -101,7 +101,7 @@ public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OU
* This class allows the active trigger threads to call back and push fake
* elements at any time.
*/
- private class WindowingCallback implements ActiveTriggerCallback<IN> {
+ private class WindowingCallback implements ActiveTriggerCallback {
private ActiveTriggerPolicy<IN> policy;
public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
@@ -109,7 +109,7 @@ public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OU
}
@Override
- public void sendFakeElement(IN datapoint) {
+ public void sendFakeElement(Object datapoint) {
processFakeElement(datapoint, this.policy);
}
@@ -215,7 +215,7 @@ public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OU
* @param currentPolicy
* the policy which produced this fake element
*/
- protected synchronized void processFakeElement(IN input, TriggerPolicy<IN> currentPolicy) {
+ protected synchronized void processFakeElement(Object input, TriggerPolicy<IN> currentPolicy) {
// Process the evictions and take care of double evictions
// In case there are multiple eviction policies present,
@@ -283,8 +283,8 @@ public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OU
// of the different fake elements returned by this triggers becomes
// a problem. This might lead to unexpected results...
// Should we limit the number of active triggers to 0 or 1?
- IN[] result = trigger.preNotifyTrigger(input);
- for (IN in : result) {
+ Object[] result = trigger.preNotifyTrigger(input);
+ for (Object in : result) {
processFakeElement(in, trigger);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 0b73150..86ad952 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -40,7 +40,6 @@ public class Time<DATA> implements WindowingHelper<DATA> {
private int timeVal;
private TimeUnit granularity;
- private Extractor<Long, DATA> longToDATAExtractor;
private TimeStamp<DATA> timeStamp;
private long delay;
@@ -59,7 +58,6 @@ public class Time<DATA> implements WindowingHelper<DATA> {
private Time(int timeVal, TimeUnit timeUnit) {
this.timeVal = timeVal;
this.granularity = timeUnit;
- this.longToDATAExtractor = new NullExtractor<DATA>();
this.timeStamp = new DefaultTimeStamp<DATA>();
this.delay = 0;
}
@@ -71,8 +69,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
@Override
public TriggerPolicy<DATA> toTrigger() {
- return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay,
- longToDATAExtractor);
+ return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay);
}
/**
@@ -95,9 +92,8 @@ public class Time<DATA> implements WindowingHelper<DATA> {
}
@SuppressWarnings("unchecked")
- public <R> Time<R> withTimeStamp(TimeStamp<R> timeStamp, Extractor<Long, R> extractor) {
+ public <R> Time<R> withTimeStamp(TimeStamp<R> timeStamp) {
this.timeStamp = (TimeStamp<DATA>) timeStamp;
- this.longToDATAExtractor = (Extractor<Long, DATA>) extractor;
return (Time<R>) this;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
index 17d4914..fe172bc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
@@ -47,6 +47,6 @@ public interface ActiveEvictionPolicy<DATA> extends EvictionPolicy<DATA> {
* @return the number of elements to delete from the buffer (only real
* elements are counted)
*/
- public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize);
+ public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
index a110fbc..b3b6935 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
@@ -55,9 +55,10 @@ public class ActiveEvictionPolicyWrapper<DATA> implements ActiveEvictionPolicy<D
return nestedPolicy.notifyEviction(datapoint, triggered, bufferSize);
}
+ @SuppressWarnings("unchecked")
@Override
- public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize) {
- return nestedPolicy.notifyEviction(datapoint, true, bufferSize);
+ public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
+ return nestedPolicy.notifyEviction((DATA) datapoint, true, bufferSize);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
index d74a3ae..c44be37 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
@@ -20,18 +20,15 @@ package org.apache.flink.streaming.api.windowing.policy;
/**
* In case an {@link ActiveTriggerPolicy} is used, it can implement own
* {@link Runnable} classes. Such {@link Runnable} classes will be executed as
- * an own thread and can submit fake elements, to the element
- * buffer at any time.
+ * an own thread and can submit fake elements to the element buffer at any
+ * time.
*
* The factory method for runnables of the {@link ActiveTriggerPolicy} gets an
* instance of this interface as parameter. The describes adding of elements can
* be done by the runnable using the methods provided in this interface.
*
- * @param <DATA>
- * The data type which can be consumed by the methods provided in
- * this callback implementation.
*/
-public interface ActiveTriggerCallback<DATA> {
+public interface ActiveTriggerCallback {
/**
* Submits a new fake data point to the element buffer. Such a fake element
@@ -43,6 +40,6 @@ public interface ActiveTriggerCallback<DATA> {
* @param datapoint
* the fake data point to be added
*/
- public void sendFakeElement(DATA datapoint);
+ public void sendFakeElement(Object datapoint);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
index f549766..a8a704d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
@@ -24,12 +24,12 @@ import org.apache.flink.streaming.api.invokable.util.TimeStamp;
* for active triggers. Active triggers can act in two ways:
*
* 1) Whenever an element arrives at the invokable, the
- * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called first.
- * It can return zero ore more fake data points which will be added before the
- * the currently arrived real element gets processed. This allows to handle
- * empty windows in time based windowing with an user defined {@link TimeStamp}.
- * Triggers are not called on fake datapoint. A fake datapoint is always
- * considered as triggered.
+ * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
+ * first. It can return zero or more fake data points which will be added
+ * before the currently arrived real element gets processed. This allows
+ * handling empty windows in time based windowing with a user defined
+ * {@link TimeStamp}. Triggers are not called on fake datapoints. A fake
+ * datapoint is always considered as triggered.
*
* 2) An active trigger has a factory method for a runnable. This factory method
* gets called at the start up of the invokable. The returned runnable will be
@@ -57,7 +57,7 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
* @return zero ore more fake data points which will be added before the the
* currently arrived real element gets processed.
*/
- public DATA[] preNotifyTrigger(DATA datapoint);
+ public Object[] preNotifyTrigger(DATA datapoint);
/**
* This is the factory method for a runnable. This factory method gets
@@ -76,6 +76,6 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
* executed as an own thread and can submit fake elements, to the
* element buffer at any time.
*/
- public Runnable createActiveTriggerRunnable(ActiveTriggerCallback<DATA> callback);
+ public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index b212293..e886c09 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -69,25 +69,49 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
this.granularity = granularity;
}
+ @SuppressWarnings("unchecked")
@Override
- public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
- return notifyEviction(datapoint, triggered, bufferSize, false);
+ public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
+ checkForDeleted(bufferSize);
+
+ long threshold;
+ try {
+ threshold = (Long) datapoint - granularity;
+ } catch (ClassCastException e) {
+ threshold = timestamp.getTimestamp((DATA) datapoint) - granularity;
+ }
+
+ // return result
+ return deleteAndCountExpired(threshold);
+
}
@Override
- public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize) {
- return notifyEviction(datapoint, true, bufferSize, true);
+ public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
+
+ checkForDeleted(bufferSize);
+
+ // delete and count expired tuples
+ long threshold = timestamp.getTimestamp(datapoint) - granularity;
+ int counter = deleteAndCountExpired(threshold);
+
+ // Add current element to buffer
+ buffer.add(datapoint);
+
+ // return result
+ return counter;
+
}
- private int notifyEviction(DATA datapoint, boolean triggered, int bufferSize, boolean isFake) {
+ private void checkForDeleted(int bufferSize) {
// check for deleted tuples (deletes by other policies)
while (bufferSize < this.buffer.size()) {
this.buffer.removeFirst();
}
+ }
- // delete and count expired tuples
+ private int deleteAndCountExpired(long threshold) {
int counter = 0;
- long threshold = timestamp.getTimestamp(datapoint) - granularity;
while (!buffer.isEmpty()) {
if (timestamp.getTimestamp(buffer.getFirst()) < threshold) {
@@ -97,14 +121,8 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
break;
}
}
-
- if (!isFake) {
- // Add current element to buffer
- buffer.add(datapoint);
- }
-
- // return result
return counter;
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 1dd713f..c3402c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -21,7 +21,6 @@ import java.util.LinkedList;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
/**
* This trigger policy triggers with regard to the time. The is measured using a
@@ -46,8 +45,6 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
protected TimeStamp<DATA> timestamp;
protected long delay;
- private Extractor<Long, DATA> longToDATAExtractor;
-
/**
* This trigger policy triggers with regard to the time. The is measured
* using a given {@link TimeStamp} implementation. A point in time is always
@@ -67,17 +64,16 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* extractor should wrap a long into such an element of type
* DATA.
*/
- public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp,
- Extractor<Long, DATA> timeWrapper) {
- this(granularity, timestamp, 0, timeWrapper);
+ public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp) {
+ this(granularity, timestamp, 0);
}
/**
* This is mostly the same as
- * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In
- * addition to granularity and timestamp a delay can be specified for the
- * first trigger. If the start time given by the timestamp is x, the delay
- * is y, and the granularity is z, the first trigger will happen at x+y+z.
+ * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In addition
+ * to granularity and timestamp a delay can be specified for the first
+ * trigger. If the start time given by the timestamp is x, the delay is y,
+ * and the granularity is z, the first trigger will happen at x+y+z.
*
* @param granularity
* The granularity of the trigger. If this value is set to 2 the
@@ -95,31 +91,27 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* extractor should wrap a long into such an element of type
* DATA.
*/
- public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, long delay,
- Extractor<Long, DATA> timeWrapper) {
+ public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, long delay) {
this.startTime = timestamp.getStartTime() + delay;
this.timestamp = timestamp;
this.granularity = granularity;
this.delay = delay;
- this.longToDATAExtractor = timeWrapper;
-
}
/**
* This method checks if we missed a window end. If this is the case we
* trigger the missed windows using fake elements.
*/
- @SuppressWarnings("unchecked")
@Override
- public synchronized DATA[] preNotifyTrigger(DATA datapoint) {
- LinkedList<DATA> fakeElements = new LinkedList<DATA>();
+ public synchronized Object[] preNotifyTrigger(DATA datapoint) {
+ LinkedList<Object> fakeElements = new LinkedList<Object>();
// check if there is more then one window border missed
// use > here. In case >= would fit, the regular call will do the job.
while (timestamp.getTimestamp(datapoint) > startTime + granularity) {
startTime += granularity;
- fakeElements.add(longToDATAExtractor.extract(startTime));
+ fakeElements.add(startTime);
}
- return (DATA[]) fakeElements.toArray();
+ return (Object[]) fakeElements.toArray();
}
/**
@@ -134,7 +126,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* time. If any other time measure is used the method return null.
*/
@Override
- public Runnable createActiveTriggerRunnable(ActiveTriggerCallback<DATA> callback) {
+ public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) {
if (this.timestamp instanceof DefaultTimeStamp) {
return new TimeCheck(callback);
} else {
@@ -149,19 +141,19 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
* @param callback
* The callback object.
*/
- private synchronized void activeFakeElementEmission(ActiveTriggerCallback<DATA> callback) {
+ private synchronized void activeFakeElementEmission(ActiveTriggerCallback callback) {
if (System.currentTimeMillis() >= startTime + granularity) {
startTime += granularity;
- callback.sendFakeElement(longToDATAExtractor.extract(startTime));
+ callback.sendFakeElement(startTime);
}
}
private class TimeCheck implements Runnable {
- ActiveTriggerCallback<DATA> callback;
+ ActiveTriggerCallback callback;
- public TimeCheck(ActiveTriggerCallback<DATA> callback) {
+ public TimeCheck(ActiveTriggerCallback callback) {
this.callback = callback;
}
@@ -198,7 +190,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
@Override
public TimeTriggerPolicy<DATA> clone() {
- return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay, longToDATAExtractor);
+ return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index c494d5f..f7ba82e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -81,14 +81,6 @@ public class PrintTest implements Serializable {
public long getStartTime() {
return 1;
}
- }, new Extractor<Long, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer extract(Long in) {
- return in.intValue();
- }
})).every(Count.of(2)).reduceGroup(new GroupReduceFunction<Integer, String>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
index ebfed05..79288d1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
@@ -28,14 +28,13 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.util.MockInvokable;
@@ -249,15 +248,7 @@ public class GroupedWindowingInvokableTest {
LinkedList<TriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
// Trigger every 2 time units but delay the first trigger by 2 (First
// trigger after 4, then every 2)
- triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L,
- new Extractor<Long, Tuple2<Integer, String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, String> extract(Long in) {
- return new Tuple2<Integer, String>(in.intValue(), null);
- }
- }));
+ triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L));
LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
// Always delete all elements older then 4
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
index 8e96a7c..32e71ba 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
@@ -25,12 +25,11 @@ import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.util.MockInvokable;
import org.junit.Test;
@@ -89,16 +88,8 @@ public class WindowingInvokableTest {
LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
// Trigger every 2 time units but delay the first trigger by 2 (First
// trigger after 4, then every 2)
- triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L,
- new Extractor<Long, Integer>() {
-
- private static final long serialVersionUID = 1L;
+ triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L));
- @Override
- public Integer extract(Long in) {
- return in.intValue();
- }
- }));
LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
// Always delete all elements older then 4
evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
index 34e292a..1355b27 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-import org.apache.flink.streaming.api.windowing.helper.Time;
import org.junit.Test;
public class TimeTriggerPolicyTest {
@@ -53,7 +51,7 @@ public class TimeTriggerPolicyTest {
for (long granularity = 0; granularity < 31; granularity++) {
// create policy
TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
- timeStamp, new Time.NullExtractor<Integer>());
+ timeStamp);
// remember window border
// Remark: This might NOT work in case the timeStamp uses
@@ -104,18 +102,10 @@ public class TimeTriggerPolicyTest {
// create policy
TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
- timeStamp, new Extractor<Long, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer extract(Long in) {
- return in.intValue();
- }
- });
+ timeStamp);
// expected result
- Integer[][] result = { {}, {}, { 5, 10, 15 }, { 25 } };
+ Long[][] result = { {}, {}, { 5L, 10L, 15L }, { 25L } };
// call policy
for (int i = 0; i < times.length; i++) {