You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/06/15 17:44:37 UTC

[flink] branch release-1.11 updated: [FLINK-18162][connector/common] Serialize the splits in the AddSplitEvent.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 6cf60fd  [FLINK-18162][connector/common] Serialize the splits in the AddSplitEvent.
6cf60fd is described below

commit 6cf60fda000d25baddfd7a2c3725eeecba77f886
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Sat Jun 6 09:52:48 2020 +0800

    [FLINK-18162][connector/common] Serialize the splits in the AddSplitEvent.
    
    Signed-off-by: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
---
 .../coordinator/SourceCoordinatorContext.java      | 13 ++++++++---
 .../coordinator/SourceCoordinatorProvider.java     |  4 +++-
 .../flink/runtime/source/event/AddSplitEvent.java  | 25 ++++++++++++++++------
 .../coordinator/SourceCoordinatorContextTest.java  | 10 ++++-----
 .../source/coordinator/SourceCoordinatorTest.java  | 12 +++++++++--
 .../coordinator/SourceCoordinatorTestBase.java     |  2 ++
 .../streaming/api/operators/SourceOperator.java    |  8 ++++++-
 .../api/operators/SourceOperatorTest.java          |  6 ++++--
 .../tasks/SourceOperatorStreamTaskTest.java        |  4 +++-
 9 files changed, 62 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index b6ceda9..c3b4666 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -83,6 +84,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 	private final ExecutorService coordinatorExecutor;
 	private final ExecutorNotifier notifier;
 	private final OperatorCoordinator.Context operatorCoordinatorContext;
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
 	private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
 	private final SplitAssignmentTracker<SplitT> assignmentTracker;
 	private final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
@@ -92,9 +94,10 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 			ExecutorService coordinatorExecutor,
 			SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
 			int numWorkerThreads,
-			OperatorCoordinator.Context operatorCoordinatorContext) {
+			OperatorCoordinator.Context operatorCoordinatorContext,
+			SimpleVersionedSerializer<SplitT> splitSerializser) {
 		this(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads, operatorCoordinatorContext,
-				new SplitAssignmentTracker<>());
+				splitSerializser, new SplitAssignmentTracker<>());
 	}
 
 	// Package private method for unit test.
@@ -103,10 +106,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 			SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
 			int numWorkerThreads,
 			OperatorCoordinator.Context operatorCoordinatorContext,
+			SimpleVersionedSerializer<SplitT> splitSerializer,
 			SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
 		this.coordinatorExecutor = coordinatorExecutor;
 		this.coordinatorThreadFactory = coordinatorThreadFactory;
 		this.operatorCoordinatorContext = operatorCoordinatorContext;
+		this.splitSerializer = splitSerializer;
 		this.registeredReaders = new ConcurrentHashMap<>();
 		this.assignmentTracker = splitAssignmentTracker;
 		this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
@@ -166,10 +171,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 			assignment.assignment().forEach(
 					(id, splits) -> {
 						try {
-							operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
+							operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits, splitSerializer), id);
 						} catch (TaskNotRunningException e) {
 							throw new FlinkRuntimeException(String.format(
 									"Failed to assign splits %s to reader %d.", splits, id), e);
+						} catch (IOException e) {
+							throw new FlinkRuntimeException("Failed to serialize splits.", e);
 						}
 					});
 			return null;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 8ef3f48..44d7671 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator;
 
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
@@ -68,9 +69,10 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
 		CoordinatorExecutorThreadFactory coordinatorThreadFactory =
 				new CoordinatorExecutorThreadFactory(coordinatorThreadName);
 		ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+		SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
 		SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
 				new SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads,
-						context);
+						context, splitSerializer);
 		return new SourceCoordinator<>(operatorName, coordinatorExecutor, source, sourceCoordinatorContext);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
