You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/23 15:41:19 UTC
[1/5] flink git commit: [hotfix] [runtime] Eagerly create
AccumulatorMap in StreamTask
Repository: flink
Updated Branches:
refs/heads/master b485f8cc6 -> 5b8396719
[hotfix] [runtime] Eagerly create AccumulatorMap in StreamTask
This follows the RAII paradigm, simplifying testing setups and reasoning about initialization.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b839671
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b839671
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b839671
Branch: refs/heads/master
Commit: 5b8396719e9f7d738cc44b53182556ae44f8ec58
Parents: 8258029
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 18 19:19:56 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/streaming/runtime/tasks/StreamTask.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5b839671/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6ea442d..d6e088d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -154,7 +154,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
protected ProcessingTimeService timerService;
/** The map of user-defined accumulators of this task. */
- private Map<String, Accumulator<?, ?>> accumulatorMap;
+ private final Map<String, Accumulator<?, ?>> accumulatorMap;
/** The currently active background materialization threads. */
private final CloseableRegistry cancelables = new CloseableRegistry();
@@ -208,6 +208,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
this.timerService = timeProvider;
this.configuration = new StreamConfig(getTaskConfiguration());
+ this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
this.streamRecordWriters = createStreamRecordWriters(configuration, environment);
}
@@ -255,8 +256,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
stateBackend = createStateBackend();
checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
- accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
-
// if the clock is not already set, then assign a default TimeServiceProvider
if (timerService == null) {
ThreadFactory timerThreadFactory =
[3/5] flink git commit: [hotfix] [runtime] Remove unused and
unnecessary method from StreamTask
Posted by se...@apache.org.
[hotfix] [runtime] Remove unused and unnecessary method from StreamTask
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee3c58ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee3c58ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee3c58ea
Branch: refs/heads/master
Commit: ee3c58ea65d770a7abc4ab0418351ca5e53bed29
Parents: b485f8c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 16 21:08:29 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 20 +-------------------
1 file changed, 1 insertion(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c58ea/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2cc8886..6ea442d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry;
@@ -41,13 +40,11 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
-import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
@@ -211,9 +208,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
this.timerService = timeProvider;
this.configuration = new StreamConfig(getTaskConfiguration());
- this.streamRecordWriters = createStreamRecordWriters(
- configuration,
- environment);
+ this.streamRecordWriters = createStreamRecordWriters(configuration, environment);
}
// ------------------------------------------------------------------------
@@ -541,10 +536,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return operatorChain;
}
- Output<StreamRecord<OUT>> getHeadOutput() {
- return operatorChain.getChainEntryPoint();
- }
-
RecordWriterOutput<?>[] getStreamOutputs() {
return operatorChain.getStreamOutputs();
}
@@ -750,15 +741,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return new CheckpointExceptionHandlerFactory();
}
- private String createOperatorIdentifier(StreamOperator<?> operator) {
- TaskInfo taskInfo = getEnvironment().getTaskInfo();
- return new OperatorSubtaskDescriptionText(
- operator.getOperatorID(),
- operator.getClass().getSimpleName(),
- taskInfo.getIndexOfThisSubtask(),
- taskInfo.getNumberOfParallelSubtasks()).toString();
- }
-
/**
* Returns the {@link ProcessingTimeService} responsible for telling the current
* processing time and registering timers.
[4/5] flink git commit: [hotfix] [tests] Cleanup and lambda-ify
StreamOperatorChainingTest
Posted by se...@apache.org.
[hotfix] [tests] Cleanup and lambda-ify StreamOperatorChainingTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8258029a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8258029a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8258029a
Branch: refs/heads/master
Commit: 8258029ababaeeead58e3510a6c9bd53f6829901
Parents: dd236f5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 17:17:27 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200
----------------------------------------------------------------------
.../operators/StreamOperatorChainingTest.java | 80 ++++----------------
1 file changed, 13 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8258029a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index fd6a953..9c3b08f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -54,6 +53,7 @@ import static org.mockito.Mockito.when;
/**
* Tests for stream operator chaining behaviour.
*/
+@SuppressWarnings("serial")
public class StreamOperatorChainingTest {
// We have to use static fields because the sink functions will go through serialization
@@ -89,47 +89,24 @@ public class StreamOperatorChainingTest {
sink2Results = new ArrayList<>();
input = input
- .map(new MapFunction<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer map(Integer value) throws Exception {
- return value;
- }
- });
+ .map(value -> value);
input
- .map(new MapFunction<Integer, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Integer value) throws Exception {
- return "First: " + value;
- }
- })
+ .map(value -> "First: " + value)
.addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
@Override
- public void invoke(String value) throws Exception {
+ public void invoke(String value, Context ctx) throws Exception {
sink1Results.add(value);
}
});
input
- .map(new MapFunction<Integer, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Integer value) throws Exception {
- return "Second: " + value;
- }
- })
+ .map(value -> "Second: " + value)
.addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
@Override
- public void invoke(String value) throws Exception {
+ public void invoke(String value, Context ctx) throws Exception {
sink2Results.add(value);
}
});
@@ -207,14 +184,7 @@ public class StreamOperatorChainingTest {
sink3Results = new ArrayList<>();
input = input
- .map(new MapFunction<Integer, Integer>(){
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer map(Integer value) throws Exception {
- return value;
- }
- });
+ .map(value -> value);
SplitStream<Integer> split = input.split(new OutputSelector<Integer>() {
private static final long serialVersionUID = 1L;
@@ -230,55 +200,31 @@ public class StreamOperatorChainingTest {
});
split.select("one")
- .map(new MapFunction<Integer, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Integer value) throws Exception {
- return "First 1: " + value;
- }
- })
+ .map(value -> "First 1: " + value)
.addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
@Override
- public void invoke(String value) throws Exception {
+ public void invoke(String value, Context ctx) throws Exception {
sink1Results.add(value);
}
});
split.select("one")
- .map(new MapFunction<Integer, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Integer value) throws Exception {
- return "First 2: " + value;
- }
- })
+ .map(value -> "First 2: " + value)
.addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
@Override
- public void invoke(String value) throws Exception {
+ public void invoke(String value, Context ctx) throws Exception {
sink2Results.add(value);
}
});
split.select("other")
- .map(new MapFunction<Integer, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Integer value) throws Exception {
- return "Second: " + value;
- }
- })
+ .map(value -> "Second: " + value)
.addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
@Override
- public void invoke(String value) throws Exception {
+ public void invoke(String value, Context ctx) throws Exception {
sink3Results.add(value);
}
});
[5/5] flink git commit: [hotfix] [runtime] Restore interruption flag
in StreamRecordWriter.close()
Posted by se...@apache.org.
[hotfix] [runtime] Restore interruption flag in StreamRecordWriter.close()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e754d3d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e754d3d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e754d3d2
Branch: refs/heads/master
Commit: e754d3d28a711f772f2fb0ca1098b29a1c73662d
Parents: ee3c58e
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 16:06:10 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/streaming/runtime/io/StreamRecordWriter.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e754d3d2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index 9fedf70..3f174b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -106,6 +106,8 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
}
catch (InterruptedException e) {
// ignore on close
+ // restore interrupt flag to fast exit further blocking calls
+ Thread.currentThread().interrupt();
}
}
}
[2/5] flink git commit: [hotfix] [runtime] Remove dead code for
handling no longer thrown InterruptedException
Posted by se...@apache.org.
[hotfix] [runtime] Remove dead code for handling no longer thrown InterruptedException
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd236f51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd236f51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd236f51
Branch: refs/heads/master
Commit: dd236f51dc51f499de6beba09a2de4c8cd37cb27
Parents: e754d3d
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 16:09:07 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200
----------------------------------------------------------------------
.../runtime/io/RecordWriterOutput.java | 2 +-
.../streaming/runtime/tasks/OperatorChain.java | 22 ++++++--------------
2 files changed, 7 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dd236f51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index d62d80e..d2b4d1f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -148,7 +148,7 @@ public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExpo
}
}
- public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
+ public void broadcastEvent(AbstractEvent event) throws IOException {
recordWriter.broadcastEvent(event);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd236f51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b99cf65..1db4346 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -179,26 +179,16 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
- try {
- CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
- for (RecordWriterOutput<?> streamOutput : streamOutputs) {
- streamOutput.broadcastEvent(barrier);
- }
- }
- catch (InterruptedException e) {
- throw new IOException("Interrupted while broadcasting checkpoint barrier");
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
+ for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+ streamOutput.broadcastEvent(barrier);
}
}
public void broadcastCheckpointCancelMarker(long id) throws IOException {
- try {
- CancelCheckpointMarker barrier = new CancelCheckpointMarker(id);
- for (RecordWriterOutput<?> streamOutput : streamOutputs) {
- streamOutput.broadcastEvent(barrier);
- }
- }
- catch (InterruptedException e) {
- throw new IOException("Interrupted while broadcasting checkpoint cancellation");
+ CancelCheckpointMarker barrier = new CancelCheckpointMarker(id);
+ for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+ streamOutput.broadcastEvent(barrier);
}
}