You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/23 15:41:19 UTC

[1/5] flink git commit: [hotfix] [runtime] Eagerly create AccumulatorMap in StreamTask

Repository: flink
Updated Branches:
  refs/heads/master b485f8cc6 -> 5b8396719


[hotfix] [runtime] Eagerly create AccumulatorMap in StreamTask

This follows the RAII paradigm, simplifying testing setups and reasoning about initialization.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b839671
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b839671
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b839671

Branch: refs/heads/master
Commit: 5b8396719e9f7d738cc44b53182556ae44f8ec58
Parents: 8258029
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 18 19:19:56 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/runtime/tasks/StreamTask.java    | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b839671/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6ea442d..d6e088d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -154,7 +154,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	protected ProcessingTimeService timerService;
 
 	/** The map of user-defined accumulators of this task. */
-	private Map<String, Accumulator<?, ?>> accumulatorMap;
+	private final Map<String, Accumulator<?, ?>> accumulatorMap;
 
 	/** The currently active background materialization threads. */
 	private final CloseableRegistry cancelables = new CloseableRegistry();
@@ -208,6 +208,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		this.timerService = timeProvider;
 		this.configuration = new StreamConfig(getTaskConfiguration());
+		this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
 		this.streamRecordWriters = createStreamRecordWriters(configuration, environment);
 	}
 
@@ -255,8 +256,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			stateBackend = createStateBackend();
 			checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
 
-			accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
-
 			// if the clock is not already set, then assign a default TimeServiceProvider
 			if (timerService == null) {
 				ThreadFactory timerThreadFactory =


[3/5] flink git commit: [hotfix] [runtime] Remove unused and unnecessary method from StreamTask

Posted by se...@apache.org.
[hotfix] [runtime] Remove unused and unnecessary method from StreamTask


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee3c58ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee3c58ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee3c58ea

Branch: refs/heads/master
Commit: ee3c58ea65d770a7abc4ab0418351ca5e53bed29
Parents: b485f8c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 16 21:08:29 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     | 20 +-------------------
 1 file changed, 1 insertion(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c58ea/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2cc8886..6ea442d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -41,13 +40,11 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
@@ -211,9 +208,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		this.timerService = timeProvider;
 		this.configuration = new StreamConfig(getTaskConfiguration());
-		this.streamRecordWriters = createStreamRecordWriters(
-			configuration,
-			environment);
+		this.streamRecordWriters = createStreamRecordWriters(configuration, environment);
 	}
 
 	// ------------------------------------------------------------------------
@@ -541,10 +536,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return operatorChain;
 	}
 
-	Output<StreamRecord<OUT>> getHeadOutput() {
-		return operatorChain.getChainEntryPoint();
-	}
-
 	RecordWriterOutput<?>[] getStreamOutputs() {
 		return operatorChain.getStreamOutputs();
 	}
@@ -750,15 +741,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return new CheckpointExceptionHandlerFactory();
 	}
 
-	private String createOperatorIdentifier(StreamOperator<?> operator) {
-		TaskInfo taskInfo = getEnvironment().getTaskInfo();
-		return new OperatorSubtaskDescriptionText(
-			operator.getOperatorID(),
-			operator.getClass().getSimpleName(),
-			taskInfo.getIndexOfThisSubtask(),
-			taskInfo.getNumberOfParallelSubtasks()).toString();
-	}
-
 	/**
 	 * Returns the {@link ProcessingTimeService} responsible for telling the current
 	 * processing time and registering timers.


[4/5] flink git commit: [hotfix] [tests] Cleanup and lambda-ify StreamOperatorChainingTest

Posted by se...@apache.org.
[hotfix] [tests] Cleanup and lambda-ify StreamOperatorChainingTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8258029a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8258029a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8258029a

Branch: refs/heads/master
Commit: 8258029ababaeeead58e3510a6c9bd53f6829901
Parents: dd236f5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 17:17:27 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200

----------------------------------------------------------------------
 .../operators/StreamOperatorChainingTest.java   | 80 ++++----------------
 1 file changed, 13 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8258029a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index fd6a953..9c3b08f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -54,6 +53,7 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for stream operator chaining behaviour.
  */
