You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:18 UTC

[flink] 14/14: [FLINK-17699][DataStream API] Initialize SourceOperator more eagerly and reduce scope of collaborators.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 939625f2c84bdce6872548d3df672f492e33a704
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 23:07:16 2020 +0200

    [FLINK-17699][DataStream API] Initialize SourceOperator more eagerly and reduce scope of collaborators.
    
    This reduces the scope of necessary mocking in the tests and of special-casing in the setup logic.
    
      - This removes the dependency on Source and replaces it with a reader factory
      - This lets the SourceOperator register itself at the OperatorEventDispatcher
---
 .../streaming/api/operators/SourceOperator.java    | 45 ++++++++++------
 .../api/operators/SourceOperatorFactory.java       | 62 ++++++++++++++++------
 .../api/operators/SourceOperatorTest.java          | 39 +++++++-------
 3 files changed, 94 insertions(+), 52 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 3af714d..e4971d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -45,6 +44,9 @@ import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Base source operator only used for integrating the source reader which is proposed by FLIP-27. It implements
@@ -67,24 +69,38 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
 			new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);
 
-	private final Source<OUT, SplitT, ?> source;
+	/** The factory for the source reader. This is a workaround, because currently the SourceReader
+	 * must be lazily initialized, which is mainly because the metric group that the reader relies on is
+	 * lazily initialized. */
+	private final Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory;
 
+	/** The serializer for the splits, applied to the split types before storing them in the reader state. */
 	private final SimpleVersionedSerializer<SplitT> splitSerializer;
 
-	// Fields that will be setup at runtime.
-	private transient SourceReader<OUT, SplitT> sourceReader;
-	private transient ListState<SplitT> readerState;
-	private transient OperatorEventGateway operatorEventGateway;
+	/** The event gateway through which this operator talks to its coordinator. */
+	private final OperatorEventGateway operatorEventGateway;
+
+	// ---- lazily initialized fields ----
+
+	/** The source reader that does most of the work. */
+	private SourceReader<OUT, SplitT> sourceReader;
+
+	/** The state that holds the currently assigned splits. */
+	private ListState<SplitT> readerState;
 
-	public SourceOperator(Source<OUT, SplitT, ?> source) {
-		this.source = source;
-		this.splitSerializer = source.getSplitSerializer();
+	public SourceOperator(
+			Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory,
+			OperatorEventGateway operatorEventGateway,
+			SimpleVersionedSerializer<SplitT> splitSerializer) {
+
+		this.readerFactory = checkNotNull(readerFactory);
+		this.operatorEventGateway = checkNotNull(operatorEventGateway);
+		this.splitSerializer = checkNotNull(splitSerializer);
 	}
 
 	@Override
 	public void open() throws Exception {
-		// Create the source reader.
-		SourceReaderContext context = new SourceReaderContext() {
+		final SourceReaderContext context = new SourceReaderContext() {
 			@Override
 			public MetricGroup metricGroup() {
 				return getRuntimeContext().getMetricGroup();
@@ -95,7 +111,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 				operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
 			}
 		};
-		sourceReader = source.createReader(context);
+
+		sourceReader = readerFactory.apply(context);
 
 		// restore the state if necessary.
 		final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
@@ -142,10 +159,6 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 		readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
 	}
 
-	public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
-		this.operatorEventGateway = operatorEventGateway;
-	}
-
 	@SuppressWarnings("unchecked")
 	public void handleOperatorEvent(OperatorEvent event) {
 		if (event instanceof AddSplitEvent) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index 79f6453..c30a0a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -19,12 +19,17 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
 
+import java.util.function.Function;
+
 /**
  * The Factory class for {@link SourceOperator}.
  */
@@ -39,9 +44,6 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	/** The number of worker thread for the source coordinator. */
 	private final int numCoordinatorWorkerThread;
 
-	/** The {@link OperatorEventDispatcher} to register the SourceOperator. */
-	private OperatorEventDispatcher operatorEventDispatcher;
-
 	public SourceOperatorFactory(Source<OUT, ?, ?> source) {
 		this(source, 1);
 	}
@@ -52,15 +54,23 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
-		SourceOperator<OUT, ?> sourceOperator = new SourceOperator<>(source);
-		OperatorEventGateway operatorEventGateway = operatorEventDispatcher.registerEventHandler(
-				parameters.getStreamConfig().getOperatorID(),
-				sourceOperator);
-		sourceOperator.setOperatorEventGateway(operatorEventGateway);
+		final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+		final OperatorEventGateway gateway = parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+
+		final SourceOperator<OUT, ?> sourceOperator = instantiateSourceOperator(
+				source::createReader,
+				gateway,
+				source.getSplitSerializer());
+
 		sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
-		return (T) sourceOperator;
+		parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
+
+		// today's lunch is generics spaghetti
+		@SuppressWarnings("unchecked")
+		final T castedOperator = (T) sourceOperator;
+
+		return castedOperator;
 	}
 
 	@Override
@@ -68,11 +78,7 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 		return new SourceCoordinatorProvider<>(operatorName, operatorID, source, numCoordinatorWorkerThread);
 	}
 
