You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:34 UTC

[flink] 04/10: [FLINK-19205][core] Add access to configuration and hostname in the SourceReaderContext

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 428fbbad85379f71a9fbf1a1ab3e3dcfa9006436
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Aug 26 15:50:16 2020 +0200

    [FLINK-19205][core] Add access to configuration and hostname in the SourceReaderContext
---
 .../api/connector/source/SourceReaderContext.java  | 12 ++++++++++
 .../streaming/api/operators/SourceOperator.java    | 28 ++++++++++++++++++----
 .../api/operators/SourceOperatorFactory.java       | 13 +++++++---
 .../operators/source/TestingSourceOperator.java    |  5 +++-
 4 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 658034b..c8ed36d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 
 /**
@@ -33,6 +34,17 @@ public interface SourceReaderContext {
 	MetricGroup metricGroup();
 
 	/**
+	 * Gets the configuration with which Flink was started.
+	 */
+	Configuration getConfiguration();
+
+	/**
+	 * Gets the hostname of the machine where this reader is executed. This can be used
+	 * to request splits local to the machine, if needed.
+	 */
+	String getLocalHostName();
+
+	/**
 	 * Send a source event to the source coordinator.
 	 *
 	 * @param sourceEvent the source event to coordinator.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 33e95f2..1af572c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
@@ -65,7 +66,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <OUT> The output type of the operator.
  */
 @Internal
-@SuppressWarnings("serial")
+//@SuppressWarnings("serial")
 public class SourceOperator<OUT, SplitT extends SourceSplit>
 		extends AbstractStreamOperator<OUT>
 		implements OperatorEventHandler, PushingAsyncDataInput<OUT> {
@@ -89,6 +90,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	/** The factory for timestamps and watermark generators. */
 	private final WatermarkStrategy<OUT> watermarkStrategy;
 
+	/** The Flink configuration. */
+	private final Configuration configuration;
+
+	/** Host name of the machine where the operator runs, to support locality aware work assignment. */
+	private final String localHostname;
+
 	// ---- lazily initialized fields (these fields are the "hot" fields) ----
 
 	/** The source reader that does most of the work. */
@@ -110,13 +117,17 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 			OperatorEventGateway operatorEventGateway,
 			SimpleVersionedSerializer<SplitT> splitSerializer,
 			WatermarkStrategy<OUT> watermarkStrategy,
-			ProcessingTimeService timeService) {
+			ProcessingTimeService timeService,
+			Configuration configuration,
+			String localHostname) {
 
 		this.readerFactory = checkNotNull(readerFactory);
 		this.operatorEventGateway = checkNotNull(operatorEventGateway);
 		this.splitSerializer = checkNotNull(splitSerializer);
 		this.watermarkStrategy = checkNotNull(watermarkStrategy);
 		this.processingTimeService = timeService;
+		this.configuration = checkNotNull(configuration);
+		this.localHostname = checkNotNull(localHostname);
 	}
 
 	@Override
@@ -130,6 +141,16 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 			}
 
 			@Override
+			public Configuration getConfiguration() {
+				return configuration;
+			}
+
+			@Override
+			public String getLocalHostName() {
+				return localHostname;
+			}
+
+			@Override
 			public void sendSourceEventToCoordinator(SourceEvent event) {
 				operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
 			}
@@ -219,8 +240,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 
 	private void registerReader() {
 		operatorEventGateway.sendEventToCoordinator(new ReaderRegistrationEvent(
-				getRuntimeContext().getIndexOfThisSubtask(),
-				"UNKNOWN_LOCATION"));
+				getRuntimeContext().getIndexOfThisSubtask(), localHostname));
 	}
 
 	// --------------- methods for unit tests ------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index d1636cd..1d7aed7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -75,7 +76,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 				gateway,
 				source.getSplitSerializer(),
 				watermarkStrategy,
-				parameters.getProcessingTimeService());
+				parameters.getProcessingTimeService(),
+				parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
+				parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getTaskManagerExternalAddress());
 
 		sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
 		parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
@@ -115,7 +118,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 			OperatorEventGateway eventGateway,
 			SimpleVersionedSerializer<?> splitSerializer,
 			WatermarkStrategy<T> watermarkStrategy,
-			ProcessingTimeService timeService) {
+			ProcessingTimeService timeService,
+			Configuration config,
+			String localHostName) {
 
 		// jumping through generics hoops: cast the generics away to then cast them back more strictly typed
 		final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
@@ -128,6 +133,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 				eventGateway,
 				typedSplitSerializer,
 				watermarkStrategy,
-				timeService);
+				timeService,
+				config,
+				localHostName);
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 038a211..a133ace 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
@@ -71,7 +72,9 @@ public class TestingSourceOperator<T>  extends SourceOperator<T, MockSourceSplit
 			eventGateway,
 			new MockSourceSplitSerializer(),
 			watermarkStrategy,
-			timeService);
+			timeService,
+			new Configuration(),
+			"localhost");
 
 		this.subtaskIndex = subtaskIndex;
 		this.parallelism = parallelism;