You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2021/11/18 19:10:28 UTC

[flink] 04/04: [FLINK-23842][coordination] Add logging statements in SourceCoordinators for reader registration and split requests.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a109823a21150c7bea9ba41fc22203cbaf7094f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Aug 17 18:31:54 2021 +0200

    [FLINK-23842][coordination] Add logging statements in SourceCoordinators for reader registration and split requests.
    
    This closes #16867
---
 .../source/coordinator/SourceCoordinator.java      | 29 ++++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 5ba4160..85a767e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -163,19 +164,31 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
     public void handleEventFromOperator(int subtask, OperatorEvent event) {
         runInEventLoop(
                 () -> {
-                    LOG.debug(
-                            "Handling event from subtask {} of source {}: {}",
-                            subtask,
-                            operatorName,
-                            event);
                     if (event instanceof RequestSplitEvent) {
+                        LOG.info(
+                                "Source {} received split request from parallel task {}",
+                                operatorName,
+                                subtask);
                         enumerator.handleSplitRequest(
                                 subtask, ((RequestSplitEvent) event).hostName());
                     } else if (event instanceof SourceEventWrapper) {
-                        enumerator.handleSourceEvent(
-                                subtask, ((SourceEventWrapper) event).getSourceEvent());
+                        final SourceEvent sourceEvent =
+                                ((SourceEventWrapper) event).getSourceEvent();
+                        LOG.debug(
+                                "Source {} received custom event from parallel task {}: {}",
+                                operatorName,
+                                subtask,
+                                sourceEvent);
+                        enumerator.handleSourceEvent(subtask, sourceEvent);
                     } else if (event instanceof ReaderRegistrationEvent) {
-                        handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+                        final ReaderRegistrationEvent registrationEvent =
+                                (ReaderRegistrationEvent) event;
+                        LOG.info(
+                                "Source {} registering reader for parallel task {} @ {}",
+                                operatorName,
+                                subtask,
+                                registrationEvent.location());
+                        handleReaderRegistrationEvent(registrationEvent);
                     } else {
                         throw new FlinkException("Unrecognized Operator Event: " + event);
                     }