-	@Override
-	public void setOperatorEventDispatcher(OperatorEventDispatcher operatorEventDispatcher) {
-		this.operatorEventDispatcher = operatorEventDispatcher;
-	}
-
+	@SuppressWarnings("rawtypes")
 	@Override
 	public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
 		return SourceOperator.class;
@@ -82,4 +88,28 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 	public boolean isStreamSource() {
 		return true;
 	}
+
+	/**
+	 * This is a utility method to conjure up a "SplitT" generics variable binding so that we can
+	 * construct the SourceOperator without resorting to "all raw types".
+	 * That way, this method puts all "type non-safety" in one place and allows us to maintain as much
+	 * generics safety in the main code as possible.
+	 */
+	@SuppressWarnings("unchecked")
+	private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(
+			Function<SourceReaderContext, SourceReader<T, ?>> readerFactory,
+			OperatorEventGateway eventGateway,
+			SimpleVersionedSerializer<?> splitSerializer) {
+
+		// jumping through generics hoops: cast the generics away to then cast them back more strictly typed
+		final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
+				(Function<SourceReaderContext, SourceReader<T, SplitT>>) (Function<?, ?>) readerFactory;
+
+		final SimpleVersionedSerializer<SplitT> typedSplitSerializer = (SimpleVersionedSerializer<SplitT>) splitSerializer;
+
+		return new SourceOperator<>(
+				typedReaderFactory,
+				eventGateway,
+				typedSplitSerializer);
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 653af8b..34b7962 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -19,11 +19,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
@@ -31,6 +28,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
@@ -61,20 +59,18 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("serial")
 public class SourceOperatorTest {
 
-	private static final int NUM_SPLITS = 5;
 	private static final int SUBTASK_INDEX = 1;
 	private static final MockSourceSplit MOCK_SPLIT = new MockSourceSplit(1234, 10);
 
-	private MockSource source;
+	private MockSourceReader mockSourceReader;
 	private MockOperatorEventGateway mockGateway;
 	private SourceOperator<Integer, MockSourceSplit> operator;
 
 	@Before
 	public void setup() {
-		this.source = new MockSource(Boundedness.BOUNDED, NUM_SPLITS);
-		this.operator = new TestingSourceOperator<>(source, SUBTASK_INDEX);
+		this.mockSourceReader = new MockSourceReader();
 		this.mockGateway = new MockOperatorEventGateway();
-		this.operator.setOperatorEventGateway(mockGateway);
+		this.operator = new TestingSourceOperator<>(mockSourceReader, mockGateway, SUBTASK_INDEX);
 	}
 
 	@Test
@@ -91,9 +87,7 @@ public class SourceOperatorTest {
 		operator.initializeState(getStateContext());
 		// Open the operator.
 		operator.open();
-		// A source reader should have been created.
-		assertEquals(1, source.getCreatedReaders().size());
-		MockSourceReader mockSourceReader = source.getCreatedReaders().get(0);
+
 		// The source reader should have been assigned a split.
 		assertEquals(Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits());
 		// The source reader should have started.
@@ -112,8 +106,7 @@ public class SourceOperatorTest {
 		operator.open();
 		MockSourceSplit newSplit = new MockSourceSplit((2));
 		operator.handleOperatorEvent(new AddSplitEvent<>(Collections.singletonList(newSplit)));
-		// The source reader should bave been assigned two splits.
-		MockSourceReader mockSourceReader = source.getCreatedReaders().get(0);
+		// The source reader should have been assigned two splits.
 		assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits());
 	}
 
@@ -123,8 +116,7 @@ public class SourceOperatorTest {
 		operator.open();
 		SourceEvent event = new SourceEvent() {};
 		operator.handleOperatorEvent(new SourceEventWrapper(event));
-		// The source reader should bave been assigned two splits.
-		MockSourceReader mockSourceReader = source.getCreatedReaders().get(0);
+		// The source reader should have received the source event.
 		assertEquals(Collections.singletonList(event), mockSourceReader.getReceivedSourceEvents());
 	}
 
@@ -179,13 +171,20 @@ public class SourceOperatorTest {
 	/**
 	 * A testing class that overrides the getRuntimeContext() Method.
 	 */
-	private static class TestingSourceOperator<OUT, SplitT extends SourceSplit>
-			extends SourceOperator<OUT, SplitT> {
+	private static class TestingSourceOperator<OUT> extends SourceOperator<OUT, MockSourceSplit> {
 
 		private final int subtaskIndex;
 
-		TestingSourceOperator(Source<OUT, SplitT, ?> source, int subtaskIndex) {
-			super(source);
+		TestingSourceOperator(
+				SourceReader<OUT, MockSourceSplit> reader,
+				OperatorEventGateway eventGateway,
+				int subtaskIndex) {
+
+			super(
+					(context) -> reader,
+					eventGateway,
+					new MockSourceSplitSerializer());
+
 			this.subtaskIndex = subtaskIndex;
 		}