You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/06/15 17:44:37 UTC
[flink] branch release-1.11 updated:
[FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 6cf60fd [FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent.
6cf60fd is described below
commit 6cf60fda000d25baddfd7a2c3725eeecba77f886
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Sat Jun 6 09:52:48 2020 +0800
[FLINK-18162][connector/common] Serialize the splits in the AddSplitsEvent.
Signed-off-by: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
---
.../coordinator/SourceCoordinatorContext.java | 13 ++++++++---
.../coordinator/SourceCoordinatorProvider.java | 4 +++-
.../flink/runtime/source/event/AddSplitEvent.java | 25 ++++++++++++++++------
.../coordinator/SourceCoordinatorContextTest.java | 10 ++++-----
.../source/coordinator/SourceCoordinatorTest.java | 12 +++++++++--
.../coordinator/SourceCoordinatorTestBase.java | 2 ++
.../streaming/api/operators/SourceOperator.java | 8 ++++++-
.../api/operators/SourceOperatorTest.java | 6 ++++--
.../tasks/SourceOperatorStreamTaskTest.java | 4 +++-
9 files changed, 62 insertions(+), 22 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index b6ceda9..c3b4666 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.Preconditions;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -83,6 +84,7 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
private final ExecutorService coordinatorExecutor;
private final ExecutorNotifier notifier;
private final OperatorCoordinator.Context operatorCoordinatorContext;
+ private final SimpleVersionedSerializer<SplitT> splitSerializer;
private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
private final SplitAssignmentTracker<SplitT> assignmentTracker;
private final SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
@@ -92,9 +94,10 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
ExecutorService coordinatorExecutor,
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
int numWorkerThreads,
- OperatorCoordinator.Context operatorCoordinatorContext) {
+ OperatorCoordinator.Context operatorCoordinatorContext,
+ SimpleVersionedSerializer<SplitT> splitSerializser) {
this(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads, operatorCoordinatorContext,
- new SplitAssignmentTracker<>());
+ splitSerializser, new SplitAssignmentTracker<>());
}
// Package private method for unit test.
@@ -103,10 +106,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
int numWorkerThreads,
OperatorCoordinator.Context operatorCoordinatorContext,
+ SimpleVersionedSerializer<SplitT> splitSerializer,
SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
this.coordinatorExecutor = coordinatorExecutor;
this.coordinatorThreadFactory = coordinatorThreadFactory;
this.operatorCoordinatorContext = operatorCoordinatorContext;
+ this.splitSerializer = splitSerializer;
this.registeredReaders = new ConcurrentHashMap<>();
this.assignmentTracker = splitAssignmentTracker;
this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName();
@@ -166,10 +171,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
assignment.assignment().forEach(
(id, splits) -> {
try {
- operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
+ operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits, splitSerializer), id);
} catch (TaskNotRunningException e) {
throw new FlinkRuntimeException(String.format(
"Failed to assign splits %s to reader %d.", splits, id), e);
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Failed to serialize splits.", e);
}
});
return null;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 8ef3f48..44d7671 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
@@ -68,9 +69,10 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
CoordinatorExecutorThreadFactory coordinatorThreadFactory =
new CoordinatorExecutorThreadFactory(coordinatorThreadName);
ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+ SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
new SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, numWorkerThreads,
- context);
+ context, splitSerializer);
return new SourceCoordinator<>(operatorName, coordinatorExecutor, source, sourceCoordinatorContext);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
index 066e747..cf51a24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
@@ -18,8 +18,11 @@
package org.apache.flink.runtime.source.event;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -30,15 +33,23 @@ import java.util.List;
public class AddSplitEvent<SplitT> implements OperatorEvent {
private static final long serialVersionUID = 1L;
-
- private final List<SplitT> splits;
-
- public AddSplitEvent(List<SplitT> splits) {
- this.splits = splits;
+ private final int serializerVersion;
+ private final ArrayList<byte[]> splits;
+
+ public AddSplitEvent(List<SplitT> splits, SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
+ this.splits = new ArrayList<>(splits.size());
+ this.serializerVersion = splitSerializer.getVersion();
+ for (SplitT split : splits) {
+ this.splits.add(splitSerializer.serialize(split));
+ }
}
- public List<SplitT> splits() {
- return splits;
+ public List<SplitT> splits(SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
+ List<SplitT> result = new ArrayList<>(splits.size());
+ for (byte[] serializedSplit : splits) {
+ result.add(splitSerializer.deserialize(serializerVersion, serializedSplit));
+ }
+ return result;
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index ba73ca0..2441192 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -34,7 +34,6 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment;
import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
@@ -71,16 +70,16 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
}
@Test
- public void testAssignSplitsFromCoordinatorExecutor() throws ExecutionException, InterruptedException {
+ public void testAssignSplitsFromCoordinatorExecutor() throws Exception {
testAssignSplits(true);
}
@Test
- public void testAssignSplitsFromOtherThread() throws ExecutionException, InterruptedException {
+ public void testAssignSplitsFromOtherThread() throws Exception {
testAssignSplits(false);
}
- private void testAssignSplits(boolean fromCoordinatorExecutor) throws ExecutionException, InterruptedException {
+ private void testAssignSplits(boolean fromCoordinatorExecutor) throws Exception {
// Register the readers.
registerReaders();
@@ -104,7 +103,7 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
assertEquals(1, eventsToSubtask0.size());
OperatorEvent event = eventsToSubtask0.get(0);
assertTrue(event instanceof AddSplitEvent);
- verifyAssignment(Arrays.asList("0"), ((AddSplitEvent) event).splits());
+ verifyAssignment(Arrays.asList("0"), ((AddSplitEvent) event).splits(new MockSourceSplitSerializer()));
}
@Test
@@ -153,6 +152,7 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
coordinatorThreadFactory,
1,
operatorCoordinatorContext,
+ new MockSourceSplitSerializer(),
restoredTracker);
restoredContext.restoreState(new MockSourceSplitSerializer(), in);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index be244fa..3e48a93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.junit.Test;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -165,8 +167,14 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
List<OperatorEvent> eventsToReader0 = operatorCoordinatorContext.getEventsToOperator().get(0);
assertEquals(2, eventsToReader0.size());
- verifyAssignment(Arrays.asList("0", "3"), ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(0)).splits());
- verifyAssignment(Arrays.asList("6"), ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(1)).splits());
+ try {
+ verifyAssignment(Arrays.asList("0", "3"),
+ ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(0)).splits(new MockSourceSplitSerializer()));
+ verifyAssignment(Arrays.asList("6"),
+ ((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(1)).splits(new MockSourceSplitSerializer()));
+ } catch (IOException e) {
+ fail("Failed to deserialize splits.");
+ }
});
// Fail reader 0.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index 80a73bb..a14c868 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
@@ -63,6 +64,7 @@ public abstract class SourceCoordinatorTestBase {
coordinatorThreadFactory,
1,
operatorCoordinatorContext,
+ new MockSourceSplitSerializer(),
splitSplitAssignmentTracker);
sourceCoordinator = getNewSourceCoordinator();
enumerator = (MockSplitEnumerator) sourceCoordinator.getEnumerator();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 9ac80d3..74f9360 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -44,7 +44,9 @@ import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FlinkRuntimeException;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -202,7 +204,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
@SuppressWarnings("unchecked")
public void handleOperatorEvent(OperatorEvent event) {
if (event instanceof AddSplitEvent) {
- sourceReader.addSplits(((AddSplitEvent<SplitT>) event).splits());
+ try {
+ sourceReader.addSplits(((AddSplitEvent<SplitT>) event).splits(splitSerializer));
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
+ }
} else if (event instanceof SourceEventWrapper) {
sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
} else {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 93ced72..a45e96e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -103,7 +103,8 @@ public class SourceOperatorTest {
operator.initializeState(getStateContext());
operator.open();
MockSourceSplit newSplit = new MockSourceSplit((2));
- operator.handleOperatorEvent(new AddSplitEvent<>(Collections.singletonList(newSplit)));
+ operator.handleOperatorEvent(new AddSplitEvent<>(
+ Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
// The source reader should have been assigned two splits.
assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits());
}
@@ -124,7 +125,8 @@ public class SourceOperatorTest {
operator.initializeState(stateContext);
operator.open();
MockSourceSplit newSplit = new MockSourceSplit((2));
- operator.handleOperatorEvent(new AddSplitEvent<>(Collections.singletonList(newSplit)));
+ operator.handleOperatorEvent(new AddSplitEvent<>(
+ Collections.singletonList(newSplit), new MockSourceSplitSerializer()));
operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));
// Verify the splits in state.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 0f2e99d..df60344 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -152,7 +153,8 @@ public class SourceOperatorStreamTaskTest {
// Prepare the source split and assign it to the source reader.
MockSourceSplit split = new MockSourceSplit(0, 0);
// Assign the split to the source reader.
- AddSplitEvent<MockSourceSplit> addSplitEvent = new AddSplitEvent<>(Collections.singletonList(split));
+ AddSplitEvent<MockSourceSplit> addSplitEvent =
+ new AddSplitEvent<>(Collections.singletonList(split), new MockSourceSplitSerializer());
testHarness
.getStreamTask()
.dispatchOperatorEvent(OPERATOR_ID, new SerializedValue<>(addSplitEvent));