index 066e747..cf51a24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.source.event;
 
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -30,15 +33,23 @@ import java.util.List;
 public class AddSplitEvent<SplitT> implements OperatorEvent {
 
 	private static final long serialVersionUID = 1L;
-
-	private final List<SplitT> splits;
-
-	public AddSplitEvent(List<SplitT> splits) {
-		this.splits = splits;
+	private final int serializerVersion;
+	private final ArrayList<byte[]> splits;
+
+	public AddSplitEvent(List<SplitT> splits, SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
+		this.splits = new ArrayList<>(splits.size());
+		this.serializerVersion = splitSerializer.getVersion();
+		for (SplitT split : splits) {
+			this.splits.add(splitSerializer.serialize(split));
+		}
 	}
 
-	public List<SplitT> splits() {
-		return splits;
+	public List<SplitT> splits(SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
+		List<SplitT> result = new ArrayList<>(splits.size());
+		for (byte[] serializedSplit : splits) {
+			result.add(splitSerializer.deserialize(serializerVersion, serializedSplit));
+		}
+		return result;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index ba73ca0..2441192 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -34,7 +34,6 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment;
 import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
@@ -71,16 +70,16 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
 	}
 
 	@Test
-	public void testAssignSplitsFromCoordinatorExecutor() throws ExecutionException, InterruptedException {
+	public void testAssignSplitsFromCoordinatorExecutor() throws Exception {
 		testAssignSplits(true);
 	}
 
 	@Test
-	public void testAssignSplitsFromOtherThread() throws ExecutionException, InterruptedException {
+	public void testAssignSplitsFromOtherThread() throws Exception {
 		testAssignSplits(false);
 	}
 
-	private void testAssignSplits(boolean fromCoordinatorExecutor) throws ExecutionException, InterruptedException {
+	private void testAssignSplits(boolean fromCoordinatorExecutor) throws Exception {
 		// Register the readers.
 		registerReaders();
 
@@ -104,7 +103,7 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
 		assertEquals(1, eventsToSubtask0.size());
 		OperatorEvent event = eventsToSubtask0.get(0);
 		assertTrue(event instanceof AddSplitEvent);
-		verifyAssignment(Arrays.asList("0"), ((AddSplitEvent) event).splits());
+		verifyAssignment(Arrays.asList("0"), ((AddSplitEvent) event).splits(new MockSourceSplitSerializer()));
 	}
 
 	@Test
@@ -153,6 +152,7 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
 					coordinatorThreadFactory,
 					1,
 					operatorCoordinatorContext,
+					new MockSourceSplitSerializer(),
 					restoredTracker);
 			restoredContext.restoreState(new MockSourceSplitSerializer(), in);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index be244fa..3e48a93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.source.event.SourceEventWrapper;
 
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -165,8 +167,14 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 
 			List<OperatorEvent> eventsToReader0 = operatorCoordinatorContext.getEventsToOperator().get(0);
 			assertEquals(2, eventsToReader0.size());
-			verifyAssignment(Arrays.asList("0", "3"), ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(0)).splits());
-			verifyAssignment(Arrays.asList("6"), ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(1)).splits());
+			try {
+				verifyAssignment(Arrays.asList("0", "3"),
+						((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(0)).splits(new MockSourceSplitSerializer()));
+				verifyAssignment(Arrays.asList("6"),
+						((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(1)).splits(new MockSourceSplitSerializer()));
+			} catch (IOException e) {
+				fail("Failed to deserialize splits.");
+			}
 		});
 
 		// Fail reader 0.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index 80a73bb..a14c868 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
@@ -63,6 +64,7 @@ public abstract class SourceCoordinatorTestBase {
 				coordinatorThreadFactory,
 				1,
 				operatorCoordinatorContext,
+				new MockSourceSplitSerializer(),
 				splitSplitAssignmentTracker);
 		sourceCoordinator = getNewSourceCoordinator();
 		enumerator = (MockSplitEnumerator) sourceCoordinator.getEnumerator();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 9ac80d3..74f9360 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -44,7 +44,9 @@ import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FlinkRuntimeException;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
@@ -202,7 +204,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	@SuppressWarnings("unchecked")
 	public void handleOperatorEvent(OperatorEvent event) {
 		if (event instanceof AddSplitEvent) {
-			sourceReader.addSplits(((AddSplitEvent<SplitT>) event).splits());
+			try {
+				sourceReader.addSplits(((AddSplitEvent<SplitT>) event).splits(splitSerializer));
+			} catch (IOException e) {
+				throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
+			}
 		} else if (event instanceof SourceEventWrapper) {
 			sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
 		} else {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 93ced72..a45e96e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -103,7 +103,8 @@ public class SourceOperatorTest {
 		operator.initializeState(getStateContext());
 		operator.open();
 		MockSourceSplit newSplit = new MockSourceSplit((2));
-		operator.handleOperatorEvent(new AddSplitEvent<>(Collections.singletonList(newSplit)));
+		operator.handleOperatorEvent(new AddSplitEvent<>(
+				Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
 		// The source reader should have been assigned two splits.
 		assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits());
 	}
@@ -124,7 +125,8 @@ public class SourceOperatorTest {
 		operator.initializeState(stateContext);
 		operator.open();
 		MockSourceSplit newSplit = new MockSourceSplit((2));
-		operator.handleOperatorEvent(new AddSplitEvent<>(Collections.singletonList(newSplit)));
+		operator.handleOperatorEvent(new AddSplitEvent<>(
+				Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
 		operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));
 
 		// Verify the splits in state.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 0f2e99d..df60344 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -152,7 +153,8 @@ public class SourceOperatorStreamTaskTest {
 			// Prepare the source split and assign it to the source reader.
 			MockSourceSplit split = new MockSourceSplit(0, 0);
 			// Assign the split to the source reader.
-			AddSplitEvent<MockSourceSplit> addSplitEvent = new AddSplitEvent<>(Collections.singletonList(split));
+			AddSplitEvent<MockSourceSplit> addSplitEvent =
+					new AddSplitEvent<>(Collections.singletonList(split), new MockSourceSplitSerializer());
 			testHarness
 					.getStreamTask()
 					.dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(addSplitEvent));