You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:40 UTC
[13/24] flink git commit: [FLINK-2550] [streaming] Make fast-path
processing time windows fault tolerant
[FLINK-2550] [streaming] Make fast-path processing time windows fault tolerant
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c24dca50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c24dca50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c24dca50
Branch: refs/heads/master
Commit: c24dca501e1b0e1dcdc38d2e81e0a182bc2ae6bb
Parents: 479bec0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 8 21:38:39 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200
----------------------------------------------------------------------
.../core/memory/DataInputViewStreamWrapper.java | 38 +++
.../memory/DataOutputViewStreamWrapper.java | 59 ++++
.../OutputViewDataOutputStreamWrapper.java | 5 +-
.../OutputViewObjectOutputStreamWrapper.java | 7 +-
.../streaming/api/datastream/KeyedStream.java | 31 +-
.../api/datastream/WindowedStream.java | 21 +-
.../flink/streaming/api/graph/StreamConfig.java | 2 +-
.../flink/streaming/api/graph/StreamGraph.java | 4 +-
.../api/operators/AbstractStreamOperator.java | 12 +-
.../flink/streaming/api/state/StateBackend.java | 81 ++++-
.../api/state/filesystem/FsStateBackend.java | 3 +
.../api/state/memory/MemoryStateBackend.java | 3 +
...ractAlignedProcessingTimeWindowOperator.java | 126 +++++++-
.../windowing/AbstractKeyedTimePanes.java | 82 ++++-
.../windowing/AccumulatingKeyedTimePanes.java | 6 +-
...ccumulatingProcessingTimeWindowOperator.java | 117 ++++++-
.../windowing/AggregatingKeyedTimePanes.java | 6 +-
...AggregatingProcessingTimeWindowOperator.java | 7 +-
.../streaming/runtime/tasks/StreamTask.java | 25 +-
.../streaming/api/AggregationFunctionTest.java | 95 ++++--
.../streaming/api/graph/SlotAllocationTest.java | 21 +-
.../api/operators/StreamGroupedFoldTest.java | 30 +-
.../api/operators/StreamGroupedReduceTest.java | 2 +
...AlignedProcessingTimeWindowOperatorTest.java | 313 ++++++++++++++++--
...AlignedProcessingTimeWindowOperatorTest.java | 321 +++++++++++++++++--
.../flink/streaming/util/MockContext.java | 34 +-
.../util/OneInputStreamOperatorTestHarness.java | 25 +-
27 files changed, 1271 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
new file mode 100644
index 0000000..80affea
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class DataInputViewStreamWrapper extends DataInputStream implements DataInputView {
+
+ public DataInputViewStreamWrapper(InputStream in) {
+ super(in);
+ }
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ if (skipBytes(numBytes) != numBytes){
+ throw new EOFException("Could not skip " + numBytes + " bytes.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
new file mode 100644
index 0000000..efcc17e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class DataOutputViewStreamWrapper extends DataOutputStream implements DataOutputView {
+
+ private byte[] tempBuffer;
+
+ public DataOutputViewStreamWrapper(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void skipBytesToWrite(int numBytes) throws IOException {
+ if (tempBuffer == null) {
+ tempBuffer = new byte[4096];
+ }
+
+ while (numBytes > 0) {
+ int toWrite = Math.min(numBytes, tempBuffer.length);
+ write(tempBuffer, 0, toWrite);
+ numBytes -= toWrite;
+ }
+ }
+
+ @Override
+ public void write(DataInputView source, int numBytes) throws IOException {
+ if (tempBuffer == null) {
+ tempBuffer = new byte[4096];
+ }
+
+ while (numBytes > 0) {
+ int toCopy = Math.min(numBytes, tempBuffer.length);
+ source.read(tempBuffer, 0, toCopy);
+ write(tempBuffer, 0, toCopy);
+ numBytes -= toCopy;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
index ffe36c0..3be5d8b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
@@ -21,7 +21,6 @@ package org.apache.flink.core.memory;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Arrays;
public class OutputViewDataOutputStreamWrapper implements DataOutputView, Closeable {
@@ -43,9 +42,7 @@ public class OutputViewDataOutputStreamWrapper implements DataOutputView, Closea
@Override
public void skipBytesToWrite(int numBytes) throws IOException {
- byte[] bytes = new byte[numBytes];
- Arrays.fill(bytes, (byte)0);
- out.write(bytes);
+ out.write(new byte[numBytes]);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
index b84e07e..49cc3a7 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
@@ -20,9 +20,9 @@ package org.apache.flink.core.memory;
import java.io.IOException;
import java.io.ObjectOutputStream;
-import java.util.Arrays;
public class OutputViewObjectOutputStreamWrapper implements DataOutputView {
+
private final ObjectOutputStream out;
public OutputViewObjectOutputStreamWrapper(ObjectOutputStream out){
@@ -31,10 +31,7 @@ public class OutputViewObjectOutputStreamWrapper implements DataOutputView {
@Override
public void skipBytesToWrite(int numBytes) throws IOException {
- byte[] buffer = new byte[numBytes];
- Arrays.fill(buffer, (byte) 0);
-
- out.write(buffer);
+ out.write(new byte[numBytes]);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index cdea910..d4a3a77 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -60,10 +60,12 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
* @param <KEY> The type of the key in the Keyed Stream.
*/
public class KeyedStream<T, KEY> extends DataStream<T> {
-
- protected final KeySelector<T, KEY> keySelector;
- protected final TypeInformation<KEY> keyType;
+ /** The key selector that can get the key by which the stream if partitioned from the elements */
+ private final KeySelector<T, KEY> keySelector;
+
+ /** The type of the key by which the stream is partitioned */
+ private final TypeInformation<KEY> keyType;
/**
* Creates a new {@link KeyedStream} using the given {@link KeySelector}
@@ -93,18 +95,35 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
this.keySelector = keySelector;
this.keyType = keyType;
}
-
+ // ------------------------------------------------------------------------
+ // properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the key selector that can get the key by which the stream if partitioned from the elements.
+ * @return The key selector for the key.
+ */
public KeySelector<T, KEY> getKeySelector() {
return this.keySelector;
}
-
+ /**
+ * Gets the type of the key by which the stream is partitioned.
+ * @return The type of the key by which the stream is partitioned.
+ */
+ public TypeInformation<KEY> getKeyType() {
+ return keyType;
+ }
+
@Override
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
}
+ // ------------------------------------------------------------------------
+ // basic transformations
+ // ------------------------------------------------------------------------
@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
@@ -119,8 +138,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
return returnStream;
}
-
-
@Override
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 1b511d8..f1220de 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -503,7 +503,10 @@ public class WindowedStream<T, K, W extends Window> {
@SuppressWarnings("unchecked")
OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
new AggregatingProcessingTimeWindowOperator<>(
- reducer, input.getKeySelector(), windowLength, windowSlide);
+ reducer, input.getKeySelector(),
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+ windowLength, windowSlide);
return input.transform(opName, resultType, op);
}
else if (function instanceof WindowFunction) {
@@ -511,7 +514,10 @@ public class WindowedStream<T, K, W extends Window> {
WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(), windowLength, windowSlide);
+ wf, input.getKeySelector(),
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+ windowLength, windowSlide);
return input.transform(opName, resultType, op);
}
} else if (windowAssigner instanceof TumblingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
@@ -528,7 +534,11 @@ public class WindowedStream<T, K, W extends Window> {
@SuppressWarnings("unchecked")
OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
new AggregatingProcessingTimeWindowOperator<>(
- reducer, input.getKeySelector(), windowLength, windowSlide);
+ reducer,
+ input.getKeySelector(),
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+ windowLength, windowSlide);
return input.transform(opName, resultType, op);
}
else if (function instanceof WindowFunction) {
@@ -536,7 +546,10 @@ public class WindowedStream<T, K, W extends Window> {
WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(), windowLength, windowSlide);
+ wf, input.getKeySelector(),
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+ windowLength, windowSlide);
return input.transform(opName, resultType, op);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 55afc93..76be598 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -386,7 +386,7 @@ public class StreamConfig implements Serializable {
}
}
- public void setStatePartitioner(KeySelector<?, Serializable> partitioner) {
+ public void setStatePartitioner(KeySelector<?, ?> partitioner) {
try {
InstantiationUtil.writeObjectToConfig(partitioner, this.config, STATE_PARTITIONER);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 4c5c19c..0652406 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -64,8 +64,6 @@ import org.apache.sling.commons.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.util.Objects.requireNonNull;
-
/**
* Class representing the streaming topology. It contains all the information
* necessary to build the jobgraph for the execution.
@@ -149,7 +147,7 @@ public class StreamGraph extends StreamingPlan {
}
public void setStateBackend(StateBackend<?> backend) {
- this.stateBackend = requireNonNull(backend);
+ this.stateBackend = backend;
}
public StateBackend<?> getStateBackend() {
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index e99d54d..ca86627 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -144,13 +144,9 @@ public abstract class AbstractStreamOperator<OUT>
@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
- // here, we deal with operator checkpoints and key/value state snapshots
+ // here, we deal with key/value state snapshots
StreamTaskState state = new StreamTaskState();
-
- // (1) checkpoint the operator, if the operator is stateful
-
- // (2) draw a snapshot of the key/value state
if (keyValueState != null) {
KvStateSnapshot<?, ?, ?> snapshot = keyValueState.shapshot(checkpointId, timestamp);
state.setKvState(snapshot);
@@ -161,10 +157,8 @@ public abstract class AbstractStreamOperator<OUT>
@Override
public void restoreState(StreamTaskState state) throws Exception {
- // (1) checkpoint the operator, if the operator is stateful
-
- // (2) restore the key/value state. the actual restore happens lazily, when the function requests
- // the state again, because the restore method needs information provided by the user function
+ // restore the key/value state. the actual restore happens lazily, when the function requests
+ // the state again, because the restore method needs information provided by the user function
keyValueStateSnapshot = state.getKvState();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
index b4fce7e..f4391ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StateBackend.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.api.state;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.StateHandle;
import java.io.IOException;
@@ -57,6 +61,14 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
* @throws Exception Exceptions may occur during disposal of the state and should be forwarded.
*/
public abstract void disposeAllStateForCurrentJob() throws Exception;
+
+ /**
+ * Closes the state backend, releasing all internal resources, but does not delete any persistent
+ * checkpoint data.
+ *
+ * @throws Exception Exceptions can be forwarded and will be logged by the system
+ */
+ public abstract void close() throws Exception;
// ------------------------------------------------------------------------
// key/value state
@@ -96,7 +108,21 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
*/
public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception;
-
+
+ /**
+ * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
+ * When the stream is closes, it returns a state handle that can retrieve the state back.
+ *
+ * @param checkpointID The ID of the checkpoint.
+ * @param timestamp The timestamp of the checkpoint.
+ * @return An DataOutputView stream that writes state for the given checkpoint.
+ *
+ * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
+ */
+ public CheckpointStateOutputView createCheckpointStateOutputView(
+ long checkpointID, long timestamp) throws Exception {
+ return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
+ }
/**
* Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
@@ -132,4 +158,57 @@ public abstract class StateBackend<Backend extends StateBackend<Backend>> implem
*/
public abstract StreamStateHandle closeAndGetHandle() throws IOException;
}
+
+ /**
+ * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
+ */
+ public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
+
+ private final CheckpointStateOutputStream out;
+
+ public CheckpointStateOutputView(CheckpointStateOutputStream out) {
+ super(out);
+ this.out = out;
+ }
+
+ /**
+ * Closes the stream and gets a state handle that can create a DataInputView.
+ * producing the data written to this stream.
+ *
+ * @return A state handle that can create an input stream producing the data written to this stream.
+ * @throws IOException Thrown, if the stream cannot be closed.
+ */
+ public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
+ return new DataInputViewHandle(out.closeAndGetHandle());
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+ }
+
+ /**
+ * Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
+ */
+ private static final class DataInputViewHandle implements StateHandle<DataInputView> {
+
+ private static final long serialVersionUID = 2891559813513532079L;
+
+ private final StreamStateHandle stream;
+
+ private DataInputViewHandle(StreamStateHandle stream) {
+ this.stream = stream;
+ }
+
+ @Override
+ public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
+ return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ stream.discardState();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
index 1fc2457..3cbd227 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/filesystem/FsStateBackend.java
@@ -231,6 +231,9 @@ public class FsStateBackend extends StateBackend<FsStateBackend> {
}
}
+ @Override
+ public void close() throws Exception {}
+
// ------------------------------------------------------------------------
// state backend operations
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
index b2dfae8..05368bd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/memory/MemoryStateBackend.java
@@ -75,6 +75,9 @@ public class MemoryStateBackend extends StateBackend<MemoryStateBackend> {
// nothing to do here, GC will do it
}
+ @Override
+ public void close() throws Exception {}
+
// ------------------------------------------------------------------------
// State backend operations
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 227de49..cf8575e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -21,18 +21,24 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.state.StateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import static java.util.Objects.requireNonNull;
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, F extends Function>
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function>
extends AbstractUdfStreamOperator<OUT, F>
implements OneInputStreamOperator<IN, OUT>, Triggerable {
@@ -45,6 +51,9 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
private final Function function;
private final KeySelector<IN, KEY> keySelector;
+ private final TypeSerializer<KEY> keySerializer;
+ private final TypeSerializer<STATE> stateTypeSerializer;
+
private final long windowSize;
private final long windowSlide;
private final long paneSize;
@@ -52,24 +61,25 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
// ----- fields for operator functionality -----
- private transient AbstractKeyedTimePanes<IN, KEY, ?, OUT> panes;
+ private transient AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
private transient TimestampedCollector<OUT> out;
+ private transient RestoredState<IN, KEY, STATE, OUT> restoredState;
+
private transient long nextEvaluationTime;
private transient long nextSlideTime;
protected AbstractAlignedProcessingTimeWindowOperator(
F function,
KeySelector<IN, KEY> keySelector,
+ TypeSerializer<KEY> keySerializer,
+ TypeSerializer<STATE> stateTypeSerializer,
long windowLength,
long windowSlide)
{
super(function);
- if (function == null || keySelector == null) {
- throw new NullPointerException();
- }
if (windowLength < MIN_SLIDE_TIME) {
throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
}
@@ -87,8 +97,10 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
"The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
}
- this.function = function;
- this.keySelector = keySelector;
+ this.function = requireNonNull(function);
+ this.keySelector = requireNonNull(keySelector);
+ this.keySerializer = requireNonNull(keySerializer);
+ this.stateTypeSerializer = requireNonNull(stateTypeSerializer);
this.windowSize = windowLength;
this.windowSlide = windowSlide;
this.paneSize = paneSlide;
@@ -96,7 +108,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
}
- protected abstract AbstractKeyedTimePanes<IN, KEY, ?, OUT> createPanes(
+ protected abstract AbstractKeyedTimePanes<IN, KEY, STATE, OUT> createPanes(
KeySelector<IN, KEY> keySelector, Function function);
// ------------------------------------------------------------------------
@@ -106,19 +118,53 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
@Override
public void open() throws Exception {
super.open();
-
+
out = new TimestampedCollector<>(output);
- // create the panes that gather the elements per slide
- panes = createPanes(keySelector, function);
-
// decide when to first compute the window and when to slide it
// the values should align with the start of time (that is, the UNIX epoch, not the big bang)
final long now = System.currentTimeMillis();
nextEvaluationTime = now + windowSlide - (now % windowSlide);
nextSlideTime = now + paneSize - (now % paneSize);
+
+ final long firstTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
+
+ // check if we restored state and if we need to fire some windows based on that restored state
+ if (restoredState == null) {
+ // initial empty state: create empty panes that gather the elements per slide
+ panes = createPanes(keySelector, function);
+ }
+ else {
+ // restored state
+ panes = restoredState.panes;
+
+ long nextPastEvaluationTime = restoredState.nextEvaluationTime;
+ long nextPastSlideTime = restoredState.nextSlideTime;
+ long nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
+ int numPanesRestored = panes.getNumPanes();
+
+ // fire windows from the past as long as there are more panes with data and as long
+ // as the missed trigger times have not caught up with the presence
+ while (numPanesRestored > 0 && nextPastTriggerTime < firstTriggerTime) {
+ // evaluate the window from the past
+ if (nextPastTriggerTime == nextPastEvaluationTime) {
+ computeWindow(nextPastTriggerTime);
+ nextPastEvaluationTime += windowSlide;
+ }
+
+ // evaluate slide from the past
+ if (nextPastTriggerTime == nextPastSlideTime) {
+ panes.slidePanes(numPanesPerWindow);
+ numPanesRestored--;
+ nextPastSlideTime += paneSize;
+ }
+
+ nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
+ }
+ }
- registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
+ // make sure the first window happens
+ registerTimer(firstTriggerTime, this);
}
@Override
@@ -197,6 +243,44 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
}
// ------------------------------------------------------------------------
+ // Checkpointing
+ // ------------------------------------------------------------------------
+
+ @Override
+ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+ StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+ // we write the panes with the key/value maps into the stream, as well as when this state
+ // should have triggered and slided
+ StateBackend.CheckpointStateOutputView out =
+ getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+ out.writeLong(nextEvaluationTime);
+ out.writeLong(nextSlideTime);
+ panes.writeToOutput(out, keySerializer, stateTypeSerializer);
+
+ taskState.setOperatorState(out.closeAndGetHandle());
+ return taskState;
+ }
+
+ @Override
+ public void restoreState(StreamTaskState taskState) throws Exception {
+ super.restoreState(taskState);
+
+ @SuppressWarnings("unchecked")
+ StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+ DataInputView in = inputState.getState(getUserCodeClassloader());
+
+ final long nextEvaluationTime = in.readLong();
+ final long nextSlideTime = in.readLong();
+
+ AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes = createPanes(keySelector, function);
+ panes.readFromInput(in, keySerializer, stateTypeSerializer);
+
+ restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
+ }
+
+ // ------------------------------------------------------------------------
// Property access (for testing)
// ------------------------------------------------------------------------
@@ -232,4 +316,20 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
public String toString() {
return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')';
}
+
+ // ------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+
+ private static final class RestoredState<IN, KEY, STATE, OUT> {
+
+ final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
+ final long nextEvaluationTime;
+ final long nextSlideTime;
+
+ RestoredState(AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes, long nextEvaluationTime, long nextSlideTime) {
+ this.panes = panes;
+ this.nextEvaluationTime = nextEvaluationTime;
+ this.nextSlideTime = nextSlideTime;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
index 07dea06..d1cea20 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
@@ -18,16 +18,29 @@
package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
+import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.Iterator;
-
+/**
+ * Base class for a multiple key/value maps organized in panes.
+ */
public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
+ private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
+
+ private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
+
+ /** The latest time pane */
protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
+ /** The previous time panes, ordered by time (early to late) */
protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
// ------------------------------------------------------------------------
@@ -43,6 +56,10 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
previousPanes.clear();
}
+ public int getNumPanes() {
+ return previousPanes.size() + 1;
+ }
+
public void slidePanes(int panesToKeep) {
if (panesToKeep > 1) {
@@ -74,4 +91,67 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
// let the maps make a coordinated traversal and evaluate the window function per contained key
KeyMap.traverseMaps(panes, traversal, traversalPass);
}
+
+ // ------------------------------------------------------------------------
+ // Serialization and de-serialization
+ // ------------------------------------------------------------------------
+
+ public void writeToOutput(
+ final DataOutputView output,
+ final TypeSerializer<Key> keySerializer,
+ final TypeSerializer<Aggregate> aggSerializer) throws IOException
+ {
+ output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER);
+
+ int numPanes = getNumPanes();
+ output.writeInt(numPanes);
+
+ // write from the past
+ Iterator<KeyMap<Key, Aggregate>> previous = previousPanes.iterator();
+ for (int paneNum = 0; paneNum < numPanes; paneNum++) {
+ output.writeInt(BEGIN_OF_PANE_MAGIC_NUMBER);
+ KeyMap<Key, Aggregate> pane = (paneNum == numPanes - 1) ? latestPane : previous.next();
+
+ output.writeInt(pane.size());
+ for (KeyMap.Entry<Key, Aggregate> entry : pane) {
+ keySerializer.serialize(entry.getKey(), output);
+ aggSerializer.serialize(entry.getValue(), output);
+ }
+ }
+ }
+
+ public void readFromInput(
+ final DataInputView input,
+ final TypeSerializer<Key> keySerializer,
+ final TypeSerializer<Aggregate> aggSerializer) throws IOException
+ {
+ validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, input.readInt());
+ int numPanes = input.readInt();
+
+ // read from the past towards the presence
+ while (numPanes > 0) {
+ validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, input.readInt());
+ KeyMap<Key, Aggregate> pane = (numPanes == 1) ? latestPane : new KeyMap<Key, Aggregate>();
+
+ final int numElementsInPane = input.readInt();
+ for (int i = numElementsInPane - 1; i >= 0; i--) {
+ Key k = keySerializer.deserialize(input);
+ Aggregate a = aggSerializer.deserialize(input);
+ pane.put(k, a);
+ }
+
+ if (numPanes > 1) {
+ previousPanes.addLast(pane);
+ }
+ numPanes--;
+ }
+ }
+
+ private static void validateMagicNumber(int expected, int found) throws IOException {
+ if (expected != found) {
+ throw new IOException("Corrupt state stream - wrong magic number. " +
+ "Expected '" + Integer.toHexString(expected) +
+ "', found '" + Integer.toHexString(found) + '\'');
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 55c1be0..c854e6c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -35,8 +35,10 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
private final WindowFunction<Type, Result, Key, Window> function;
-
- private long evaluationPass;
+
+ /**
+ * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
+ private long evaluationPass = 1L;
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 3bcffbc..7a7d04c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -19,14 +19,20 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import java.io.IOException;
+import java.util.ArrayList;
+
public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, WindowFunction<IN, OUT, KEY, TimeWindow>> {
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
private static final long serialVersionUID = 7305948082830843475L;
@@ -34,10 +40,13 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
public AccumulatingProcessingTimeWindowOperator(
WindowFunction<IN, OUT, KEY, TimeWindow> function,
KeySelector<IN, KEY> keySelector,
+ TypeSerializer<KEY> keySerializer,
+ TypeSerializer<IN> valueSerializer,
long windowLength,
long windowSlide)
{
- super(function, keySelector, windowLength, windowSlide);
+ super(function, keySelector, keySerializer,
+ new ArrayListSerializer<IN>(valueSerializer), windowLength, windowSlide);
}
@Override
@@ -47,4 +56,108 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
}
+
+ // ------------------------------------------------------------------------
+ // Utility Serializer for Lists of Elements
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private static final class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
+
+ private static final long serialVersionUID = 1119562170939152304L;
+
+ private final TypeSerializer<T> elementSerializer;
+
+ ArrayListSerializer(TypeSerializer<T> elementSerializer) {
+ this.elementSerializer = elementSerializer;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<ArrayList<T>> duplicate() {
+ TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+ return duplicateElement == elementSerializer ? this : new ArrayListSerializer<T>(duplicateElement);
+ }
+
+ @Override
+ public ArrayList<T> createInstance() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public ArrayList<T> copy(ArrayList<T> from) {
+ ArrayList<T> newList = new ArrayList<>(from.size());
+ for (int i = 0; i < from.size(); i++) {
+ newList.add(elementSerializer.copy(from.get(i)));
+ }
+ return newList;
+ }
+
+ @Override
+ public ArrayList<T> copy(ArrayList<T> from, ArrayList<T> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1; // var length
+ }
+
+ @Override
+ public void serialize(ArrayList<T> list, DataOutputView target) throws IOException {
+ final int size = list.size();
+ target.writeInt(size);
+ for (int i = 0; i < size; i++) {
+ elementSerializer.serialize(list.get(i), target);
+ }
+ }
+
+ @Override
+ public ArrayList<T> deserialize(DataInputView source) throws IOException {
+ final int size = source.readInt();
+ final ArrayList<T> list = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ list.add(elementSerializer.deserialize(source));
+ }
+ return list;
+ }
+
+ @Override
+ public ArrayList<T> deserialize(ArrayList<T> reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ // copy number of elements
+ final int num = source.readInt();
+ target.writeInt(num);
+ for (int i = 0; i < num; i++) {
+ elementSerializer.copy(source, target);
+ }
+ }
+
+ // --------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this ||
+ (obj != null && obj.getClass() == getClass() &&
+ elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return elementSerializer.hashCode();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index c17f0b4..d395b2a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -29,8 +29,10 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
private final KeySelector<Type, Key> keySelector;
private final ReduceFunction<Type> reducer;
-
- private long evaluationPass;
+
+ /**
+ * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
+ private long evaluationPass = 1L;
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index cc38019..0e07cea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
public class AggregatingProcessingTimeWindowOperator<KEY, IN>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, ReduceFunction<IN>> {
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, IN, ReduceFunction<IN>> {
private static final long serialVersionUID = 7305948082830843475L;
@@ -31,10 +32,12 @@ public class AggregatingProcessingTimeWindowOperator<KEY, IN>
public AggregatingProcessingTimeWindowOperator(
ReduceFunction<IN> function,
KeySelector<IN, KEY> keySelector,
+ TypeSerializer<KEY> keySerializer,
+ TypeSerializer<IN> aggregateSerializer,
long windowLength,
long windowSlide)
{
- super(function, keySelector, windowLength, windowSlide);
+ super(function, keySelector, keySerializer, aggregateSerializer, windowLength, windowSlide);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index bbfd233..5bf7d8e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -188,7 +188,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
boolean disposed = false;
try {
- openAllOperators();
+ // we need to make sure that any triggers scheduled in open() cannot be
+ // executed before all operators are opened
+ synchronized (lock) {
+ openAllOperators();
+ }
// let the task do its work
isRunning = true;
@@ -202,12 +206,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// make sure no further checkpoint and notification actions happen.
// we make sure that no other thread is currently in the locked scope before
// we close the operators by trying to acquire the checkpoint scope lock
- synchronized (lock) {}
-
- // this is part of the main logic, so if this fails, the task is considered failed
- closeAllOperators();
+ // we also need to make sure that no triggers fire concurrently with the close logic
+ synchronized (lock) {
+ // this is part of the main logic, so if this fails, the task is considered failed
+ closeAllOperators();
+ }
- // make sure all data is flushed
+ // make sure all buffered data is flushed
operatorChain.flushOutputs();
// make an attempt to dispose the operators such that failures in the dispose call
@@ -239,6 +244,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
if (!disposed) {
disposeAllOperators();
}
+
+ try {
+ if (stateBackend != null) {
+ stateBackend.close();
+ }
+ } catch (Throwable t) {
+ LOG.error("Error while closing the state backend", t);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index aeb5078..dd8dec9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -24,11 +24,13 @@ import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableList;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -38,6 +40,7 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+
import org.junit.Test;
public class AggregationFunctionTest {
@@ -78,9 +81,10 @@ public class AggregationFunctionTest {
ExecutionConfig config = new ExecutionConfig();
- KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+ KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
typeInfo, config);
+ TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
@@ -90,17 +94,20 @@ public class AggregationFunctionTest {
ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
1, typeInfo, AggregationType.MAX, config);
- List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
+ List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
- getInputList());
+ getInputList(),
+ keySelector, keyType);
- List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
+ List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
- getInputList());
+ getInputList(),
+ keySelector, keyType);
- List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
+ List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
- getInputList());
+ getInputList(),
+ keySelector, keyType);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
@@ -143,9 +150,10 @@ public class AggregationFunctionTest {
ExecutionConfig config = new ExecutionConfig();
- KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+ KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
typeInfo, config);
+ TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
@@ -154,15 +162,20 @@ public class AggregationFunctionTest {
ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
false, config);
- List<MyPojo> groupedSumList = MockContext.createAndExecute(
+ List<MyPojo> groupedSumList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
- getInputPojoList());
- List<MyPojo> groupedMinList = MockContext.createAndExecute(
+ getInputPojoList(),
+ keySelector, keyType);
+
+ List<MyPojo> groupedMinList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
- getInputPojoList());
- List<MyPojo> groupedMaxList = MockContext.createAndExecute(
+ getInputPojoList(),
+ keySelector, keyType);
+
+ List<MyPojo> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
- getInputPojoList());
+ getInputPojoList(),
+ keySelector, keyType);
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
@@ -200,9 +213,10 @@ public class AggregationFunctionTest {
ExecutionConfig config = new ExecutionConfig();
- KeySelector<Tuple3<Integer, Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+ KeySelector<Tuple3<Integer, Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
typeInfo, config);
+ TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionFirst =
@@ -214,18 +228,25 @@ public class AggregationFunctionTest {
ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionLast =
new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
- assertEquals(maxByFirstExpected, MockContext.createAndExecute(
+ assertEquals(maxByFirstExpected, MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
- getInputByList()));
- assertEquals(maxByLastExpected, MockContext.createAndExecute(
+ getInputByList(),
+ keySelector, keyType));
+
+ assertEquals(maxByLastExpected, MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
- getInputByList()));
- assertEquals(minByLastExpected, MockContext.createAndExecute(
+ getInputByList(),
+ keySelector, keyType));
+
+ assertEquals(minByLastExpected, MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
- getInputByList()));
- assertEquals(minByFirstExpected, MockContext.createAndExecute(
+ getInputByList(),
+ keySelector, keyType));
+
+ assertEquals(minByFirstExpected, MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
- getInputByList()));
+ getInputByList(),
+ keySelector, keyType));
}
@Test
@@ -258,9 +279,10 @@ public class AggregationFunctionTest {
ExecutionConfig config = new ExecutionConfig();
- KeySelector<MyPojo3, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
+ KeySelector<MyPojo3, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
typeInfo, config);
+ TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
// aggregations tested
ReduceFunction<MyPojo3> maxByFunctionFirst =
@@ -272,18 +294,25 @@ public class AggregationFunctionTest {
ReduceFunction<MyPojo3> minByFunctionLast =
new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
- assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
- getInputByPojoList()));
- assertEquals(maxByLastExpected, MockContext.createAndExecute(
+ assertEquals(maxByFirstExpected, MockContext.createAndExecuteForKeyedStream(
+ new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
+ getInputByPojoList(),
+ keySelector, keyType));
+
+ assertEquals(maxByLastExpected, MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
- getInputByPojoList()));
- assertEquals(minByLastExpected, MockContext.createAndExecute(
+ getInputByPojoList(),
+ keySelector, keyType));
+
+ assertEquals(minByLastExpected, MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
- getInputByPojoList()));
- assertEquals(minByFirstExpected, MockContext.createAndExecute(
+ getInputByPojoList(),
+ keySelector, keyType));
+
+ assertEquals(minByFirstExpected, MockContext.createAndExecuteForKeyedStream(
new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
- getInputByPojoList()));
+ getInputByPojoList(),
+ keySelector, keyType));
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
index 39a13b3..8038cfb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
@@ -25,23 +25,19 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.Test;
-public class SlotAllocationTest extends StreamingMultipleProgramsTestBase{
+import org.junit.Test;
- @SuppressWarnings("serial")
+@SuppressWarnings("serial")
+public class SlotAllocationTest {
+
@Test
public void test() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
-
@Override
- public boolean filter(Long value) throws Exception {
-
- return false;
- }
+ public boolean filter(Long value) { return false; }
};
env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
@@ -53,11 +49,8 @@ public class SlotAllocationTest extends StreamingMultipleProgramsTestBase{
List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
- assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1)
- .getSlotSharingGroup());
- assertNotEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3)
- .getSlotSharingGroup());
+ assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1).getSlotSharingGroup());
+ assertNotEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
assertEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index 1002b10..f6e7e6b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -23,17 +23,18 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichFoldFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
+
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
/**
* Tests for {@link StreamGroupedFold}. These test that:
*
@@ -48,18 +49,12 @@ public class StreamGroupedFoldTest {
private static class MyFolder implements FoldFunction<Integer, String> {
- private static final long serialVersionUID = 1L;
-
@Override
public String fold(String accumulator, Integer value) throws Exception {
return accumulator + value.toString();
}
-
}
- private TypeInformation<Integer> inType = TypeExtractor.getForClass(Integer.class);
- private TypeInformation<String> outType = TypeExtractor.getForClass(String.class);
-
@Test
public void testGroupedFold() throws Exception {
@@ -72,9 +67,10 @@ public class StreamGroupedFoldTest {
};
StreamGroupedFold<Integer, String, String> operator = new StreamGroupedFold<>(new MyFolder(), "100");
- operator.setOutputType(outType, new ExecutionConfig());
+ operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
+ testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -112,7 +108,9 @@ public class StreamGroupedFoldTest {
operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
-
+ testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+
long initialTime = 0L;
testHarness.open();
@@ -122,8 +120,8 @@ public class StreamGroupedFoldTest {
testHarness.close();
- Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFoldFunction.closeCalled);
- Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
+ assertTrue("RichFunction methods where not called.", TestOpenCloseFoldFunction.closeCalled);
+ assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
}
// This must only be used in one test, otherwise the static fields will be changed
@@ -138,7 +136,7 @@ public class StreamGroupedFoldTest {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
if (closeCalled) {
- Assert.fail("Close called before open.");
+ fail("Close called before open.");
}
openCalled = true;
}
@@ -147,7 +145,7 @@ public class StreamGroupedFoldTest {
public void close() throws Exception {
super.close();
if (!openCalled) {
- Assert.fail("Open was not called before close.");
+ fail("Open was not called before close.");
}
closeCalled = true;
}
@@ -155,7 +153,7 @@ public class StreamGroupedFoldTest {
@Override
public String fold(String acc, Integer in) throws Exception {
if (!openCalled) {
- Assert.fail("Open was not called before run.");
+ fail("Open was not called before run.");
}
return acc + in;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c24dca50/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index b5d2bd6..6cb46c9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -53,6 +53,7 @@ public class StreamGroupedReduceTest {
StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), IntSerializer.INSTANCE);
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
+ testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -84,6 +85,7 @@ public class StreamGroupedReduceTest {
StreamGroupedReduce<Integer> operator =
new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE);
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
+ testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
long initialTime = 0L;