You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/24 07:39:07 UTC
[flink] branch master updated: [FLINK-12570][network] Work against
ResultPartitionWriter interface
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f4f4510 [FLINK-12570][network] Work against ResultPartitionWriter interface
f4f4510 is described below
commit f4f451037c9bab96e241a2a4e13119b1c3bbf21c
Author: azagrebin <az...@users.noreply.github.com>
AuthorDate: Fri May 24 09:38:51 2019 +0200
[FLINK-12570][network] Work against ResultPartitionWriter interface
This part of Shuffle API refactoring: make task not depend on the
concrete implementation of ResultPartitionWriter (ResultPartition).
---
.../network/api/writer/ResultPartitionWriter.java | 19 +++++++++++++++++++
.../io/network/partition/ResultPartition.java | 2 ++
.../org/apache/flink/runtime/taskmanager/Task.java | 11 +++++------
.../AbstractCollectingResultPartitionWriter.java | 11 +++++++++++
.../io/network/api/writer/RecordWriterTest.java | 22 ++++++++++++++++++++++
5 files changed, 59 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 49f74af..153b880 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -22,6 +22,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import javax.annotation.Nullable;
+
import java.io.IOException;
/**
@@ -65,4 +67,21 @@ public interface ResultPartitionWriter extends AutoCloseable {
* Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in one specified subpartition.
*/
void flush(int subpartitionIndex);
+
+ /**
+ * Fail the production of the partition.
+ *
+ * <p>This method propagates non-{@code null} failure causes to consumers on a best-effort basis.
+ * Closing of partition is still needed.
+ *
+ * @param throwable failure cause
+ */
+ void fail(@Nullable Throwable throwable);
+
+ /**
+ * Successfully finish the production of the partition.
+ *
+ * <p>Closing of partition is still needed.
+ */
+ void finish() throws IOException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index ca3855d..15f15e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -256,6 +256,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
*
* <p>For BLOCKING results, this will trigger the deployment of consuming tasks.
*/
+ @Override
public void finish() throws IOException {
boolean success = false;
@@ -313,6 +314,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
}
}
+ @Override
public void fail(@Nullable Throwable throwable) {
partitionManager.releasePartition(partitionId, throwable);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1d95231..6742346b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -52,7 +52,6 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -190,7 +189,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
/** Serialized version of the job specific execution configuration (see {@link ExecutionConfig}). */
private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
- private final ResultPartition[] producedPartitions;
+ private final ResultPartitionWriter[] producedPartitions;
private final SingleInputGate[] inputGates;
@@ -598,7 +597,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
setupPartionsAndGates(producedPartitions, inputGates);
- for (ResultPartition partition : producedPartitions) {
+ for (ResultPartitionWriter partition : producedPartitions) {
taskEventDispatcher.registerPartition(partition.getPartitionId());
}
@@ -688,7 +687,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
// ----------------------------------------------------------------
// finish the produced partitions. if this fails, we consider the execution failed.
- for (ResultPartition partition : producedPartitions) {
+ for (ResultPartitionWriter partition : producedPartitions) {
if (partition != null) {
partition.finish();
}
@@ -845,7 +844,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
private void releaseNetworkResources() {
LOG.debug("Release task {} network resources (state: {}).", taskNameWithSubtask, getExecutionState());
- for (ResultPartition partition : producedPartitions) {
+ for (ResultPartitionWriter partition : producedPartitions) {
taskEventDispatcher.unregisterPartition(partition.getPartitionId());
if (isCanceledOrFailed()) {
partition.fail(getFailureCause());
@@ -860,7 +859,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
* release partitions and gates. Another is from task thread during task exiting.
*/
private void closeNetworkResources() {
- for (ResultPartition partition : producedPartitions) {
+ for (ResultPartitionWriter partition : producedPartitions) {
try {
partition.close();
} catch (Throwable t) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index fd38ee8..8ae8f5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
@@ -109,5 +110,15 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
public void close() {
}
+ @Override
+ public void fail(@Nullable Throwable throwable) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void finish() {
+ throw new UnsupportedOperationException();
+ }
+
protected abstract void deserializeBuffer(Buffer buffer) throws IOException;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 956c4e2..35487b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -55,6 +55,8 @@ import org.junit.rules.TemporaryFolder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -511,6 +513,16 @@ public class RecordWriterTest {
}
@Override
+ public void fail(@Nullable Throwable throwable) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void finish() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void close() {
}
}
@@ -576,6 +588,16 @@ public class RecordWriterTest {
}
@Override
+ public void fail(@Nullable Throwable throwable) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void finish() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void close() {
}
}