You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/02 16:50:59 UTC
[1/3] flink git commit: [FLINK-2727] [streaming] Add a base class for
Message Queue Sources that acknowledge messages by ID.
Repository: flink
Updated Branches:
refs/heads/master d486cde76 -> 6e0e67d2e
[FLINK-2727] [streaming] Add a base class for Message Queue Sources that acknowledge messages by ID.
This closes #1163
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daeab849
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daeab849
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daeab849
Branch: refs/heads/master
Commit: daeab84985bb510351b04e08f83cdd6ce8075225
Parents: d486cde
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 22 13:23:56 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 2 15:07:37 2015 +0200
----------------------------------------------------------------------
.../runtime/util/DataOutputSerializer.java | 5 +
.../streaming/api/checkpoint/Checkpointed.java | 2 +-
.../source/MessageAcknowledingSourceBase.java | 173 ++++++++++++++++++
.../api/state/SerializedCheckpointData.java | 176 +++++++++++++++++++
4 files changed, 355 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/daeab849/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index 7f8105d..0e93544 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.Arrays;
/**
* A simple and efficient serializer for the {@link java.io.DataOutput} interface.
@@ -66,6 +67,10 @@ public class DataOutputSerializer implements DataOutputView {
public byte[] getByteArray() {
return buffer;
}
+
+ public byte[] getCopyOfBuffer() {
+ return Arrays.copyOf(buffer, position);
+ }
public void clear() {
this.position = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/daeab849/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index cb49dba..ac1cbfb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -61,5 +61,5 @@ public interface Checkpointed<T extends Serializable> {
*
* @param state The state to be restored.
*/
- void restoreState(T state);
+ void restoreState(T state) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/daeab849/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
new file mode 100644
index 0000000..3817ede
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.state.SerializedCheckpointData;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract base class for data sources that receive elements from a message queue and
+ * acknowledge them back by IDs.
+ * <p>
+ * The mechanism for this source assumes that messages are identified by a unique ID.
+ * When messages are taken from the message queue, the message must not be dropped immediately,
+ * but must be retained until acknowledged. Messages that are not acknowledged within a certain
+ * time interval will be served again (to a different connection, established by the recovered source).
+ * <p>
+ * Note that this source can give no guarantees about message order in teh case of failures,
+ * because messages that were retrieved but not yet acknowledged will be returned later again, after
+ * a set of messages that was not retrieved before the failure.
+ * <p>
+ * Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and
+ * acknowledged when the checkpoint is complete. That way, no message is acknowledged unless it is certain
+ * that it has been successfully processed throughout the topology and the updates to any state caused by
+ * that message are persistent.
+ * <p>
+ * All messages that are emitted and successfully processed by the streaming program will eventually be
+ * acknowledged. In corner cases, the source may acknowledge certain IDs multiple times, if a
+ * failure occurs while acknowledging.
+ * <p>
+ * A typical way to use this base in a source function is by implementing a run() method as follows:
+ * <pre>{@code
+ * public void run(SourceContext<Type> ctx) throws Exception {
+ * while (running) {
+ * Message msg = queue.retrieve();
+ * synchronized (ctx.getCheckpointLock()) {
+ * ctx.collect(msg.getMessageData());
+ * addId(msg.getMessageId());
+ * }
+ * }
+ * }
+ * }</pre>
+ *
+ * @param <Type> The type of the messages created by the source.
+ * @param <Id> The type of the IDs that are used for acknowledging elements.
+ */
+public abstract class MessageAcknowledingSourceBase<Type, Id> extends RichSourceFunction<Type>
+ implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier {
+
+ private static final long serialVersionUID = -8689291992192955579L;
+
+ /** Serializer used to serialize the IDs for checkpoints */
+ private final TypeSerializer<Id> idSerializer;
+
+ /** The list gathering the IDs of messages emitted during the current checkpoint */
+ private transient List<Id> idsForCurrentCheckpoint;
+
+ /** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
+ private transient ArrayDeque<Tuple2<Long, List<Id>>> pendingCheckpoints;
+
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new MessageAcknowledingSourceBase for IDs of teh given type.
+ *
+ * @param idClass The class of the message ID type, used to create a serializer for the message IDs.
+ */
+ protected MessageAcknowledingSourceBase(Class<Id> idClass) {
+ this(TypeExtractor.getForClass(idClass));
+ }
+
+ /**
+ * Creates a new MessageAcknowledingSourceBase for IDs of teh given type.
+ *
+ * @param idTypeInfo The type information of the message ID type, used to create a serializer for the message IDs.
+ */
+ protected MessageAcknowledingSourceBase(TypeInformation<Id> idTypeInfo) {
+ this.idSerializer = idTypeInfo.createSerializer(new ExecutionConfig());
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ idsForCurrentCheckpoint = new ArrayList<>(64);
+ pendingCheckpoints = new ArrayDeque<>();
+ }
+
+ @Override
+ public void close() throws Exception {
+ idsForCurrentCheckpoint.clear();
+ pendingCheckpoints.clear();
+ }
+
+
+ // ------------------------------------------------------------------------
+ // ID Checkpointing
+ // ------------------------------------------------------------------------
+
+ /**
+ * This method must be implemented to acknowledge the given set of IDs back to the message queue.
+ * @param ids The list od IDs to acknowledge.
+ */
+ protected abstract void acknowledgeIDs(List<Id> ids);
+
+ /**
+ * Adds an ID to be stored with the current checkpoint.
+ * @param id The ID to add.
+ */
+ protected void addId(Id id) {
+ idsForCurrentCheckpoint.add(id);
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpointing the data
+ // ------------------------------------------------------------------------
+
+ @Override
+ public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ pendingCheckpoints.addLast(new Tuple2<Long, List<Id>>(checkpointId, idsForCurrentCheckpoint));
+ idsForCurrentCheckpoint = new ArrayList<>(64);
+
+ return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
+ }
+
+ @Override
+ public void restoreState(SerializedCheckpointData[] state) throws Exception {
+ pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
+ Tuple2<Long, List<Id>> checkpoint = iter.next();
+ long id = checkpoint.f0;
+
+ if (id <= checkpointId) {
+ acknowledgeIDs(checkpoint.f1);
+ iter.remove();
+ }
+ else {
+ break;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/daeab849/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
new file mode 100644
index 0000000..2bbb4e2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class represents serialized checkpoint data for a collection of elements.
+ */
+public class SerializedCheckpointData implements java.io.Serializable {
+
+ private static final long serialVersionUID = -8783744683896503488L;
+
+ /** ID of the checkpoint for which the IDs are stored */
+ private final long checkpointId;
+
+ /** The serialized elements */
+ private final byte[] serializedData;
+
+ /** The number of elements in the checkpoint */
+ private final int numIds;
+
+ /**
+ * Creates a SerializedCheckpointData object for the given serialized data.
+ *
+ * @param checkpointId The checkpointId of the checkpoint.
+ * @param serializedData The serialized IDs in this checkpoint.
+ * @param numIds The number of IDs in the checkpoint.
+ */
+ public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) {
+ this.checkpointId = checkpointId;
+ this.serializedData = serializedData;
+ this.numIds = numIds;
+ }
+
+ /**
+ * Gets the checkpointId of the checkpoint.
+ * @return The checkpointId of the checkpoint.
+ */
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ /**
+ * Gets the binary data for the serialized elements.
+ * @return The binary data for the serialized elements.
+ */
+ public byte[] getSerializedData() {
+ return serializedData;
+ }
+
+ /**
+ * Gets the number of IDs in the checkpoint.
+ * @return The number of IDs in the checkpoint.
+ */
+ public int getNumIds() {
+ return numIds;
+ }
+
+ // ------------------------------------------------------------------------
+ // Serialize to Checkpoint
+ // ------------------------------------------------------------------------
+
+ /**
+ * Converts a list of checkpoints with elements into an array of SerializedCheckpointData.
+ *
+ * @param checkpoints The checkpoints to be converted into IdsCheckpointData.
+ * @param serializer The serializer to serialize the IDs.
+ * @param <T> The type of the ID.
+ * @return An array of serializable SerializedCheckpointData, one per entry in the
+ *
+ * @throws IOException Thrown, if the serialization fails.
+ */
+ public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
+ TypeSerializer<T> serializer) throws IOException {
+ return fromDeque(checkpoints, serializer, new DataOutputSerializer(128));
+ }
+
+ /**
+ * Converts a list of checkpoints into an array of SerializedCheckpointData.
+ *
+ * @param checkpoints The checkpoints to be converted into IdsCheckpointData.
+ * @param serializer The serializer to serialize the IDs.
+ * @param outputBuffer The reusable serialization buffer.
+ * @param <T> The type of the ID.
+ * @return An array of serializable SerializedCheckpointData, one per entry in the
+ *
+ * @throws IOException Thrown, if the serialization fails.
+ */
+ public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
+ TypeSerializer<T> serializer,
+ DataOutputSerializer outputBuffer) throws IOException {
+ SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()];
+
+ int pos = 0;
+ for (Tuple2<Long, List<T>> checkpoint : checkpoints) {
+ outputBuffer.clear();
+ List<T> checkpointIds = checkpoint.f1;
+
+ for (T id : checkpointIds) {
+ serializer.serialize(id, outputBuffer);
+ }
+
+ serializedCheckpoints[pos++] = new SerializedCheckpointData(
+ checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size());
+ }
+
+ return serializedCheckpoints;
+ }
+
+ // ------------------------------------------------------------------------
+ // De-Serialize from Checkpoint
+ // ------------------------------------------------------------------------
+
+ /**
+ * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints.
+ *
+ * @param data The data to be deserialized.
+ * @param serializer The serializer used to deserialize the data.
+ * @param <T> The type of the elements.
+ * @return An ArrayDeque of element checkpoints.
+ *
+ * @throws IOException Thrown, if the serialization fails.
+ */
+ public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque(
+ SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException
+ {
+ ArrayDeque<Tuple2<Long, List<T>>> deque = new ArrayDeque<>(data.length);
+ DataInputDeserializer deser = null;
+
+ for (SerializedCheckpointData checkpoint : data) {
+ byte[] serializedData = checkpoint.getSerializedData();
+ if (deser == null) {
+ deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
+ }
+ else {
+ deser.setBuffer(serializedData, 0, serializedData.length);
+ }
+
+ final List<T> ids = new ArrayList<>(checkpoint.getNumIds());
+ final int numIds = checkpoint.getNumIds();
+
+ for (int i = 0; i < numIds; i++) {
+ ids.add(serializer.deserialize(deser));
+ }
+
+ deque.addLast(new Tuple2<Long, List<T>>(checkpoint.checkpointId, ids));
+ }
+
+ return deque;
+ }
+}
[2/3] flink git commit: [hotfix] [streaming] Apply closure cleaner to
KeyedWindowFunction
Posted by se...@apache.org.
[hotfix] [streaming] Apply closure cleaner to KeyedWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73902011
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73902011
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73902011
Branch: refs/heads/master
Commit: 739020110251aa98773695163bb23b52c48f8987
Parents: daeab84
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 1 19:59:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 2 15:10:53 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/datastream/KeyedWindowDataStream.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/73902011/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index ad7ca37..302a645 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -178,6 +178,9 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
+ // clean the closure
+ function = input.getExecutionEnvironment().clean(function);
+
TypeInformation<T> inType = input.getType();
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, KeyedWindowFunction.class, true, true, inType, null, false);
[3/3] flink git commit: [hotfix] [streaming] Handle rich functions
properly in aligned time windows
Posted by se...@apache.org.
[hotfix] [streaming] Handle rich functions properly in aligned time windows
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e0e67d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e0e67d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e0e67d2
Branch: refs/heads/master
Commit: 6e0e67d2e0d5180d6fba492e8ab9cc8fb18fdf68
Parents: 7390201
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 1 20:44:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 2 15:20:40 2015 +0200
----------------------------------------------------------------------
.../api/datastream/KeyedWindowDataStream.java | 5 ++-
.../api/operators/AbstractStreamOperator.java | 5 ++-
.../operators/AbstractUdfStreamOperator.java | 33 +++++++++++----
...ractAlignedProcessingTimeWindowOperator.java | 22 +++++++---
...ccumulatingProcessingTimeWindowOperator.java | 5 ++-
...AggregatingProcessingTimeWindowOperator.java | 2 +-
.../streaming/runtime/tasks/StreamTask.java | 6 +--
.../runtime/operators/StreamTaskTimerTest.java | 19 +++++++--
...AlignedProcessingTimeWindowOperatorTest.java | 44 ++++++++++----------
...AlignedProcessingTimeWindowOperatorTest.java | 17 ++++----
10 files changed, 101 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index 302a645..9d05b8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
@@ -248,7 +249,7 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
}
else if (function instanceof KeyedWindowFunction) {
@SuppressWarnings("unchecked")
- KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+ KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
wf, input.getKeySelector(), windowLength, windowSlide);
@@ -273,7 +274,7 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
}
else if (function instanceof KeyedWindowFunction) {
@SuppressWarnings("unchecked")
- KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+ KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
wf, input.getKeySelector(), windowLength, windowSlide);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 07d8312..77bd130 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -24,7 +24,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
/**
- * Base class for operators that do not contain a user-defined function.
+ * Base class for all stream operators.
+ *
+ * Operators that contain a user function should extend the class
+ * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
*
* @param <OUT> The output type of the operator
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index f679d5f..c0d71e9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -41,29 +42,45 @@ import org.slf4j.LoggerFactory;
/**
* This is used as the base class for operators that have a user-defined
- * function.
+ * function. This class handles the opening and closing of the user-defined functions,
+ * as part of the operator life cycle.
*
* @param <OUT>
* The output type of the operator
* @param <F>
* The type of the user function
*/
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable>
+public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);
-
+
+ /** the user function */
protected final F userFunction;
+ /** Flag to prevent duplicate function.close() calls in close() and dispose() */
private boolean functionsClosed = false;
+
public AbstractUdfStreamOperator(F userFunction) {
- this.userFunction = userFunction;
+ this.userFunction = Objects.requireNonNull(userFunction);
}
+ /**
+ * Gets the user function executed in this operator.
+ * @return The user function of this operator.
+ */
+ public F getUserFunction() {
+ return userFunction;
+ }
+
+ // ------------------------------------------------------------------------
+ // operator life cycle
+ // ------------------------------------------------------------------------
+
@Override
public final void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
super.setup(output, runtimeContext);
@@ -97,6 +114,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
}
}
+ // ------------------------------------------------------------------------
+ // checkpointing and recovery
+ // ------------------------------------------------------------------------
+
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
@@ -170,8 +191,4 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
}
}
}
-
- public F getUserFunction() {
- return userFunction;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index a81340f..4fcfb2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -33,7 +33,8 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT>
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, F extends Function>
+ extends AbstractUdfStreamOperator<OUT, F>
implements OneInputStreamOperator<IN, OUT>, Triggerable {
private static final long serialVersionUID = 3245500864882459867L;
@@ -60,11 +61,13 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
private transient long nextSlideTime;
protected AbstractAlignedProcessingTimeWindowOperator(
- Function function,
+ F function,
KeySelector<IN, KEY> keySelector,
long windowLength,
long windowSlide)
{
+ super(function);
+
if (function == null || keySelector == null) {
throw new NullPointerException();
}
@@ -103,6 +106,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
@Override
public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
out = new TimestampedCollector<>(output);
// create the panes that gather the elements per slide
@@ -119,6 +124,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
@Override
public void close() throws Exception {
+ super.close();
+
final long finalWindowTimestamp = nextEvaluationTime;
// early stop the triggering thread, so it does not attempt to return any more data
@@ -130,12 +137,17 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
@Override
public void dispose() {
+ super.dispose();
+
// acquire the lock during shutdown, to prevent trigger calls at the same time
// fail-safe stop of the triggering thread (in case of an error)
stopTriggers();
- // release the panes
- panes.dispose();
+ // release the panes. panes may still be null if dispose is called
+ // after open() failed
+ if (panes != null) {
+ panes.dispose();
+ }
}
private void stopTriggers() {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 8edb76f..ace3823 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -21,17 +21,18 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> {
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, KeyedWindowFunction<IN, OUT, KEY, TimeWindow>> {
private static final long serialVersionUID = 7305948082830843475L;
public AccumulatingProcessingTimeWindowOperator(
- KeyedWindowFunction<IN, OUT, KEY, Window> function,
+ KeyedWindowFunction<IN, OUT, KEY, TimeWindow> function,
KeySelector<IN, KEY> keySelector,
long windowLength,
long windowSlide)
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index 99457bb..cc38019 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
public class AggregatingProcessingTimeWindowOperator<KEY, IN>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN> {
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, ReduceFunction<IN>> {
private static final long serialVersionUID = 7305948082830843475L;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1b35350..16b8f55 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -86,6 +86,9 @@ import org.slf4j.LoggerFactory;
*/
public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
+ /** The thread group that holds all trigger timer threads */
+ public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
+
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
/**
@@ -104,9 +107,6 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
protected ClassLoader userClassLoader;
- /** The thread group that holds all trigger timer threads */
- public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
-
/** The executor service that */
private ScheduledExecutorService timerService;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 2aed041..67df3ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.operators;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -25,8 +26,10 @@ import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
import org.junit.Test;
import org.junit.runner.RunWith;
+
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -38,7 +41,8 @@ import static org.junit.Assert.*;
* Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
+@PrepareForTest(ResultPartitionWriter.class)
+@SuppressWarnings("serial")
public class StreamTaskTimerTest {
@Test
@@ -47,7 +51,8 @@ public class StreamTaskTimerTest {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<>(null);
+
+ StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
streamConfig.setStreamOperator(mapOperator);
testHarness.invoke();
@@ -77,12 +82,11 @@ public class StreamTaskTimerTest {
@Test
public void checkScheduledTimestampe() {
try {
-
final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<>(null);
+ StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
streamConfig.setStreamOperator(mapOperator);
testHarness.invoke();
@@ -162,4 +166,11 @@ public class StreamTaskTimerTest {
}
}
}
+
+ // ------------------------------------------------------------------------
+
+ public static class DummyMapFunction<T> implements MapFunction<T, T> {
+ @Override
+ public T map(T value) { return value; }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 4327e11..99a2e14 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
import org.apache.flink.util.Collector;
+
import org.junit.After;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -45,11 +45,11 @@ import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
- private final KeyedWindowFunction<String, String, String, Window> mockFunction = mock(KeyedWindowFunction.class);
+ private final KeyedWindowFunction<String, String, String, TimeWindow> mockFunction = mock(KeyedWindowFunction.class);
@SuppressWarnings("unchecked")
private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -61,11 +61,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
};
- private final KeyedWindowFunction<Integer, Integer, Integer, Window> validatingIdentityFunction =
- new KeyedWindowFunction<Integer, Integer, Integer, Window>()
+ private final KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+ new KeyedWindowFunction<Integer, Integer, Integer, TimeWindow>()
{
@Override
- public void evaluate(Integer key, Window window, Iterable<Integer> values, Collector<Integer> out) {
+ public void evaluate(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
for (Integer val : values) {
assertEquals(key, val);
out.collect(val);
@@ -112,7 +112,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowSizeAndSlide() {
try {
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+ AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
assertEquals(5000, op.getWindowSize());
@@ -153,7 +153,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
when(mockContext.getTaskName()).thenReturn("Test task name");
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+ AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
op.setup(mockOut, mockContext);
@@ -199,7 +199,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
when(mockContext.getTaskName()).thenReturn("Test task name");
// tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(
validatingIdentityFunction, identitySelector, windowSize, windowSize);
@@ -240,7 +240,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
when(mockContext.getTaskName()).thenReturn("Test task name");
// tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
op.setup(out, mockContext);
@@ -299,9 +299,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
- doAnswer(new Answer() {
+ doAnswer(new Answer<Void>() {
@Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
final Long timestamp = (Long) invocationOnMock.getArguments()[0];
final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
timerService.schedule(
@@ -321,7 +321,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
// tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50);
op.setup(out, mockContext);
@@ -374,9 +374,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
- doAnswer(new Answer() {
+ doAnswer(new Answer<Void>() {
@Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
final Long timestamp = (Long) invocationOnMock.getArguments()[0];
final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
timerService.schedule(
@@ -396,7 +396,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
// tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
op.setup(out, mockContext);
@@ -438,7 +438,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// the operator has a window time that is so long that it will not fire in this test
final long oneYear = 365L * 24 * 60 * 60 * 1000;
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector,
oneYear, oneYear);
@@ -472,11 +472,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
when(mockContext.getTaskName()).thenReturn("Test task name");
- KeyedWindowFunction<Integer, Integer, Integer, Window> failingFunction = new FailingFunction(100);
+ KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
// the operator has a window time that is so long that it will not fire in this test
final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+ AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(
failingFunction, identitySelector, hundredYears, hundredYears);
@@ -523,7 +523,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, Window> {
+ private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> {
private final int failAfterElements;
@@ -534,7 +534,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
@Override
- public void evaluate(Integer integer, Window window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+ public void evaluate(Integer integer, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
for (Integer i : values) {
out.collect(i);
numElements++;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 7ad9dd4..fa90e4a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -51,7 +50,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
@@ -113,7 +112,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowSizeAndSlide() {
try {
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+ AggregatingProcessingTimeWindowOperator<String, String> op;
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
assertEquals(5000, op.getWindowSize());
@@ -153,8 +152,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
when(mockContext.getTaskName()).thenReturn("Test task name");
-
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+
+ AggregatingProcessingTimeWindowOperator<String, String> op;
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
op.setup(mockOut, mockContext);
@@ -244,9 +243,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
when(mockContext.getTaskName()).thenReturn("Test task name");
final Object lock = new Object();
- doAnswer(new Answer() {
+ doAnswer(new Answer<Void>() {
@Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
final Long timestamp = (Long) invocationOnMock.getArguments()[0];
final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
timerService.schedule(
@@ -380,9 +379,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
when(mockContext.getTaskName()).thenReturn("Test task name");
final Object lock = new Object();
- doAnswer(new Answer() {
+ doAnswer(new Answer<Void>() {
@Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
final Long timestamp = (Long) invocationOnMock.getArguments()[0];
final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
timerService.schedule(