You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/27 15:50:37 UTC
[4/4] flink git commit: [FLINK-3464] Use Processing-Time Clock in
Window Assigners/Triggers
[FLINK-3464] Use Processing-Time Clock in Window Assigners/Triggers
Introduces a custom TimeServiceProvider to the StreamTask.
This is responsible for defining and updating the current
processing time for a task and handling all related actions,
such as registering timers for actions to be executed in
the future.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b5a7890
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b5a7890
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b5a7890
Branch: refs/heads/master
Commit: 4b5a789000a903c84f89b668a4cbd2ba1397e758
Parents: cb2b76d
Author: kl0u <kk...@gmail.com>
Authored: Thu May 12 14:16:14 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 27 17:16:12 2016 +0200
----------------------------------------------------------------------
.../kafka/internals/AbstractFetcher.java | 4 +-
.../kafka/testutils/MockRuntimeContext.java | 44 ++--
.../api/operators/AbstractStreamOperator.java | 4 +
.../api/operators/StreamingRuntimeContext.java | 10 +-
.../assigners/EventTimeSessionWindows.java | 2 +-
.../api/windowing/assigners/GlobalWindows.java | 2 +-
.../assigners/ProcessingTimeSessionWindows.java | 5 +-
.../assigners/SlidingEventTimeWindows.java | 2 +-
.../assigners/SlidingProcessingTimeWindows.java | 4 +-
.../assigners/TumblingEventTimeWindows.java | 2 +-
.../TumblingProcessingTimeWindows.java | 4 +-
.../api/windowing/assigners/WindowAssigner.java | 3 +-
.../assigners/WindowAssignerContext.java | 37 ++++
.../triggers/ContinuousEventTimeTrigger.java | 2 +
.../ContinuousProcessingTimeTrigger.java | 8 +-
.../triggers/ProcessingTimeTrigger.java | 2 +-
.../api/windowing/triggers/Trigger.java | 9 +-
.../runtime/io/StreamInputProcessor.java | 4 +-
.../windowing/EvictingWindowOperator.java | 6 +-
.../operators/windowing/WindowOperator.java | 29 ++-
.../tasks/DefaultTimeServiceProvider.java | 60 +++++
.../streaming/runtime/tasks/StreamTask.java | 113 +++++++---
.../runtime/tasks/TestTimeServiceProvider.java | 91 ++++++++
.../runtime/tasks/TimeServiceProvider.java | 41 ++++
.../runtime/operators/StreamTaskTimerTest.java | 57 ++++-
.../operators/windowing/WindowOperatorTest.java | 222 ++++++++++++++++++-
.../runtime/tasks/StreamTaskTestHarness.java | 7 +
.../util/OneInputStreamOperatorTestHarness.java | 96 ++++++--
28 files changed, 762 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 8183575..f9d2e64 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -425,7 +425,7 @@ public abstract class AbstractFetcher<T, KPH> {
//-------------------------------------------------
public void start() {
- triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
+ triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
}
@Override
@@ -454,7 +454,7 @@ public abstract class AbstractFetcher<T, KPH> {
}
// schedule the next watermark
- triggerContext.registerTimer(System.currentTimeMillis() + interval, this);
+ triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 1ac2ef5..f1bb157 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -38,15 +38,16 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
@SuppressWarnings("deprecation")
public class MockRuntimeContext extends StreamingRuntimeContext {
@@ -57,16 +58,27 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
private final ExecutionConfig execConfig;
private final Object checkpointLock;
- private ScheduledExecutorService timer;
-
+ private final TimeServiceProvider timerService;
+
public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null);
}
-
+
+ public MockRuntimeContext(
+ int numberOfParallelSubtasks, int indexOfThisSubtask,
+ ExecutionConfig execConfig,
+ Object checkpointLock) {
+
+ this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, checkpointLock,
+ DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
+ }
+
public MockRuntimeContext(
- int numberOfParallelSubtasks, int indexOfThisSubtask,
+ int numberOfParallelSubtasks, int indexOfThisSubtask,
ExecutionConfig execConfig,
- Object checkpointLock) {
+ Object checkpointLock,
+ TimeServiceProvider timerService) {
+
super(new MockStreamOperator(),
new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
Collections.<String, Accumulator<?, ?>>emptyMap());
@@ -75,6 +87,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
this.indexOfThisSubtask = indexOfThisSubtask;
this.execConfig = execConfig;
this.checkpointLock = checkpointLock;
+ this.timerService = timerService;
}
@Override
@@ -186,16 +199,17 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException();
}
-
+
+ public long getCurrentProcessingTime() {
+ Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
+ return timerService.getCurrentProcessingTime();
+ }
+
@Override
public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) {
- if (timer == null) {
- timer = Executors.newSingleThreadScheduledExecutor();
- }
+ Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
- final long delay = Math.max(time - System.currentTimeMillis(), 0);
-
- return timer.schedule(new Runnable() {
+ return timerService.registerTimer(time, new Runnable() {
@Override
public void run() {
synchronized (checkpointLock) {
@@ -207,7 +221,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
}
}
}
- }, delay, TimeUnit.MILLISECONDS);
+ });
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 3efc469..05fc158 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -260,6 +260,10 @@ public abstract class AbstractStreamOperator<OUT>
return container.registerTimer(time, target);
}
+ protected long getCurrentProcessingTime() {
+ return container.getCurrentProcessingTime();
+ }
+
/**
* Creates a partitioned state handle, using the state backend configured for this task.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index a858b4c..6a09492 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -93,7 +93,15 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
public ScheduledFuture<?> registerTimer(long time, Triggerable target) {
return operator.registerTimer(time, target);
}
-
+
+ /**
+ * Returns the current processing time as defined by the task's
+ * {@link org.apache.flink.streaming.runtime.tasks.TimeServiceProvider TimeServiceProvider}
+ */
+ public long getCurrentProcessingTime() {
+ return operator.getCurrentProcessingTime();
+ }
+
// ------------------------------------------------------------------------
// broadcast variables
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
index 64c14cd..e38f617 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
@@ -51,7 +51,7 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW
}
@Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 71101f6..1c6284a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -43,7 +43,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
private GlobalWindows() {}
@Override
- public Collection<GlobalWindow> assignWindows(Object element, long timestamp) {
+ public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(GlobalWindow.get());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
index 0e1682d..52d1c03 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
@@ -51,8 +51,9 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object,
}
@Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
- return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
+ long currentProcessingTime = context.getCurrentProcessingTime();
+ return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 83511df..8fd0d25 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -58,7 +58,7 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
}
@Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide;
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index d2b0707..6a03640 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -55,8 +55,8 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
}
@Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
- timestamp = System.currentTimeMillis();
+ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
+ timestamp = context.getCurrentProcessingTime();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide;
for (long start = lastStart;
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 70432a6..44464f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -54,7 +54,7 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
}
@Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = timestamp - (timestamp % size);
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 3ec55d0..ce36144 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -51,8 +51,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
}
@Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
- final long now = System.currentTimeMillis();
+ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
+ final long now = context.getCurrentProcessingTime();
long start = now - (now % size);
return Collections.singletonList(new TimeWindow(start, start + size));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 0b49bce..c25d6d9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -50,8 +50,9 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
*
* @param element The element to which windows should be assigned.
* @param timestamp The timestamp of the element.
+ * @param context The {@link WindowAssignerContext} in which the assigner operates.
*/
- public abstract Collection<W> assignWindows(T element, long timestamp);
+ public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
/**
* Returns the default trigger associated with this {@code WindowAssigner}.
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
new file mode 100644
index 0000000..e3f51a2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * A context provided to the {@link WindowAssigner} that allows it to query the
+ * current processing time. This is provided to the assigner by its containing
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
+ * which, in turn, gets it from the containing
+ * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
+ */
+public abstract class WindowAssignerContext {
+
+ /**
+ * Returns the current processing time, as returned by
+ * the {@link StreamTask#getCurrentProcessingTime()}.
+ */
+ public abstract long getCurrentProcessingTime();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 5cb0e4d..02613f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -89,6 +89,8 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+ long timestamp = fireTimestamp.get();
+ ctx.deleteEventTimeTimer(timestamp);
fireTimestamp.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index c6e11b1..b224cf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -29,8 +29,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
- * A {@link Trigger} that continuously fires based on a given time interval. The time is the current
- * system time.
+ * A {@link Trigger} that continuously fires based on a given time interval as measured by
+ * the clock of the machine on which the job is running.
*
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
@@ -52,7 +52,7 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
- timestamp = System.currentTimeMillis();
+ timestamp = ctx.getCurrentProcessingTime();
if (fireTimestamp.get() == null) {
long start = timestamp - (timestamp % interval);
@@ -87,6 +87,8 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+ long timestamp = fireTimestamp.get();
+ ctx.deleteProcessingTimeTimer(timestamp);
fireTimestamp.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 06193cd..8ea6a43 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -33,7 +33,7 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
- ctx.registerProcessingTimeTimer(window.getEnd());
+ ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index c9b9ff1..4d6c60f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import java.io.Serializable;
@@ -127,6 +128,12 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
public interface TriggerContext {
/**
+ * Returns the current processing time, as returned by
+ * the {@link StreamTask#getCurrentProcessingTime()}.
+ */
+ long getCurrentProcessingTime();
+
+ /**
* Returns the metric group for this {@link Trigger}. This is the same metric
* group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
* function.
@@ -170,7 +177,7 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
void deleteEventTimeTimer(long time);
/**
- * Retrieves an {@link State} object that can be used to interact with
+ * Retrieves a {@link State} object that can be used to interact with
* fault-tolerant state that is scoped to the window and key of the current
* trigger invocation.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 33a0407..657d973 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -63,7 +63,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
*/
@Internal
public class StreamInputProcessor<IN> {
-
+
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
@@ -76,7 +76,7 @@ public class StreamInputProcessor<IN> {
private boolean isFinished;
-
+
private final long[] watermarks;
private long lastEmittedWatermark;
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index fa1c894..d82fc85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -83,10 +83,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
-
Collection<W> elementWindows = windowAssigner.assignWindows(
- element.getValue(),
- element.getTimestamp());
+ element.getValue(),
+ element.getTimestamp(),
+ windowAssignerContext);
final K key = (K) getStateBackend().getCurrentKey();
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index bad1a22..ad01a5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -50,6 +50,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssignerContext;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
@@ -80,9 +81,9 @@ import static java.util.Objects.requireNonNull;
*
* <p>
* When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
- * assigned to zero or more windows using a {@link WindowAssigner}. Based on this the element
- * is put into panes. A pane is the bucket of elements that have the same key and belong to the same
- * {@code Window}. An element can be in multiple panes of it was assigned to multiple windows by the
+ * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
+ * is put into panes. A pane is the bucket of elements that have the same key and same
+ * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
* {@code WindowAssigner}.
*
* <p>
@@ -160,6 +161,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
protected transient Context context = new Context(null, null);
+ protected transient WindowAssignerContext windowAssignerContext;
+
// ------------------------------------------------------------------------
// State that needs to be checkpointed
// ------------------------------------------------------------------------
@@ -245,6 +248,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
context = new Context(null, null);
+ windowAssignerContext = new WindowAssignerContext() {
+ @Override
+ public long getCurrentProcessingTime() {
+ return WindowOperator.this.getCurrentProcessingTime();
+ }
+ };
+
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindowsByKey = new HashMap<>();
}
@@ -261,6 +271,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
processingTimeTimers = null;
processingTimeTimersQueue = null;
context = null;
+ windowAssignerContext = null;
mergingWindowsByKey = null;
}
@@ -273,16 +284,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
processingTimeTimers = null;
processingTimeTimersQueue = null;
context = null;
+ windowAssignerContext = null;
mergingWindowsByKey = null;
}
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
-
Collection<W> elementWindows = windowAssigner.assignWindows(
- element.getValue(),
- element.getTimestamp());
+ element.getValue(), element.getTimestamp(), windowAssignerContext);
final K key = (K) getStateBackend().getCurrentKey();
@@ -669,6 +679,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
@Override
+ public long getCurrentProcessingTime() {
+ return WindowOperator.this.getCurrentProcessingTime();
+ }
+
+ @Override
public void registerProcessingTimeTimer(long time) {
Timer<K, W> timer = new Timer<>(time, key, window);
// make sure we only put one timer per key into the queue
@@ -676,7 +691,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
processingTimeTimersQueue.add(timer);
//If this is the first timer added for this timestamp register a TriggerTask
if (processingTimeTimerTimestamps.add(time, 1) == 0) {
- ScheduledFuture<?> scheduledFuture= getRuntimeContext().registerTimer(time, WindowOperator.this);
+ ScheduledFuture<?> scheduledFuture = WindowOperator.this.registerTimer(time, WindowOperator.this);
processingTimeTimerFutures.put(time, scheduledFuture);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
new file mode 100644
index 0000000..b803b82
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
+ * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
+ */
+public class DefaultTimeServiceProvider extends TimeServiceProvider {
+
+ /** The executor service that schedules and calls the triggers of this task*/
+ private final ScheduledExecutorService timerService;
+
+ public static DefaultTimeServiceProvider create (ScheduledExecutorService executor) {
+ return new DefaultTimeServiceProvider(executor);
+ }
+
+ private DefaultTimeServiceProvider(ScheduledExecutorService threadPoolExecutor) {
+ this.timerService = threadPoolExecutor;
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) {
+ long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
+ return timerService.schedule(target, delay, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void shutdownService() throws Exception {
+ if (!timerService.isTerminated()) {
+ StreamTask.LOG.info("Timer service is shutting down.");
+ }
+ timerService.shutdownNow();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 444245c..a5de312 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -56,13 +56,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
* Base class for all streaming tasks. A task is the unit of local processing that is deployed
* and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
* the Task's operator chain. Operators that are chained together execute synchronously in the
- * same thread and hence on the same stream partition. A common case for these chaines
+ * same thread and hence on the same stream partition. A common case for these chains
* are successive map/flatmap/filter tasks.
*
* <p>The task chain contains one "head" operator and multiple chained operators.
@@ -127,10 +126,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
/** The class loader used to load dynamic classes of a job */
private ClassLoader userClassLoader;
-
- /** The executor service that schedules and calls the triggers of this task*/
- private ScheduledThreadPoolExecutor timerService;
-
+
+ /**
+ * The internal {@link TimeServiceProvider} used to define the current
+ * processing time (default = {@code System.currentTimeMillis()}) and
+ * register timers for tasks to be executed in the future.
+ */
+ private TimeServiceProvider timerService;
+
/** The map of user-defined accumulators of this task */
private Map<String, Accumulator<?, ?>> accumulatorMap;
@@ -172,7 +175,25 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// ------------------------------------------------------------------------
// Core work methods of the Stream Task
// ------------------------------------------------------------------------
-
+
+ /**
+ * Allows the user to specify their own {@link TimeServiceProvider TimeServiceProvider}.
+ * By default a {@link DefaultTimeServiceProvider DefaultTimeServiceProvider} is going to be provided.
+ * Changing it can be useful for testing processing time functionality, such as
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}
+ * and {@link org.apache.flink.streaming.api.windowing.triggers.Trigger Triggers}.
+ * */
+ public void setTimeService(TimeServiceProvider timeProvider) {
+ if (timeProvider == null) {
+ throw new RuntimeException("The timeProvider cannot be set to null.");
+ }
+ timerService = timeProvider;
+ }
+
+ public long getCurrentProcessingTime() {
+ return timerService.getCurrentProcessingTime();
+ }
+
@Override
public final void invoke() throws Exception {
@@ -185,6 +206,19 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
configuration = new StreamConfig(getTaskConfiguration());
accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
+ // if the clock is not already set, then assign a default TimeServiceProvider
+ if (timerService == null) {
+
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
+ new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
+
+ // allow trigger tasks to be removed if all timers for
+ // that timestamp are removed by user
+ executor.setRemoveOnCancelPolicy(true);
+
+ timerService = DefaultTimeServiceProvider.create(executor);
+ }
+
headOperator = configuration.getStreamOperator(userClassLoader);
operatorChain = new OperatorChain<>(this, headOperator,
getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
@@ -193,10 +227,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
}
- timerService =new ScheduledThreadPoolExecutor(1, new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
- // allow trigger tasks to be removed if all timers for that timestamp are removed by user
- timerService.setRemoveOnCancelPolicy(true);
-
getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() {
@Override
public Long getValue() {
@@ -265,7 +295,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// stop all timers and threads
if (timerService != null) {
try {
- timerService.shutdownNow();
+ timerService.shutdownService();
}
catch (Throwable t) {
// catch and log the exception to not replace the original exception
@@ -333,7 +363,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
public final boolean isCanceled() {
return canceled;
}
-
+
+ /**
+ * Execute the operator-specific {@link StreamOperator#open()} method in each
+ * of the operators in the chain of this {@link StreamTask}. Opening happens
+ * from <b>tail to head</b> operator in the chain, contrary to
+ * {@link StreamOperator#close()} which happens <b>head to tail</b>
+ * operator (see {@link #closeAllOperators()}).
+ */
private void openAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
@@ -342,6 +379,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
+ /**
+ * Execute the operator-specific {@link StreamOperator#close()} method in each
+ * of the operators in the chain of this {@link StreamTask}. Closing happens
+ * from <b>head to tail</b> operator in the chain, contrary to
+ * {@link StreamOperator#open()} which happens <b>tail to head</b> operator
+ * (see {@link #openAllOperators()}).
+ */
private void closeAllOperators() throws Exception {
// We need to close them first to last, since upstream operators in the chain might emit
// elements in their close methods.
@@ -354,6 +398,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
+ /**
+ * Execute the operator-specific {@link StreamOperator#dispose()} method in each
+ * of the operators in the chain of this {@link StreamTask}. Disposing happens
+ * from <b>tail to head</b> operator in the chain.
+ */
private void tryDisposeAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
@@ -361,7 +410,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
}
-
+
+ /**
+ * Execute the operator-specific {@link StreamOperator#dispose()} method in each
+ * of the operators in the chain of this {@link StreamTask}. Disposing happens
+ * from <b>tail to head</b> operator in the chain.
+ *
+ * The difference with the {@link #tryDisposeAllOperators()} is that in case of an
+ * exception, this method catches it and logs the message.
+ */
private void disposeAllOperators() {
if (operatorChain != null) {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
@@ -389,10 +446,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
protected void finalize() throws Throwable {
super.finalize();
if (timerService != null) {
- if (!timerService.isTerminated()) {
- LOG.warn("Timer service was not shut down. Shutting down in finalize().");
- }
- timerService.shutdownNow();
+ timerService.shutdownService();
}
for (Thread checkpointThread : asyncCheckpointThreads) {
@@ -418,8 +472,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
/**
- * Gets the lock object on which all operations that involve data and state mutation have to lock.
-
+ * Gets the lock object on which all operations that involve data and state mutation have to lock.
* @return The checkpoint lock object.
*/
public Object getCheckpointLock() {
@@ -503,10 +556,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
synchronized (lock) {
if (isRunning) {
- // since both state checkpointing and downstream barrier emission occurs in this
- // lock scope, they are an atomic operation regardless of the order in which they occur
- // we immediately emit the checkpoint barriers, so the downstream operators can start
- // their checkpoint work as soon as possible
+ // Since both state checkpointing and downstream barrier emission occur in this
+ // lock scope, they are an atomic operation regardless of the order in which they occur.
+ // Given this, we immediately emit the checkpoint barriers, so the downstream operators
+ // can start their checkpoint work as soon as possible
operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
// now draw the state snapshot
@@ -689,18 +742,16 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
* Registers a timer.
*/
public ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target) {
- long delay = Math.max(timestamp - System.currentTimeMillis(), 0);
-
- return timerService.schedule(
- new TriggerTask(this, lock, target, timestamp),
- delay,
- TimeUnit.MILLISECONDS);
+ if (timerService == null) {
+ throw new IllegalStateException("The timer service has not been initialized.");
+ }
+ return timerService.registerTimer(timestamp, new TriggerTask(this, lock, target, timestamp));
}
/**
* Check whether an exception was thrown in a Thread other than the main Thread. (For example
* in the processing-time trigger Thread). This will rethrow that exception in case on
- * occured.
+ * occurred.
*
* <p>This must be called in the main loop of {@code StreamTask} subclasses to ensure
* that we propagate failures.
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
new file mode 100644
index 0000000..2314deb
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * This is a {@link TimeServiceProvider} used <b>strictly for testing</b> the
+ * processing time functionality.
+ * */
+public class TestTimeServiceProvider extends TimeServiceProvider {
+
+ private long currentTime = 0;
+
+ private Map<Long, List<Runnable>> registeredTasks = new HashMap<>();
+
+ public void setCurrentTime(long timestamp) {
+ this.currentTime = timestamp;
+
+ // decide which timers to fire and put them in a list
+ // we do not fire them here to be able to accommodate timers
+ // that register other timers. The latter would throw an exception.
+
+ Iterator<Map.Entry<Long, List<Runnable>>> it = registeredTasks.entrySet().iterator();
+ List<Runnable> toRun = new ArrayList<>();
+ while (it.hasNext()) {
+ Map.Entry<Long, List<Runnable>> t = it.next();
+ if (t.getKey() <= this.currentTime) {
+ for (Runnable r: t.getValue()) {
+ toRun.add(r);
+ }
+ it.remove();
+ }
+ }
+
+ // now do the actual firing.
+ for (Runnable r: toRun) {
+ r.run();
+ }
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return currentTime;
+ }
+
+ @Override
+ public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) {
+ List<Runnable> tasks = registeredTasks.get(timestamp);
+ if (tasks == null) {
+ tasks = new ArrayList<>();
+ registeredTasks.put(timestamp, tasks);
+ }
+ tasks.add(target);
+ return null;
+ }
+
+ public int getNoOfRegisteredTimers() {
+ int count = 0;
+ for (List<Runnable> tasks: registeredTasks.values()) {
+ count += tasks.size();
+ }
+ return count;
+ }
+
+ @Override
+ public void shutdownService() throws Exception {
+ this.registeredTasks.clear();
+ this.registeredTasks = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
new file mode 100644
index 0000000..f3e4f78
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Defines the current processing time and handles all related actions,
+ * such as registering timers for tasks to be executed in the future.
+ */
+public abstract class TimeServiceProvider {
+
+ /** Returns the current processing time. */
+ public abstract long getCurrentProcessingTime();
+
+ /** Registers a task to be executed when (processing) time is {@code timestamp}.
+ * @param timestamp
+ * when the task is to be executed (in processing time)
+ * @param target
+ * the task to be executed
+ * @return the result to be returned.
+ */
+ public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Runnable target);
+
+ /** Shuts down and cleans up the timer service provider. */
+ public abstract void shutdownService() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 153b1fd..c9f204d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -48,6 +49,58 @@ import static org.junit.Assert.*;
public class StreamTaskTimerTest {
@Test
+ public void testCustomTimeServiceProvider() throws Throwable {
+ TestTimeServiceProvider tp = new TestTimeServiceProvider();
+
+ final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
+ mapTask.setTimeService(tp);
+
+ final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+ mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+
+ StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
+ streamConfig.setStreamOperator(mapOperator);
+
+ testHarness.invoke();
+
+ assertTrue(testHarness.getCurrentProcessingTime() == 0);
+
+ tp.setCurrentTime(11);
+ assertTrue(testHarness.getCurrentProcessingTime() == 11);
+
+ tp.setCurrentTime(15);
+ tp.setCurrentTime(16);
+ assertTrue(testHarness.getCurrentProcessingTime() == 16);
+
+ // register 2 tasks
+ mapTask.registerTimer(30, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+
+ }
+ });
+
+ mapTask.registerTimer(40, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+
+ }
+ });
+
+ assertEquals(2, tp.getNoOfRegisteredTimers());
+
+ tp.setCurrentTime(35);
+ assertEquals(1, tp.getNoOfRegisteredTimers());
+
+ tp.setCurrentTime(40);
+ assertEquals(0, tp.getNoOfRegisteredTimers());
+
+ tp.shutdownService();
+ }
+
+ @Test
public void testOpenCloseAndTimestamps() throws Exception {
final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
@@ -176,6 +229,8 @@ public class StreamTaskTimerTest {
public static class DummyMapFunction<T> implements MapFunction<T, T> {
@Override
- public T map(T value) { return value; }
+ public T map(T value) {
+ return value;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 9aaf683..4f3ff63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -38,13 +38,18 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssignerContext;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
@@ -55,6 +60,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
@@ -68,6 +74,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class WindowOperatorTest {
@@ -592,6 +600,8 @@ public class WindowOperatorTest {
WindowedStream<String, String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
.keyBy(new KeySelector<String, String>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public String getKey(String value) throws Exception {
return value;
@@ -922,6 +932,193 @@ public class WindowOperatorTest {
}
@Test
+ public void testProcessingTimeTumblingWindows() throws Throwable {
+ final int windowSizeSeconds = 3;
+ // record timestamps are ignored in processing time, so any value can be used
+ final long ignoredTimestamp = 7000L;
+
+ TypeInformation<Tuple2<String, Integer>> tupleType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ReducingStateDescriptor<Tuple2<String, Integer>> windowState = new ReducingStateDescriptor<>(
+ "window-contents",
+ new SumReducer(),
+ tupleType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> windowOperator =
+ new WindowOperator<>(
+ TumblingProcessingTimeWindows.of(Time.of(windowSizeSeconds, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ windowState,
+ new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+ ProcessingTimeTrigger.create(),
+ 0);
+
+ // manually driven clock that is handed to the harness below
+ TestTimeServiceProvider clock = new TestTimeServiceProvider();
+ windowOperator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> harness =
+ new OneInputStreamOperatorTestHarness<>(windowOperator, new ExecutionConfig(), clock);
+ harness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ harness.open();
+
+ // first batch, processed while the clock reads 3
+ clock.setCurrentTime(3);
+ harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
+ for (int i = 0; i < 2; i++) {
+ harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), ignoredTimestamp));
+ }
+ for (int i = 0; i < 2; i++) {
+ harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), ignoredTimestamp));
+ }
+
+ // second batch, processed while the clock reads 5000
+ clock.setCurrentTime(5000);
+ for (int i = 0; i < 3; i++) {
+ harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), ignoredTimestamp));
+ }
+
+ // advance the clock once more before checking the output
+ clock.setCurrentTime(7000);
+
+ ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput(), new Tuple2ResultSortComparator());
+
+ harness.close();
+ }
+
+ @Test
+ public void testProcessingTimeSlidingWindows() throws Throwable {
+ final int windowSizeSeconds = 3;
+ final int windowSlideSeconds = 1;
+
+ TypeInformation<Tuple2<String, Integer>> tupleType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ReducingStateDescriptor<Tuple2<String, Integer>> windowState = new ReducingStateDescriptor<>(
+ "window-contents",
+ new SumReducer(),
+ tupleType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> windowOperator =
+ new WindowOperator<>(
+ SlidingProcessingTimeWindows.of(
+ Time.of(windowSizeSeconds, TimeUnit.SECONDS),
+ Time.of(windowSlideSeconds, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ windowState,
+ new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+ ProcessingTimeTrigger.create(),
+ 0);
+
+ // manually driven clock that is handed to the harness below
+ TestTimeServiceProvider clock = new TestTimeServiceProvider();
+ windowOperator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> harness =
+ new OneInputStreamOperatorTestHarness<>(windowOperator, new ExecutionConfig(), clock);
+ harness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ harness.open();
+
+ // the record timestamp (Long.MAX_VALUE) is ignored in processing time;
+ // only the clock value at the moment of processElement matters
+ clock.setCurrentTime(3);
+ harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
+
+ clock.setCurrentTime(1000);
+ for (int i = 0; i < 2; i++) {
+ harness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
+ }
+
+ clock.setCurrentTime(2000);
+ for (int i = 0; i < 2; i++) {
+ harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
+ }
+
+ clock.setCurrentTime(3000);
+ for (int i = 0; i < 3; i++) {
+ harness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
+ }
+
+ // advance the clock once more before checking the output
+ clock.setCurrentTime(7000);
+
+ ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ // results for key2
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
+ // results for key1
+ expected.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key1", 5), 3999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key1", 5), 4999));
+ expected.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, harness.getOutput(), new Tuple2ResultSortComparator());
+
+ harness.close();
+ }
+
+ @Test
+ public void testProcessingTimeSessionWindows() throws Throwable {
+ // session gap in seconds
+ final int WINDOW_GAP = 3;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
+ new SumReducer(),
+ inputType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
+ ProcessingTimeSessionWindows.withGap(Time.of(WINDOW_GAP, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+ ProcessingTimeTrigger.create(), 0);
+
+ // manually driven clock; the harness registers its timers with this provider
+ TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+ operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
+
+ testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.open();
+
+ // timestamp is ignored in processing time
+ testTimeProvider.setCurrentTime(3);
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1));
+
+ testTimeProvider.setCurrentTime(1000);
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1002));
+
+ testTimeProvider.setCurrentTime(5000);
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 5000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 5000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
+
+ testTimeProvider.setCurrentTime(10000);
+
+ // expected: the two key2 elements seen at clock 3 and 1000 form one session (emitted at 3999),
+ // everything seen at clock 5000 forms one session per key (emitted at 7999)
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 7999));
+ expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 7999));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+ // additionally check the element count and that every emitted record is an expected one;
+ // contains(Object) relies on equals(), so no cast of the element is needed
+ assertEquals(expectedOutput.size(), testHarness.getOutput().size());
+ for (Object elem : testHarness.getOutput()) {
+ if (elem instanceof StreamRecord) {
+ assertTrue(expectedOutput.contains(elem));
+ }
+ }
+ testHarness.close();
+ }
+
+ @Test
public void testLateness() throws Exception {
final int WINDOW_SIZE = 2;
final long LATENESS = 500;
@@ -995,16 +1192,16 @@ public class WindowOperatorTest {
TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
- WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+ final WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
new WindowOperator<>(
windowAssigner,
- new TimeWindow.Serializer(),
- new TupleKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
- stateDesc,
- new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
- EventTimeTrigger.create(),
- LATENESS);
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+ BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+ EventTimeTrigger.create(),
+ LATENESS);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(operator);
@@ -1017,7 +1214,12 @@ public class WindowOperatorTest {
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
long timestamp = Long.MAX_VALUE - 1750;
- Collection<TimeWindow> windows = windowAssigner.assignWindows(new Tuple2<>("key2", 1), timestamp);
+ Collection<TimeWindow> windows = windowAssigner.assignWindows(new Tuple2<>("key2", 1), timestamp, new WindowAssignerContext() {
+ @Override
+ public long getCurrentProcessingTime() {
+ return operator.windowAssignerContext.getCurrentProcessingTime();
+ }
+ });
TimeWindow window = Iterables.getOnlyElement(windows);
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), timestamp));
@@ -1883,7 +2085,7 @@ public class WindowOperatorTest {
@Override
@SuppressWarnings("unchecked")
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+ public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext ctx) {
if (element instanceof Tuple2) {
Tuple2<String, Integer> t2 = (Tuple2<String, Integer>) element;
if (t2.f1 == 33) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 4639321..00e95b9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -113,6 +113,13 @@ public class StreamTaskTestHarness<OUT> {
outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
}
+ /**
+ * Returns the current processing time of the task under test.
+ *
+ * <p>If the task is not a {@link StreamTask}, the system wall-clock time is returned instead.
+ *
+ * @return the processing time reported by the {@link StreamTask}, or
+ * {@link System#currentTimeMillis()} for any other kind of task
+ */
+ public long getCurrentProcessingTime() {
+ if (!(task instanceof StreamTask)) {
+ // the return is required: without it the value is discarded and the cast below is reached
+ return System.currentTimeMillis();
+ }
+ return ((StreamTask<?, ?>) task).getCurrentProcessingTime();
+ }
+
/**
* This must be overwritten for OneInputStreamTask or TwoInputStreamTask test harnesses.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/4b5a7890/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6e2e9f9..86137d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -69,6 +70,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
final Object checkpointLock;
+ final TimeServiceProvider timeServiceProvider;
+
StreamTask<?, ?> mockTask;
/**
@@ -80,8 +83,13 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
this(operator, new ExecutionConfig());
}
-
+
+ /**
+ * Creates a harness for the given operator and execution config without a
+ * {@link TimeServiceProvider}: {@code null} is passed on to the three-argument constructor.
+ */
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig) {
+ this(operator, executionConfig, null);
+ }
+
+ public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig,
+ TimeServiceProvider testTimeProvider) {
this.operator = operator;
this.outputList = new ConcurrentLinkedQueue<Object>();
this.config = new StreamConfig(new Configuration());
@@ -90,6 +98,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
mockTask = mock(StreamTask.class);
+ timeServiceProvider = testTimeProvider;
+
when(mockTask.getName()).thenReturn("Mock Task");
when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
when(mockTask.getConfiguration()).thenReturn(config);
@@ -116,29 +126,44 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
public Void answer(InvocationOnMock invocation) throws Throwable {
final long execTime = (Long) invocation.getArguments()[0];
final Triggerable target = (Triggerable) invocation.getArguments()[1];
-
- Thread caller = new Thread() {
- @Override
- public void run() {
- final long delay = execTime - System.currentTimeMillis();
- if (delay > 0) {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ignored) {}
- }
-
- synchronized (checkpointLock) {
- try {
- target.trigger(execTime);
- } catch (Exception ignored) {}
+
+ if (timeServiceProvider == null) {
+ Thread caller = new Thread() {
+ @Override
+ public void run() {
+ final long delay = execTime - mockTask.getCurrentProcessingTime();
+ if (delay > 0) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ synchronized (checkpointLock) {
+ try {
+ target.trigger(execTime);
+ } catch (Exception ignored) {
+ }
+ }
}
- }
- };
- caller.start();
-
+ };
+ caller.start();
+ } else {
+ timeServiceProvider.registerTimer(
+ execTime, new TriggerTask(checkpointLock, target, execTime));
+ }
return null;
}
}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
+
+ doAnswer(new Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ return timeServiceProvider == null ?
+ System.currentTimeMillis() :
+ timeServiceProvider.getCurrentProcessingTime();
+ }
+ }).when(mockTask).getCurrentProcessingTime();
}
public Object getCheckpointLock() {
@@ -201,6 +226,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
public void close() throws Exception {
+ try {
operator.close();
operator.dispose();
+ } finally {
+ // always shut down the time service provider, even if closing or disposing the operator fails
+ if (timeServiceProvider != null) {
+ timeServiceProvider.shutdownService();
+ }
+ }
+ // only reached when close/dispose/shutdown completed without an exception
setupCalled = false;
}
@@ -243,4 +271,32 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
// ignore
}
}
+
+ /**
+ * Runnable registered with the {@link TimeServiceProvider} that fires a {@link Triggerable}
+ * with a fixed timestamp while holding the given lock.
+ */
+ private static final class TriggerTask implements Runnable {
+
+ /** Monitor held while the target is triggered. */
+ private final Object lock;
+ /** The callback to invoke. */
+ private final Triggerable target;
+ /** The timestamp passed to the callback. */
+ private final long timestamp;
+
+ TriggerTask(final Object lock, Triggerable target, long timestamp) {
+ this.lock = lock;
+ this.target = target;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public void run() {
+ synchronized (lock) {
+ try {
+ target.trigger(timestamp);
+ } catch (Exception e) {
+ // exceptions from the callback are reported but not propagated;
+ // Errors are deliberately not caught and escape from run()
+ e.printStackTrace();
+ }
+ }
+ }
+ }
}