You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:34 UTC
[flink] 04/10: [FLINK-19205][core] Add access to configuration and hostname in the SourceReaderContext
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 428fbbad85379f71a9fbf1a1ab3e3dcfa9006436
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Aug 26 15:50:16 2020 +0200
[FLINK-19205][core] Add access to configuration and hostname in the SourceReaderContext
---
.../api/connector/source/SourceReaderContext.java | 12 ++++++++++
.../streaming/api/operators/SourceOperator.java | 28 ++++++++++++++++++----
.../api/operators/SourceOperatorFactory.java | 13 +++++++---
.../operators/source/TestingSourceOperator.java | 5 +++-
4 files changed, 50 insertions(+), 8 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 658034b..c8ed36d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
/**
@@ -33,6 +34,17 @@ public interface SourceReaderContext {
MetricGroup metricGroup();
/**
+ * Gets the configuration with which Flink was started.
+ */
+ Configuration getConfiguration();
+
+ /**
+ * Gets the hostname of the machine where this reader is executed. This can be used
+ * to request splits local to the machine, if needed.
+ */
+ String getLocalHostName();
+
+ /**
* Send a source event to the source coordinator.
*
* @param sourceEvent the source event to coordinator.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 33e95f2..1af572c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
@@ -65,7 +66,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* @param <OUT> The output type of the operator.
*/
@Internal
-@SuppressWarnings("serial")
+//@SuppressWarnings("serial")
public class SourceOperator<OUT, SplitT extends SourceSplit>
extends AbstractStreamOperator<OUT>
implements OperatorEventHandler, PushingAsyncDataInput<OUT> {
@@ -89,6 +90,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
/** The factory for timestamps and watermark generators. */
private final WatermarkStrategy<OUT> watermarkStrategy;
+ /** The Flink configuration. */
+ private final Configuration configuration;
+
+ /** Host name of the machine where the operator runs, to support locality aware work assignment. */
+ private final String localHostname;
+
// ---- lazily initialized fields (these fields are the "hot" fields) ----
/** The source reader that does most of the work. */
@@ -110,13 +117,17 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
OperatorEventGateway operatorEventGateway,
SimpleVersionedSerializer<SplitT> splitSerializer,
WatermarkStrategy<OUT> watermarkStrategy,
- ProcessingTimeService timeService) {
+ ProcessingTimeService timeService,
+ Configuration configuration,
+ String localHostname) {
this.readerFactory = checkNotNull(readerFactory);
this.operatorEventGateway = checkNotNull(operatorEventGateway);
this.splitSerializer = checkNotNull(splitSerializer);
this.watermarkStrategy = checkNotNull(watermarkStrategy);
this.processingTimeService = timeService;
+ this.configuration = checkNotNull(configuration);
+ this.localHostname = checkNotNull(localHostname);
}
@Override
@@ -130,6 +141,16 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
}
@Override
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public String getLocalHostName() {
+ return localHostname;
+ }
+
+ @Override
public void sendSourceEventToCoordinator(SourceEvent event) {
operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
}
@@ -219,8 +240,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
private void registerReader() {
operatorEventGateway.sendEventToCoordinator(new ReaderRegistrationEvent(
- getRuntimeContext().getIndexOfThisSubtask(),
- "UNKNOWN_LOCATION"));
+ getRuntimeContext().getIndexOfThisSubtask(), localHostname));
}
// --------------- methods for unit tests ------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index d1636cd..1d7aed7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -75,7 +76,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
gateway,
source.getSplitSerializer(),
watermarkStrategy,
- parameters.getProcessingTimeService());
+ parameters.getProcessingTimeService(),
+ parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
+ parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getTaskManagerExternalAddress());
sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
@@ -115,7 +118,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
OperatorEventGateway eventGateway,
SimpleVersionedSerializer<?> splitSerializer,
WatermarkStrategy<T> watermarkStrategy,
- ProcessingTimeService timeService) {
+ ProcessingTimeService timeService,
+ Configuration config,
+ String localHostName) {
// jumping through generics hoops: cast the generics away to then cast them back more strictly typed
final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
@@ -128,6 +133,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
eventGateway,
typedSplitSerializer,
watermarkStrategy,
- timeService);
+ timeService,
+ config,
+ localHostName);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 038a211..a133ace 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
@@ -71,7 +72,9 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit
eventGateway,
new MockSourceSplitSerializer(),
watermarkStrategy,
- timeService);
+ timeService,
+ new Configuration(),
+ "localhost");
this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;