You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/24 07:39:07 UTC

[flink] branch master updated: [FLINK-12570][network] Work against ResultPartitionWriter interface

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f4f4510  [FLINK-12570][network] Work against ResultPartitionWriter interface
f4f4510 is described below

commit f4f451037c9bab96e241a2a4e13119b1c3bbf21c
Author: azagrebin <az...@users.noreply.github.com>
AuthorDate: Fri May 24 09:38:51 2019 +0200

    [FLINK-12570][network] Work against ResultPartitionWriter interface
    
    This is part of the Shuffle API refactoring: make the task not depend on the
    concrete implementation of ResultPartitionWriter (ResultPartition).
---
 .../network/api/writer/ResultPartitionWriter.java  | 19 +++++++++++++++++++
 .../io/network/partition/ResultPartition.java      |  2 ++
 .../org/apache/flink/runtime/taskmanager/Task.java | 11 +++++------
 .../AbstractCollectingResultPartitionWriter.java   | 11 +++++++++++
 .../io/network/api/writer/RecordWriterTest.java    | 22 ++++++++++++++++++++++
 5 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 49f74af..153b880 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
@@ -65,4 +67,21 @@ public interface ResultPartitionWriter extends AutoCloseable {
 	 * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in one specified subpartition.
 	 */
 	void flush(int subpartitionIndex);
+
+	/**
+	 * Fail the production of the partition.
+	 *
+	 * <p>This method propagates non-{@code null} failure causes to consumers on a best-effort basis.
+	 * Closing of partition is still needed.
+	 *
+	 * @param throwable failure cause
+	 */
+	void fail(@Nullable Throwable throwable);
+
+	/**
+	 * Successfully finish the production of the partition.
+	 *
+	 * <p>Closing of partition is still needed.
+	 */
+	void finish() throws IOException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index ca3855d..15f15e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -256,6 +256,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	 *
 	 * <p>For BLOCKING results, this will trigger the deployment of consuming tasks.
 	 */
+	@Override
 	public void finish() throws IOException {
 		boolean success = false;
 
@@ -313,6 +314,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		}
 	}
 
+	@Override
 	public void fail(@Nullable Throwable throwable) {
 		partitionManager.releasePartition(partitionId, throwable);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1d95231..6742346b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -52,7 +52,6 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -190,7 +189,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 	/** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */
 	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
-	private final ResultPartition[] producedPartitions;
+	private final ResultPartitionWriter[] producedPartitions;
 
 	private final SingleInputGate[] inputGates;
 
@@ -598,7 +597,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 
 			setupPartionsAndGates(producedPartitions, inputGates);
 
-			for (ResultPartition partition : producedPartitions) {
+			for (ResultPartitionWriter partition : producedPartitions) {
 				taskEventDispatcher.registerPartition(partition.getPartitionId());
 			}
 
@@ -688,7 +687,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 			// ----------------------------------------------------------------
 
 			// finish the produced partitions. if this fails, we consider the execution failed.
-			for (ResultPartition partition : producedPartitions) {
+			for (ResultPartitionWriter partition : producedPartitions) {
 				if (partition != null) {
 					partition.finish();
 				}
@@ -845,7 +844,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 	private void releaseNetworkResources() {
 		LOG.debug("Release task {} network resources (state: {}).", taskNameWithSubtask, getExecutionState());
 
-		for (ResultPartition partition : producedPartitions) {
+		for (ResultPartitionWriter partition : producedPartitions) {
 			taskEventDispatcher.unregisterPartition(partition.getPartitionId());
 			if (isCanceledOrFailed()) {
 				partition.fail(getFailureCause());
@@ -860,7 +859,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
 	 * release partitions and gates. Another is from task thread during task exiting.
 	 */
 	private void closeNetworkResources() {
-		for (ResultPartition partition : producedPartitions) {
+		for (ResultPartitionWriter partition : producedPartitions) {
 			try {
 				partition.close();
 			} catch (Throwable t) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index fd38ee8..8ae8f5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
@@ -109,5 +110,15 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
 	public void close() {
 	}
 
+	@Override
+	public void fail(@Nullable Throwable throwable) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void finish() {
+		throw new UnsupportedOperationException();
+	}
+
 	protected abstract void deserializeBuffer(Buffer buffer) throws IOException;
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 956c4e2..35487b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -55,6 +55,8 @@ import org.junit.rules.TemporaryFolder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -511,6 +513,16 @@ public class RecordWriterTest {
 		}
 
 		@Override
+		public void fail(@Nullable Throwable throwable) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void finish() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public void close() {
 		}
 	}
@@ -576,6 +588,16 @@ public class RecordWriterTest {
 		}
 
 		@Override
+		public void fail(@Nullable Throwable throwable) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void finish() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public void close() {
 		}
 	}