+@SuppressWarnings("serial")
 public class StreamOperatorChainingTest {
 
 	// We have to use static fields because the sink functions will go through serialization
@@ -89,47 +89,24 @@ public class StreamOperatorChainingTest {
 		sink2Results = new ArrayList<>();
 
 		input = input
-				.map(new MapFunction<Integer, Integer>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer map(Integer value) throws Exception {
-						return value;
-					}
-				});
+				.map(value -> value);
 
 		input
-				.map(new MapFunction<Integer, String>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public String map(Integer value) throws Exception {
-						return "First: " + value;
-					}
-				})
+				.map(value -> "First: " + value)
 				.addSink(new SinkFunction<String>() {
-					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void invoke(String value) throws Exception {
+					public void invoke(String value, Context ctx) throws Exception {
 						sink1Results.add(value);
 					}
 				});
 
 		input
-				.map(new MapFunction<Integer, String>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public String map(Integer value) throws Exception {
-						return "Second: " + value;
-					}
-				})
+				.map(value -> "Second: " + value)
 				.addSink(new SinkFunction<String>() {
-					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void invoke(String value) throws Exception {
+					public void invoke(String value, Context ctx) throws Exception {
 						sink2Results.add(value);
 					}
 				});
@@ -207,14 +184,7 @@ public class StreamOperatorChainingTest {
 		sink3Results = new ArrayList<>();
 
 		input = input
-				.map(new MapFunction<Integer, Integer>(){
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer map(Integer value) throws Exception {
-						return value;
-					}
-				});
+				.map(value -> value);
 
 		SplitStream<Integer> split = input.split(new OutputSelector<Integer>() {
 			private static final long serialVersionUID = 1L;
@@ -230,55 +200,31 @@ public class StreamOperatorChainingTest {
 		});
 
 		split.select("one")
-				.map(new MapFunction<Integer, String>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public String map(Integer value) throws Exception {
-						return "First 1: " + value;
-					}
-				})
+				.map(value -> "First 1: " + value)
 				.addSink(new SinkFunction<String>() {
-					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void invoke(String value) throws Exception {
+					public void invoke(String value, Context ctx) throws Exception {
 						sink1Results.add(value);
 					}
 				});
 
 		split.select("one")
-				.map(new MapFunction<Integer, String>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public String map(Integer value) throws Exception {
-						return "First 2: " + value;
-					}
-				})
+				.map(value -> "First 2: " + value)
 				.addSink(new SinkFunction<String>() {
-					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void invoke(String value) throws Exception {
+					public void invoke(String value, Context ctx) throws Exception {
 						sink2Results.add(value);
 					}
 				});
 
 		split.select("other")
-				.map(new MapFunction<Integer, String>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public String map(Integer value) throws Exception {
-						return "Second: " + value;
-					}
-				})
+				.map(value -> "Second: " + value)
 				.addSink(new SinkFunction<String>() {
-					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void invoke(String value) throws Exception {
+					public void invoke(String value, Context ctx) throws Exception {
 						sink3Results.add(value);
 					}
 				});


[5/5] flink git commit: [hotfix] [runtime] Restore interruption flag in StreamRecordWriter.close()

Posted by se...@apache.org.
[hotfix] [runtime] Restore interruption flag in StreamRecordWriter.close()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e754d3d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e754d3d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e754d3d2

Branch: refs/heads/master
Commit: e754d3d28a711f772f2fb0ca1098b29a1c73662d
Parents: ee3c58e
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 16:06:10 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/runtime/io/StreamRecordWriter.java  | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e754d3d2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index 9fedf70..3f174b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -106,6 +106,8 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 			}
 			catch (InterruptedException e) {
 				// ignore on close
+				// restore interrupt flag to fast exit further blocking calls
+				Thread.currentThread().interrupt();
 			}
 		}
 	}


[2/5] flink git commit: [hotfix] [runtime] Remove dead code for handling no longer thrown InterruptedException

Posted by se...@apache.org.
[hotfix] [runtime] Remove dead code for handling no longer thrown InterruptedException


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd236f51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd236f51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd236f51

Branch: refs/heads/master
Commit: dd236f51dc51f499de6beba09a2de4c8cd37cb27
Parents: e754d3d
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 16:09:07 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200

----------------------------------------------------------------------
 .../runtime/io/RecordWriterOutput.java          |  2 +-
 .../streaming/runtime/tasks/OperatorChain.java  | 22 ++++++--------------
 2 files changed, 7 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd236f51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index d62d80e..d2b4d1f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -148,7 +148,7 @@ public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExpo
 		}
 	}
 
-	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
+	public void broadcastEvent(AbstractEvent event) throws IOException {
 		recordWriter.broadcastEvent(event);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dd236f51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b99cf65..1db4346 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -179,26 +179,16 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	}
 
 	public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
-		try {
-			CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
-			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
-				streamOutput.broadcastEvent(barrier);
-			}
-		}
-		catch (InterruptedException e) {
-			throw new IOException("Interrupted while broadcasting checkpoint barrier");
+		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
+		for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+			streamOutput.broadcastEvent(barrier);
 		}
 	}
 
 	public void broadcastCheckpointCancelMarker(long id) throws IOException {
-		try {
-			CancelCheckpointMarker barrier = new CancelCheckpointMarker(id);
-			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
-				streamOutput.broadcastEvent(barrier);
-			}
-		}
-		catch (InterruptedException e) {
-			throw new IOException("Interrupted while broadcasting checkpoint cancellation");
+		CancelCheckpointMarker barrier = new CancelCheckpointMarker(id);
+		for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+			streamOutput.broadcastEvent(barrier);
 		}
 	}