You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this text version.
Posted to commits@flink.apache.org by zh...@apache.org on 2019/09/09 14:03:31 UTC

[flink] branch release-1.9 updated (fcdad72 -> b5b705f)

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from fcdad72  [FLINK-13942][docs] Add "Getting Started" overview page.
     new 1638fb3  [hotfix][runtime] SourceStreamTask: set legacy source thread name to improve debugging
     new b5b705f  [hotfix][runtime] Rename StreamTask's performDefaultAction method to processInput

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/state/api/output/BoundedStreamTask.java   |  2 +-
 .../flink/streaming/runtime/tasks/SourceStreamTask.java    |  7 ++++++-
 .../flink/streaming/runtime/tasks/StreamIterationHead.java |  2 +-
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   |  4 ++--
 .../runtime/tasks/StreamTaskSelectiveReadingTest.java      |  4 ++--
 .../streaming/runtime/tasks/StreamTaskTerminationTest.java |  2 +-
 .../flink/streaming/runtime/tasks/StreamTaskTest.java      | 14 +++++++-------
 .../runtime/tasks/SynchronousCheckpointITCase.java         |  2 +-
 .../streaming/runtime/tasks/SynchronousCheckpointTest.java |  4 ++--
 .../runtime/tasks/TaskCheckpointingBehaviourTest.java      |  2 +-
 .../org/apache/flink/streaming/util/MockStreamTask.java    |  2 +-
 .../runtime/jobmaster/JobMasterStopWithSavepointIT.java    |  4 ++--
 12 files changed, 27 insertions(+), 22 deletions(-)


[flink] 02/02: [hotfix][runtime] Rename StreamTask's performDefaultAction method to processInput

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5b705f3b8dcafb915a3d59190a03867a27c9113
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Fri Sep 6 10:03:26 2019 +0200

    [hotfix][runtime] Rename StreamTask's performDefaultAction method to processInput
---
 .../apache/flink/state/api/output/BoundedStreamTask.java   |  2 +-
 .../flink/streaming/runtime/tasks/SourceStreamTask.java    |  2 +-
 .../flink/streaming/runtime/tasks/StreamIterationHead.java |  2 +-
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   |  4 ++--
 .../runtime/tasks/StreamTaskSelectiveReadingTest.java      |  4 ++--
 .../streaming/runtime/tasks/StreamTaskTerminationTest.java |  2 +-
 .../flink/streaming/runtime/tasks/StreamTaskTest.java      | 14 +++++++-------
 .../runtime/tasks/SynchronousCheckpointITCase.java         |  2 +-
 .../streaming/runtime/tasks/SynchronousCheckpointTest.java |  4 ++--
 .../runtime/tasks/TaskCheckpointingBehaviourTest.java      |  2 +-
 .../org/apache/flink/streaming/util/MockStreamTask.java    |  2 +-
 .../runtime/jobmaster/JobMasterStopWithSavepointIT.java    |  4 ++--
 12 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index db663da..814f445 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -76,7 +76,7 @@ class BoundedStreamTask<IN, OUT, OP extends OneInputStreamOperator<IN, OUT> & Bo
 	}
 
 	@Override
