You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/02 16:51:01 UTC
[3/3] flink git commit: [hotfix] [streaming] Handle rich functions
properly in aligned time windows
[hotfix] [streaming] Handle rich functions properly in aligned time windows
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e0e67d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e0e67d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e0e67d2
Branch: refs/heads/master
Commit: 6e0e67d2e0d5180d6fba492e8ab9cc8fb18fdf68
Parents: 7390201
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 1 20:44:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 2 15:20:40 2015 +0200
----------------------------------------------------------------------
.../api/datastream/KeyedWindowDataStream.java | 5 ++-
.../api/operators/AbstractStreamOperator.java | 5 ++-
.../operators/AbstractUdfStreamOperator.java | 33 +++++++++++----
...ractAlignedProcessingTimeWindowOperator.java | 22 +++++++---
...ccumulatingProcessingTimeWindowOperator.java | 5 ++-
...AggregatingProcessingTimeWindowOperator.java | 2 +-
.../streaming/runtime/tasks/StreamTask.java | 6 +--
.../runtime/operators/StreamTaskTimerTest.java | 19 +++++++--
...AlignedProcessingTimeWindowOperatorTest.java | 44 ++++++++++----------
...AlignedProcessingTimeWindowOperatorTest.java | 17 ++++----
10 files changed, 101 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index 302a645..9d05b8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
@@ -248,7 +249,7 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
}
else if (function instanceof KeyedWindowFunction) {
@SuppressWarnings("unchecked")
- KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+ KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
wf, input.getKeySelector(), windowLength, windowSlide);
@@ -273,7 +274,7 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
}
else if (function instanceof KeyedWindowFunction) {
@SuppressWarnings("unchecked")
- KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+ KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
wf, input.getKeySelector(), windowLength, windowSlide);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 07d8312..77bd130 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -24,7 +24,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
/**
- * Base class for operators that do not contain a user-defined function.
+ * Base class for all stream operators.
+ *
+ * Operators that contain a user function should extend the class
+ * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
*
* @param <OUT> The output type of the operator
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index f679d5f..c0d71e9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -41,29 +42,45 @@ import org.slf4j.LoggerFactory;
/**
* This is used as the base class for operators that have a user-defined
- * function.
+ * function. This class handles the opening and closing of the user-defined functions,
+ * as part of the operator life cycle.
*
* @param <OUT>
* The output type of the operator
* @param <F>
* The type of the user function
*/
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable>
+public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);
-
+
+ /** the user function */
protected final F userFunction;
+ /** Flag to prevent duplicate function.close() calls in close() and dispose() */
private boolean functionsClosed = false;
+
public AbstractUdfStreamOperator(F userFunction) {
- this.userFunction = userFunction;
+ this.userFunction = Objects.requireNonNull(userFunction);
}
+ /**
+ * Gets the user function executed in this operator.
+ * @return The user function of this operator.
+ */
+ public F getUserFunction() {
+ return userFunction;
+ }
+
+ // ------------------------------------------------------------------------
+ // operator life cycle
+ // ------------------------------------------------------------------------
+
@Override
public final void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
super.setup(output, runtimeContext);
@@ -97,6 +114,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
}
}
+ // ------------------------------------------------------------------------
+ // checkpointing and recovery
+ // ------------------------------------------------------------------------
+
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
@@ -170,8 +191,4 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
}
}
}
-
- public F getUserFunction() {
- return userFunction;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index a81340f..4fcfb2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -33,7 +33,8 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT>
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, F extends Function>
+ extends AbstractUdfStreamOperator<OUT, F>
implements OneInputStreamOperator<IN, OUT>, Triggerable {
private static final long serialVersionUID = 3245500864882459867L;
@@ -60,11 +61,13 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
private transient long nextSlideTime;
protected AbstractAlignedProcessingTimeWindowOperator(
- Function function,
+ F function,
KeySelector<IN, KEY> keySelector,
long windowLength,
long windowSlide)
{
+ super(function);
+
if (function == null || keySelector == null) {
throw new NullPointerException();
}
@@ -103,6 +106,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
@Override
public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
out = new TimestampedCollector<>(output);
// create the panes that gather the elements per slide
@@ -119,6 +124,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
@Override
public void close() throws Exception {
+ super.close();
+
final long finalWindowTimestamp = nextEvaluationTime;
// early stop the triggering thread, so it does not attempt to return any more data
@@ -130,12 +137,17 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
@Override
public void dispose() {
+ super.dispose();
+
// acquire the lock during shutdown, to prevent trigger calls at the same time
// fail-safe stop of the triggering thread (in case of an error)
stopTriggers();
- // release the panes
- panes.dispose();
+ // release the panes. panes may still be null if dispose is called
+ // after open() failed
+ if (panes != null) {
+ panes.dispose();
+ }
}
private void stopTriggers() {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 8edb76f..ace3823 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -21,17 +21,18 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> {
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, KeyedWindowFunction<IN, OUT, KEY, TimeWindow>> {
private static final long serialVersionUID = 7305948082830843475L;
public AccumulatingProcessingTimeWindowOperator(
- KeyedWindowFunction<IN, OUT, KEY, Window> function,
+ KeyedWindowFunction<IN, OUT, KEY, TimeWindow> function,
KeySelector<IN, KEY> keySelector,
long windowLength,
long windowSlide)
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index 99457bb..cc38019 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
public class AggregatingProcessingTimeWindowOperator<KEY, IN>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN> {
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, ReduceFunction<IN>> {
private static final long serialVersionUID = 7305948082830843475L;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1b35350..16b8f55 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -86,6 +86,9 @@ import org.slf4j.LoggerFactory;
*/
public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
+ /** The thread group that holds all trigger timer threads */
+ public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
+
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
/**
@@ -104,9 +107,6 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
protected ClassLoader userClassLoader;
- /** The thread group that holds all trigger timer threads */
- public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
-
/** The executor service that */
private ScheduledExecutorService timerService;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 2aed041..67df3ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.operators;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -25,8 +26,10 @@ import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -38,7 +41,8 @@ import static org.junit.Assert.*;
* Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
+@PrepareForTest(ResultPartitionWriter.class)
+@SuppressWarnings("serial")
public class StreamTaskTimerTest {
@Test
@@ -47,7 +51,8 @@ public class StreamTaskTimerTest {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<>(null);
+
+ StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
streamConfig.setStreamOperator(mapOperator);
testHarness.invoke();
@@ -77,12 +82,11 @@ public class StreamTaskTimerTest {
@Test
public void checkScheduledTimestampe() {
try {
-
final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<>(null);
+ StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
streamConfig.setStreamOperator(mapOperator);
testHarness.invoke();
@@ -162,4 +166,11 @@ public class StreamTaskTimerTest {
}
}
}
+
+ // ------------------------------------------------------------------------
+
+ public static class DummyMapFunction<T> implements MapFunction<T, T> {
+ @Override
+ public T map(T value) { return value; }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 4327e11..99a2e14 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
import org.apache.flink.util.Collector;
+
import org.junit.After;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -45,11 +45,11 @@ import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
- private final KeyedWindowFunction<String, String, String, Window> mockFunction = mock(KeyedWindowFunction.class);
+ private final KeyedWindowFunction<String, String, String, TimeWindow> mockFunction = mock(KeyedWindowFunction.class);
@SuppressWarnings("unchecked")
private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -61,11 +61,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
};
- private final KeyedWindowFunction<Integer, Integer, Integer, Window> validatingIdentityFunction =
- new KeyedWindowFunction<Integer, Integer, Integer, Window>()
+ private final KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+ new KeyedWindowFunction<Integer, Integer, Integer, TimeWindow>()
{
@Override
- public void evaluate(Integer key, Window window, Iterable<Integer> values, Collector<Integer> out) {
+ public void evaluate(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
for (Integer val : values) {
assertEquals(key, val);
out.collect(val);
@@ -112,7 +112,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowSizeAndSlide() {
try {
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+ AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
assertEquals(5000, op.getWindowSize());
@@ -153,7 +153,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
when(mockContext.getTaskName()).thenReturn("Test task name");
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+ AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
op.setup(mockOut, mockContext);
@@ -199,7 +199,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
when(mockContext.getTaskName()).thenReturn("Test task name");
// tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(
validatingIdentityFunction, identitySelector, windowSize, windowSize);
@@ -240,7 +240,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
when(mockContext.getTaskName()).thenReturn("Test task name");
// tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
op.setup(out, mockContext);
@@ -299,9 +299,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
- doAnswer(new Answer() {
+ doAnswer(new Answer<Void>() {
@Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
final Long timestamp = (Long) invocationOnMock.getArguments()[0];
final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
timerService.schedule(
@@ -321,7 +321,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
// tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50);
op.setup(out, mockContext);
@@ -374,9 +374,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
- doAnswer(new Answer() {
+ doAnswer(new Answer<Void>() {
@Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
final Long timestamp = (Long) invocationOnMock.getArguments()[0];
final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
timerService.schedule(
@@ -396,7 +396,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
// tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
op.setup(out, mockContext);
@@ -438,7 +438,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// the operator has a window time that is so long that it will not fire in this test
final long oneYear = 365L * 24 * 60 * 60 * 1000;
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector,
oneYear, oneYear);
@@ -472,11 +472,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
when(mockContext.getTaskName()).thenReturn("Test task name");
- KeyedWindowFunction<Integer, Integer, Integer, Window> failingFunction = new FailingFunction(100);
+ KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
// the operator has a window time that is so long that it will not fire in this test
final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(
failingFunction, identitySelector, hundredYears, hundredYears);
@@ -523,7 +523,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, Window> {
+ private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> {
private final int failAfterElements;
@@ -534,7 +534,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
@Override
- public void evaluate(Integer integer, Window window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+ public void evaluate(Integer integer, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
for (Integer i : values) {
out.collect(i);
numElements++;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 7ad9dd4..fa90e4a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -51,7 +50,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
@@ -113,7 +112,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowSizeAndSlide() {
try {
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+ AggregatingProcessingTimeWindowOperator<String, String> op;
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
assertEquals(5000, op.getWindowSize());
@@ -153,8 +152,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
when(mockContext.getTaskName()).thenReturn("Test task name");
-
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+
+ AggregatingProcessingTimeWindowOperator<String, String> op;
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
op.setup(mockOut, mockContext);
@@ -244,9 +243,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
when(mockContext.getTaskName()).thenReturn("Test task name");
final Object lock = new Object();
- doAnswer(new Answer() {
+ doAnswer(new Answer<Void>() {
@Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
final Long timestamp = (Long) invocationOnMock.getArguments()[0];
final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
timerService.schedule(
@@ -380,9 +379,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
when(mockContext.getTaskName()).thenReturn("Test task name");
final Object lock = new Object();
- doAnswer(new Answer() {
+ doAnswer(new Answer<Void>() {
@Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
final Long timestamp = (Long) invocationOnMock.getArguments()[0];
final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
timerService.schedule(