You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2021/11/18 19:10:28 UTC
[flink] 04/04: [FLINK-23842][coordination] Add logging statements in SourceCoordinators for reader registration and split requests.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7a109823a21150c7bea9ba41fc22203cbaf7094f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Aug 17 18:31:54 2021 +0200
[FLINK-23842][coordination] Add logging statements in SourceCoordinators for reader registration and split requests.
This closes #16867
---
.../source/coordinator/SourceCoordinator.java | 29 ++++++++++++++++------
1 file changed, 21 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 5ba4160..85a767e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -163,19 +164,31 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
public void handleEventFromOperator(int subtask, OperatorEvent event) {
runInEventLoop(
() -> {
- LOG.debug(
- "Handling event from subtask {} of source {}: {}",
- subtask,
- operatorName,
- event);
if (event instanceof RequestSplitEvent) {
+ LOG.info(
+ "Source {} received split request from parallel task {}",
+ operatorName,
+ subtask);
enumerator.handleSplitRequest(
subtask, ((RequestSplitEvent) event).hostName());
} else if (event instanceof SourceEventWrapper) {
- enumerator.handleSourceEvent(
- subtask, ((SourceEventWrapper) event).getSourceEvent());
+ final SourceEvent sourceEvent =
+ ((SourceEventWrapper) event).getSourceEvent();
+ LOG.debug(
+ "Source {} received custom event from parallel task {}: {}",
+ operatorName,
+ subtask,
+ sourceEvent);
+ enumerator.handleSourceEvent(subtask, sourceEvent);
} else if (event instanceof ReaderRegistrationEvent) {
- handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+ final ReaderRegistrationEvent registrationEvent =
+ (ReaderRegistrationEvent) event;
+ LOG.info(
+ "Source {} registering reader for parallel task {} @ {}",
+ operatorName,
+ subtask,
+ registrationEvent.location());
+ handleReaderRegistrationEvent(registrationEvent);
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
}