You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:18 UTC
[flink] 14/14: [FLINK-17699][DataStream API] Initialize
SourceOperator more eagerly and reduce scope of collaborators.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 939625f2c84bdce6872548d3df672f492e33a704
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 23:07:16 2020 +0200
[FLINK-17699][DataStream API] Initialize SourceOperator more eagerly and reduce scope of collaborators.
This reduces the scope of necessary mocking in the tests and of special-casing in the setup logic.
- This removes the dependency on Source and replaces it with a reader factory
- This lets the SourceOperator register itself with the OperatorEventDispatcher
---
.../streaming/api/operators/SourceOperator.java | 45 ++++++++++------
.../api/operators/SourceOperatorFactory.java | 62 ++++++++++++++++------
.../api/operators/SourceOperatorTest.java | 39 +++++++-------
3 files changed, 94 insertions(+), 52 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 3af714d..e4971d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
@@ -45,6 +44,9 @@ import org.apache.flink.util.CollectionUtil;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Base source operator only used for integrating the source reader which is proposed by FLIP-27. It implements
@@ -67,24 +69,38 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);
- private final Source<OUT, SplitT, ?> source;
+ /** The factory for the source reader. This is a workaround, because currently the SourceReader
+ * must be lazily initialized, which is mainly because the metrics groups that the reader relies on are
+ * lazily initialized. */
+ private final Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory;
+ /** The serializer for the splits, applied to the split types before storing them in the reader state. */
private final SimpleVersionedSerializer<SplitT> splitSerializer;
- // Fields that will be setup at runtime.
- private transient SourceReader<OUT, SplitT> sourceReader;
- private transient ListState<SplitT> readerState;
- private transient OperatorEventGateway operatorEventGateway;
+ /** The event gateway through which this operator talks to its coordinator. */
+ private final OperatorEventGateway operatorEventGateway;
+
+ // ---- lazily initialized fields ----
+
+ /** The source reader that does most of the work. */
+ private SourceReader<OUT, SplitT> sourceReader;
+
+ /** The state that holds the currently assigned splits. */
+ private ListState<SplitT> readerState;
- public SourceOperator(Source<OUT, SplitT, ?> source) {
- this.source = source;
- this.splitSerializer = source.getSplitSerializer();
+ public SourceOperator(
+ Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory,
+ OperatorEventGateway operatorEventGateway,
+ SimpleVersionedSerializer<SplitT> splitSerializer) {
+
+ this.readerFactory = checkNotNull(readerFactory);
+ this.operatorEventGateway = checkNotNull(operatorEventGateway);
+ this.splitSerializer = checkNotNull(splitSerializer);
}
@Override
public void open() throws Exception {
- // Create the source reader.
- SourceReaderContext context = new SourceReaderContext() {
+ final SourceReaderContext context = new SourceReaderContext() {
@Override
public MetricGroup metricGroup() {
return getRuntimeContext().getMetricGroup();
@@ -95,7 +111,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
}
};
- sourceReader = source.createReader(context);
+
+ sourceReader = readerFactory.apply(context);
// restore the state if necessary.
final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
@@ -142,10 +159,6 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
}
- public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
- this.operatorEventGateway = operatorEventGateway;
- }
-
@SuppressWarnings("unchecked")
public void handleOperatorEvent(OperatorEvent event) {
if (event instanceof AddSplitEvent) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 79f6453..c30a0a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -19,12 +19,17 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
+import java.util.function.Function;
+
/**
* The Factory class for {@link SourceOperator}.
*/
@@ -39,9 +44,6 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
/** The number of worker thread for the source coordinator. */
private final int numCoordinatorWorkerThread;
- /** The {@link OperatorEventDispatcher} to register the SourceOperator. */
- private OperatorEventDispatcher operatorEventDispatcher;
-
public SourceOperatorFactory(Source<OUT, ?, ?> source) {
this(source, 1);
}
@@ -52,15 +54,23 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
}
@Override
- @SuppressWarnings("unchecked")
public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
- SourceOperator<OUT, ?> sourceOperator = new SourceOperator<>(source);
- OperatorEventGateway operatorEventGateway = operatorEventDispatcher.registerEventHandler(
- parameters.getStreamConfig().getOperatorID(),
- sourceOperator);
- sourceOperator.setOperatorEventGateway(operatorEventGateway);
+ final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+ final OperatorEventGateway gateway = parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+
+ final SourceOperator<OUT, ?> sourceOperator = instantiateSourceOperator(
+ source::createReader,
+ gateway,
+ source.getSplitSerializer());
+
sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
- return (T) sourceOperator;
+ parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
+
+ // unchecked cast: T is inferred at the call site, so the compiler cannot verify it here
+ @SuppressWarnings("unchecked")
+ final T castedOperator = (T) sourceOperator;
+
+ return castedOperator;
}
@Override
@@ -68,11 +78,7 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
return new SourceCoordinatorProvider<>(operatorName, operatorID, source, numCoordinatorWorkerThread);
}
- @Override
- public void setOperatorEventDispatcher(OperatorEventDispatcher operatorEventDispatcher) {
- this.operatorEventDispatcher = operatorEventDispatcher;
- }
-
+ @SuppressWarnings("rawtypes")
@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return SourceOperator.class;
@@ -82,4 +88,28 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
public boolean isStreamSource() {
return true;
}
+
+ /**
+ * This is a utility method to conjure up a "SplitT" generics variable binding so that we can
+ * construct the SourceOperator without resorting to "all raw types".
+ * That way, this method puts all "type non-safety" in one place and allows us to maintain as much
+ * generics safety in the main code as possible.
+ */
+ @SuppressWarnings("unchecked")
+ private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(
+ Function<SourceReaderContext, SourceReader<T, ?>> readerFactory,
+ OperatorEventGateway eventGateway,
+ SimpleVersionedSerializer<?> splitSerializer) {
+
+ // jumping through generics hoops: cast the generics away to then cast them back more strictly typed
+ final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
+ (Function<SourceReaderContext, SourceReader<T, SplitT>>) (Function<?, ?>) readerFactory;
+
+ final SimpleVersionedSerializer<SplitT> typedSplitSerializer = (SimpleVersionedSerializer<SplitT>) splitSerializer;
+
+ return new SourceOperator<>(
+ typedReaderFactory,
+ eventGateway,
+ typedSplitSerializer);
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 653af8b..34b7962 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -19,11 +19,8 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
@@ -31,6 +28,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -61,20 +59,18 @@ import static org.junit.Assert.assertTrue;
@SuppressWarnings("serial")
public class SourceOperatorTest {
- private static final int NUM_SPLITS = 5;
private static final int SUBTASK_INDEX = 1;
private static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);
- private MockSource source;
+ private MockSourceReader mockSourceReader;
private MockOperatorEventGateway mockGateway;
private SourceOperator<Integer, MockSourceSplit> operator;
@Before
public void setup() {
- this.source = new MockSource(Boundedness.BOUNDED, NUM_SPLITS);
- this.operator = new TestingSourceOperator<>(source, SUBTASK_INDEX);
+ this.mockSourceReader = new MockSourceReader();
this.mockGateway = new MockOperatorEventGateway();
- this.operator.setOperatorEventGateway(mockGateway);
+ this.operator = new TestingSourceOperator<>(mockSourceReader, mockGateway, SUBTASK_INDEX);
}
@Test
@@ -91,9 +87,7 @@ public class SourceOperatorTest {
operator.initializeState(getStateContext());
// Open the operator.
operator.open();
- // A source reader should have been created.
- assertEquals(1, source.getCreatedReaders().size());
- MockSourceReader mockSourceReader = source.getCreatedReaders().get(0);
+
// The source reader should have been assigned a split.
assertEquals(Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits());
// The source reader should have started.
@@ -112,8 +106,7 @@ public class SourceOperatorTest {
operator.open();
MockSourceSplit newSplit = new MockSourceSplit((2));
operator.handleOperatorEvent(new AddSplitEvent<>(Collections.singletonList(newSplit)));
- // The source reader should bave been assigned two splits.
- MockSourceReader mockSourceReader = source.getCreatedReaders().get(0);
+ // The source reader should have been assigned two splits.
assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits());
}
@@ -123,8 +116,7 @@ public class SourceOperatorTest {
operator.open();
SourceEvent event = new SourceEvent() {};
operator.handleOperatorEvent(new SourceEventWrapper(event));
- // The source reader should bave been assigned two splits.
- MockSourceReader mockSourceReader = source.getCreatedReaders().get(0);
+ // The source reader should have received the source event.
assertEquals(Collections.singletonList(event), mockSourceReader.getReceivedSourceEvents());
}
@@ -179,13 +171,20 @@ public class SourceOperatorTest {
/**
* A testing class that overrides the getRuntimeContext() Method.
*/
- private static class TestingSourceOperator<OUT, SplitT extends SourceSplit>
- extends SourceOperator<OUT, SplitT> {
+ private static class TestingSourceOperator<OUT> extends SourceOperator<OUT, MockSourceSplit> {
private final int subtaskIndex;
- TestingSourceOperator(Source<OUT, SplitT, ?> source, int subtaskIndex) {
- super(source);
+ TestingSourceOperator(
+ SourceReader<OUT, MockSourceSplit> reader,
+ OperatorEventGateway eventGateway,
+ int subtaskIndex) {
+
+ super(
+ (context) -> reader,
+ eventGateway,
+ new MockSourceSplitSerializer());
+
this.subtaskIndex = subtaskIndex;
}