-	protected void performDefaultAction(ActionContext context) throws Exception {
+	protected void processInput(ActionContext context) throws Exception {
 		if (input.hasNext()) {
 			reuse.replace(input.next());
 			headOperator.setKeyContextElement1(reuse);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 5caddef..e1f7990 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -111,7 +111,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 	}
 
 	@Override
-	protected void performDefaultAction(ActionContext context) throws Exception {
+	protected void processInput(ActionContext context) throws Exception {
 		// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
 		// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
 		sourceThread.setTaskDescription(getName());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index d25bd23..5d71adb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -66,7 +66,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected void performDefaultAction(ActionContext context) throws Exception {
+	protected void processInput(ActionContext context) throws Exception {
 		StreamRecord<OUT> nextRecord = shouldWait ?
 			dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :
 			dataChannel.take();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1b1cfc4..aa84ae9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -275,7 +275,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	 * @param context context object for collaborative interaction between the action and the stream task.
 	 * @throws Exception on any problems in the action.
 	 */
-	protected void performDefaultAction(ActionContext context) throws Exception {
+	protected void processInput(ActionContext context) throws Exception {
 		if (!inputProcessor.processInput()) {
 			context.allActionsCompleted();
 		}
@@ -298,7 +298,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				}
 			}
 
-			performDefaultAction(actionContext);
+			processInput(actionContext);
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
index 1308796..2867f0d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
@@ -191,14 +191,14 @@ public class StreamTaskSelectiveReadingTest {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			if (!started) {
 				synchronized (this) {
 					this.wait();
 				}
 			}
 
-			super.performDefaultAction(context);
+			super.processInput(context);
 		}
 
 		public void startProcessing() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 72e8a19..ec2f26d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -225,7 +225,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			RUN_LATCH.trigger();
 			// wait until we have started an asynchronous checkpoint
 			CHECKPOINTING_LATCH.await();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d0295f1..706fb75 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -867,7 +867,7 @@ public class StreamTaskTest extends TestLogger {
 		protected void init() throws Exception {}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			context.allActionsCompleted();
 		}
 
@@ -1074,7 +1074,7 @@ public class StreamTaskTest extends TestLogger {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) {
+		protected void processInput(ActionContext context) {
 			if (isCanceled() || inputFinished) {
 				context.allActionsCompleted();
 			}
@@ -1111,7 +1111,7 @@ public class StreamTaskTest extends TestLogger {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			if (fail) {
 				throw new RuntimeException();
 			}
@@ -1199,7 +1199,7 @@ public class StreamTaskTest extends TestLogger {
 		protected void init() {}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			holder = new LockHolder(getCheckpointLock(), latch);
 			holder.start();
 			latch.await();
@@ -1244,7 +1244,7 @@ public class StreamTaskTest extends TestLogger {
 		protected void init() {}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			final OneShotLatch latch = new OneShotLatch();
 			final Object lock = new Object();
 
@@ -1310,9 +1310,9 @@ public class StreamTaskTest extends TestLogger {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			syncLatch.await();
-			super.performDefaultAction(context);
+			super.processInput(context);
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index 3b4aee3..24320b8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -137,7 +137,7 @@ public class SynchronousCheckpointITCase {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			if (!isRunning) {
 				isRunning = true;
 				eventQueue.put(Event.TASK_IS_RUNNING);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index 8b71423..0af985f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -171,10 +171,10 @@ public class SynchronousCheckpointTest {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			runningLatch.trigger();
 			execLatch.await();
-			super.performDefaultAction(context);
+			super.processInput(context);
 		}
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index e40e23d..b6a5f61 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -474,7 +474,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 		public void init() {}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			triggerCheckpointOnBarrier(
 				new CheckpointMetaData(
 					11L,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index 37e7328..a6d3093 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -81,7 +81,7 @@ public class MockStreamTask extends StreamTask {
 	public void init() { }
 
 	@Override
-	protected void performDefaultAction(ActionContext context) throws Exception {
+	protected void processInput(ActionContext context) throws Exception {
 		context.allActionsCompleted();
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index d54ec1f..6bdb651 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -286,7 +286,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
 			if (taskIndex == 0) {
 				numberOfRestarts.countDown();
@@ -343,7 +343,7 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase {
 		}
 
 		@Override
-		protected void performDefaultAction(ActionContext context) throws Exception {
+		protected void processInput(ActionContext context) throws Exception {
 			invokeLatch.countDown();
 			finishLatch.await();
 			context.allActionsCompleted();


[flink] 01/02: [hotfix][runtime] SourceStreamTask: set legacy source thread name to improve debugging

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1638fb3812047736524cd8e25bc4d5a6fa2596a7
Author: Aleksey Pak <al...@ververica.com>
AuthorDate: Fri Sep 6 10:00:31 2019 +0200

    [hotfix][runtime] SourceStreamTask: set legacy source thread name to improve debugging
---
 .../org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java   | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index e06e2b4..5caddef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -114,6 +114,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 	protected void performDefaultAction(ActionContext context) throws Exception {
 		// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
 		// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
+		sourceThread.setTaskDescription(getName());
 		sourceThread.start();
 
 		// We run an alternative mailbox loop that does not involve default actions and synchronizes around actions.
@@ -207,6 +208,10 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 			}
 		}
 
+		public void setTaskDescription(final String taskDescription) {
+			setName("Legacy Source Thread - " + taskDescription);
+		}
+
 		void checkThrowSourceExecutionException() throws Exception {
 			if (sourceExecutionThrowable != null) {
 				throw new Exception(sourceExecutionThrowable);