Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/07 12:56:29 UTC

[GitHub] [flink] syhily opened a new pull request, #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

syhily opened a new pull request, #21249:
URL: https://github.com/apache/flink/pull/21249

   ## What is the purpose of the change
   
   Add Pulsar metrics and tests.
   
   ## Brief change log
   
     - Standard sink metrics are supported.
     - `ProducerStats` and `ConsumerStats` are exposed to Flink metrics by default.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as `PulsarSinkITCase`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] syhily commented on pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
syhily commented on PR #21249:
URL: https://github.com/apache/flink/pull/21249#issuecomment-1311260266

   @tisonkun The `PulsarSinkE2ECase` issue should be resolved in [this PR](https://github.com/apache/flink/pull/21252). We can hold this PR.


[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1016435789


##########
flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java:
##########
@@ -34,7 +34,7 @@
 /** Pulsar sink E2E test based on connector testing framework. */
 @SuppressWarnings("unused")
 @Tag("org.apache.flink.testutils.junit.FailsOnJava11")
-public class PulsarSinkE2ECase extends PulsarSinkTestSuiteBase {
+public class PulsarSinkE2ECase extends SinkTestSuiteBase<String> {

Review Comment:
   Off topic: IDEA warns me that this test class name doesn't match the regex `'[A-Z][A-Za-z\d]*Test(s|Case)?|Test[A-Z][A-Za-z\d]*|IT(.*)|(.*)IT(Case)?'`. Do we run this test case properly?
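
The naming question above can be checked mechanically. The following is a minimal sketch, assuming the IDE inspection applies the quoted regex as a full match against the simple class name; the class `TestNamePatternCheck` and its helper are hypothetical and exist only for illustration. It does not model which classes the CI build actually picks up, since that is decided by the build's own include patterns.

```java
import java.util.regex.Pattern;

class TestNamePatternCheck {
    // Regex quoted in the review comment (the IDE's test-class naming rule).
    static final Pattern IDE_TEST_NAME =
            Pattern.compile(
                    "[A-Z][A-Za-z\\d]*Test(s|Case)?|Test[A-Z][A-Za-z\\d]*|IT(.*)|(.*)IT(Case)?");

    // Assumption: the inspection requires the whole name to fit one alternative.
    static boolean matches(String simpleClassName) {
        return IDE_TEST_NAME.matcher(simpleClassName).matches();
    }

    public static void main(String[] args) {
        System.out.println("PulsarSinkE2ECase -> " + matches("PulsarSinkE2ECase"));
        System.out.println("PulsarSinkITCase  -> " + matches("PulsarSinkITCase"));
    }
}
```

Under that assumption, `PulsarSinkE2ECase` fails the check because it contains neither `Test` nor `IT`, while a name such as `PulsarSinkITCase` passes via the last alternative.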



[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1016472142


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java:
##########
@@ -204,6 +205,19 @@ private PulsarSourceOptions() {
                                             "In this case, a single consumer will still receive all the keys, but they may be coming in different orders.")
                                     .build());
 
+    public static final ConfigOption<Boolean> PULSAR_ENABLE_SOURCE_METRICS =
+            ConfigOptions.key(SOURCE_CONFIG_PREFIX + "enableMetrics")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "We won't expose the metrics from Pulsar Consumer if you disable this option.")

Review Comment:
   ```suggestion
                                               "The metrics from Pulsar Consumer are only exposed if you enable this option.")
   ```
   
   Use positive statements instead of double negative.



[GitHub] [flink] syhily commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017133489


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/ProducerMetricsInterceptor.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
+
+/** The metric statistic for Pulsar's {@link Producer}. */
+public class ProducerMetricsInterceptor implements ProducerInterceptor {
+
+    private final Counter numRecordsOutErrors;
+    private final Counter numRecordsOutCounter;
+    private final Counter numBytesOutCounter;
+
+    public ProducerMetricsInterceptor(SinkWriterMetricGroup metricGroup) {
+        this.numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
+        this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();

Review Comment:
   @tisonkun FYI, we won't use the `SinkWriterMetricGroup.getNumRecordsSendCounter()` and `SinkWriterMetricGroup.getNumBytesSendCounter()` because Flink has deprecated them.
   
   You can check it in [this commit](https://github.com/apache/flink/commit/d0e855090683920d57922acbddb64c9a99dceccd).
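
The diff excerpt above stops at the constructor, so the callback body is not shown in this thread. As a rough sketch of how such an interceptor might update the three counters, assuming one callback per message on acknowledgement or failure: `SendCounters` and its `onSendAcknowledgement(int, Throwable)` method are hypothetical stand-ins, and `LongAdder` replaces Flink's `Counter` so the example runs without Flink or Pulsar on the classpath.

```java
import java.util.concurrent.atomic.LongAdder;

class SendCounters {
    // Stand-ins for the numRecordsOut, numBytesOut and numRecordsOutErrors counters.
    final LongAdder numRecordsOut = new LongAdder();
    final LongAdder numBytesOut = new LongAdder();
    final LongAdder numRecordsOutErrors = new LongAdder();

    // Hypothetical callback: called once per message, with a non-null error on failure.
    void onSendAcknowledgement(int payloadBytes, Throwable error) {
        if (error != null) {
            numRecordsOutErrors.increment();
        } else {
            numRecordsOut.increment();
            numBytesOut.add(payloadBytes);
        }
    }

    public static void main(String[] args) {
        SendCounters counters = new SendCounters();
        counters.onSendAcknowledgement(10, null);
        counters.onSendAcknowledgement(5, null);
        counters.onSendAcknowledgement(7, new RuntimeException("send failed"));
        System.out.println("records=" + counters.numRecordsOut.sum()
                + " bytes=" + counters.numBytesOut.sum()
                + " errors=" + counters.numRecordsOutErrors.sum());
    }
}
```

Whether failed sends should also count towards the bytes metric is a design choice this sketch does not settle; here only acknowledged messages contribute.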



[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1016479052


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java:
##########
@@ -139,6 +140,19 @@ private PulsarSinkOptions() {
                                             "The maximum number of pending messages in one sink parallelism.")
                                     .build());
 
+    public static final ConfigOption<Boolean> PULSAR_ENABLE_SINK_METRICS =
+            ConfigOptions.key(SINK_CONFIG_PREFIX + "enableMetrics")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "We won't expose the metrics from Pulsar Producer if you disable this option.")

Review Comment:
   ```suggestion
                                               "The metrics from Pulsar Producer are only exposed if you enable this option.")
   ```
   
   ditto



[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1016479532


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java:
##########
@@ -62,6 +65,8 @@ public SinkConfiguration(Configuration configuration) {
         this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
         this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
         this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
+        this.enableMetrics =
+                !get(PULSAR_ENABLE_SINK_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0;

Review Comment:
   ```suggestion
                   get(PULSAR_ENABLE_SINK_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0;
   ```
   
   ditto
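
To see why the leading negation matters, here is a small sketch comparing the condition as written in the diff with the suggested one. The class `MetricsFlag` and its method names are hypothetical, and the 60-second stats interval is only an illustrative value, not a default taken from the connector.

```java
class MetricsFlag {
    // Condition as written in the diff: note the leading negation.
    static boolean asWritten(boolean enableOption, long statsIntervalSeconds) {
        return !enableOption && statsIntervalSeconds > 0;
    }

    // Condition from the review suggestion.
    static boolean asSuggested(boolean enableOption, long statsIntervalSeconds) {
        return enableOption && statsIntervalSeconds > 0;
    }

    public static void main(String[] args) {
        boolean[] options = {true, false};
        long[] intervals = {60L, 0L};
        for (boolean option : options) {
            for (long interval : intervals) {
                System.out.println("enable=" + option + " interval=" + interval
                        + " written=" + asWritten(option, interval)
                        + " suggested=" + asSuggested(option, interval));
            }
        }
    }
}
```

With the option left at its default of `true`, the condition as written evaluates to `false`, so metrics would be off by default, which contradicts the change log's "exposed ... by default".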



[GitHub] [flink] syhily commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017125833


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java:
##########
@@ -79,6 +82,8 @@ public SourceConfiguration(Configuration configuration) {
         this.subscriptionType = get(PULSAR_SUBSCRIPTION_TYPE);
         this.subscriptionMode = get(PULSAR_SUBSCRIPTION_MODE);
         this.allowKeySharedOutOfOrderDelivery = get(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY);
+        this.enableMetrics =
+                !get(PULSAR_ENABLE_SOURCE_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0;

Review Comment:
   Yep, I renamed the option from `PULSAR_DISABLE_SOURCE_METRICS` to `PULSAR_ENABLE_SOURCE_METRICS` and forgot to change this condition.



[GitHub] [flink] tisonkun commented on pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #21249:
URL: https://github.com/apache/flink/pull/21249#issuecomment-1308933066

   @syhily it seems `PulsarSinkE2ECase` hangs on CI.
   
   ```
   2022-11-09T08:31:18.9555850Z Nov 09 08:31:18 [INFO] -------------------------------------------------------
   2022-11-09T08:31:18.9556334Z Nov 09 08:31:18 [INFO]  T E S T S
   2022-11-09T08:31:18.9556925Z Nov 09 08:31:18 [INFO] -------------------------------------------------------
   2022-11-09T08:31:19.9372438Z Nov 09 08:31:19 [INFO] Running org.apache.flink.tests.util.pulsar.PulsarSinkE2ECase
   2022-11-09T08:35:56.6381274Z Nov 09 08:35:56 [INFO] Tests run: 10, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 276.69 s - in org.apache.flink.tests.util.pulsar.PulsarSinkE2ECase
   2022-11-09T08:35:56.6382321Z Nov 09 08:35:56 [INFO] Running org.apache.flink.tests.util.pulsar.PulsarSourceOrderedE2ECase
   2022-11-09T11:06:33.9256404Z ==========================================================================================
   2022-11-09T11:06:33.9263396Z === WARNING: This task took already 95% of the available time budget of 286 minutes ===
   2022-11-09T11:06:33.9264371Z ==========================================================================================
   2022-11-09T11:06:33.9298659Z ==============================================================================
   2022-11-09T11:06:33.9299156Z The following Java processes are running (JPS)
   2022-11-09T11:06:33.9303613Z ==============================================================================
   2022-11-09T11:06:34.1086175Z 299956 Launcher
   2022-11-09T11:06:34.1087186Z 352462 surefirebooter8263206851455958834.jar
   2022-11-09T11:06:34.1088073Z 390794 Jps
   2022-11-09T11:06:34.1185139Z ==============================================================================
   2022-11-09T11:06:34.1186477Z Printing stack trace of Java process 299956
   2022-11-09T11:06:34.1186923Z ==============================================================================
   2022-11-09T11:06:34.4547968Z 2022-11-09 11:06:34
   2022-11-09T11:06:34.4548894Z Full thread dump OpenJDK 64-Bit Server VM (25.345-b01 mixed mode):
   2022-11-09T11:06:34.4549105Z 
   2022-11-09T11:06:34.4549749Z "Attach Listener" #181 daemon prio=9 os_prio=0 tid=0x00007f8a28014000 nid=0x5f6a5 waiting on condition [0x0000000000000000]
   2022-11-09T11:06:34.4550344Z    java.lang.Thread.State: RUNNABLE
   ```


[GitHub] [flink] tisonkun merged pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun merged PR #21249:
URL: https://github.com/apache/flink/pull/21249


[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017435136


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java:
##########
@@ -150,7 +181,13 @@ private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) {
                     createProducerBuilder(pulsarClient, schema, sinkConfiguration);
             // Set the required topic name.
             builder.topic(topic);
+            // Set the sending counter for metrics.
+            builder.intercept(new ProducerMetricsInterceptor(metricGroup));

Review Comment:
   Although I think logically we should skip adding this interceptor if metrics are disabled.
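
For illustration only, this is roughly what skipping the interceptor would look like. `FakeProducerBuilder` is a hypothetical stand-in that merely records what was registered; it is not Pulsar's `ProducerBuilder`, and the `enableMetrics` flag is assumed to come from the sink configuration. Elsewhere in this thread the author replies that the interceptor only records IO metrics and keeps it unconditional.

```java
import java.util.ArrayList;
import java.util.List;

class ConditionalInterceptorSketch {
    // Hypothetical stand-in for a producer builder; it only records registrations.
    static class FakeProducerBuilder {
        final List<String> interceptors = new ArrayList<>();
        String topic;

        FakeProducerBuilder topic(String topic) {
            this.topic = topic;
            return this;
        }

        FakeProducerBuilder intercept(String interceptorName) {
            interceptors.add(interceptorName);
            return this;
        }
    }

    // Registers the metrics interceptor only when metrics are enabled.
    static FakeProducerBuilder configure(String topic, boolean enableMetrics) {
        FakeProducerBuilder builder = new FakeProducerBuilder().topic(topic);
        if (enableMetrics) {
            builder.intercept("ProducerMetricsInterceptor");
        }
        return builder;
    }

    public static void main(String[] args) {
        System.out.println("enabled:  " + configure("demo-topic", true).interceptors);
        System.out.println("disabled: " + configure("demo-topic", false).interceptors);
    }
}
```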



[GitHub] [flink] tisonkun commented on pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #21249:
URL: https://github.com/apache/flink/pull/21249#issuecomment-1308220753

   @flinkbot run azure


[GitHub] [flink] tisonkun commented on pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #21249:
URL: https://github.com/apache/flink/pull/21249#issuecomment-1308930141

   @flinkbot run azure


[GitHub] [flink] syhily commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017120952


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -96,12 +98,18 @@ public PulsarSourceEnumerator(
         this.sourceConfiguration = sourceConfiguration;
         this.context = context;
         this.splitAssigner = createAssigner(stopCursor, sourceConfiguration, context, enumState);
+        this.metricGroup = context.metricGroup();
     }
 
     @Override
     public void start() {
         rangeGenerator.open(sourceConfiguration);
 
+        // Expose the split assignment metrics if Flink has supported.
+        if (metricGroup != null) {
+            metricGroup.setUnassignedSplitsGauge(splitAssigner::getUnassignedSplitCount);

Review Comment:
   Yep. Flink may add this support one day. I just added it to make sure all the metrics are exposed.
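
The null-guarded registration in the diff can be sketched without Flink as follows. `EnumeratorMetrics`, `RecordingMetrics` and `SplitAssigner` are hypothetical stand-ins, and a `Supplier<Long>` replaces Flink's gauge type; the point is only that the gauge is registered once as a method reference and read lazily afterwards, and that a missing metric group is skipped.

```java
import java.util.function.Supplier;

class UnassignedSplitsGaugeSketch {
    // Hypothetical stand-in for the enumerator's metric group.
    interface EnumeratorMetrics {
        void setUnassignedSplitsGauge(Supplier<Long> gauge);
    }

    // Keeps the supplier so a reporter could poll it later.
    static class RecordingMetrics implements EnumeratorMetrics {
        Supplier<Long> gauge;

        @Override
        public void setUnassignedSplitsGauge(Supplier<Long> gauge) {
            this.gauge = gauge;
        }
    }

    // Hypothetical stand-in for the split assigner.
    static class SplitAssigner {
        long unassigned;

        long getUnassignedSplitCount() {
            return unassigned;
        }
    }

    // Mirrors the guard in the diff: register only if a metric group exists.
    static void start(EnumeratorMetrics metricGroup, SplitAssigner assigner) {
        if (metricGroup != null) {
            metricGroup.setUnassignedSplitsGauge(assigner::getUnassignedSplitCount);
        }
    }

    public static void main(String[] args) {
        SplitAssigner assigner = new SplitAssigner();
        RecordingMetrics metrics = new RecordingMetrics();
        start(null, assigner); // no metric group: nothing is registered
        start(metrics, assigner);
        assigner.unassigned = 3;
        System.out.println("unassigned splits = " + metrics.gauge.get());
    }
}
```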



[GitHub] [flink] imaffe commented on pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
imaffe commented on PR #21249:
URL: https://github.com/apache/flink/pull/21249#issuecomment-1308242332

   LGTM~ Great job~


[GitHub] [flink] tisonkun commented on pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #21249:
URL: https://github.com/apache/flink/pull/21249#issuecomment-1311679395

   Merging...
   
   @syhily next time, please push a separate commit so that it can be reviewed easily. Otherwise, it's hard for the reviewer to find the diff.


[GitHub] [flink] syhily commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017126559


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -96,12 +98,18 @@ public PulsarSourceEnumerator(
         this.sourceConfiguration = sourceConfiguration;
         this.context = context;
         this.splitAssigner = createAssigner(stopCursor, sourceConfiguration, context, enumState);
+        this.metricGroup = context.metricGroup();
     }
 
     @Override
     public void start() {
         rangeGenerator.open(sourceConfiguration);
 
+        // Expose the split assignment metrics if Flink has supported.
+        if (metricGroup != null) {
+            metricGroup.setUnassignedSplitsGauge(splitAssigner::getUnassignedSplitCount);

Review Comment:
   Or should I just remove it now?



[GitHub] [flink] syhily commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017429516


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java:
##########
@@ -150,7 +181,13 @@ private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) {
                     createProducerBuilder(pulsarClient, schema, sinkConfiguration);
             // Set the required topic name.
             builder.topic(topic);
+            // Set the sending counter for metrics.
+            builder.intercept(new ProducerMetricsInterceptor(metricGroup));

Review Comment:
   This interceptor is only used for recording the IO metrics, so there is no need to disable it.



[GitHub] [flink] syhily commented on pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
syhily commented on PR #21249:
URL: https://github.com/apache/flink/pull/21249#issuecomment-1311675493

   @tisonkun The CI passed.


[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1016470697


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java:
##########
@@ -79,6 +82,8 @@ public SourceConfiguration(Configuration configuration) {
         this.subscriptionType = get(PULSAR_SUBSCRIPTION_TYPE);
         this.subscriptionMode = get(PULSAR_SUBSCRIPTION_MODE);
         this.allowKeySharedOutOfOrderDelivery = get(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY);
+        this.enableMetrics =
+                !get(PULSAR_ENABLE_SOURCE_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0;

Review Comment:
   ```suggestion
                   get(PULSAR_ENABLE_SOURCE_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0;
   ```
   ?



[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017434788


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java:
##########
@@ -150,7 +181,13 @@ private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) {
                     createProducerBuilder(pulsarClient, schema, sinkConfiguration);
             // Set the required topic name.
             builder.topic(topic);
+            // Set the sending counter for metrics.
+            builder.intercept(new ProducerMetricsInterceptor(metricGroup));

Review Comment:
   OK. It's fine if there is no significant performance regression.



[GitHub] [flink] flinkbot commented on pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21249:
URL: https://github.com/apache/flink/pull/21249#issuecomment-1305588209

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "84665087858b4d9a894bac4831ee8fba681b9eef",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "84665087858b4d9a894bac4831ee8fba681b9eef",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 84665087858b4d9a894bac4831ee8fba681b9eef UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1016479992


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -96,12 +98,18 @@ public PulsarSourceEnumerator(
         this.sourceConfiguration = sourceConfiguration;
         this.context = context;
         this.splitAssigner = createAssigner(stopCursor, sourceConfiguration, context, enumState);
+        this.metricGroup = context.metricGroup();
     }
 
     @Override
     public void start() {
         rangeGenerator.open(sourceConfiguration);
 
+        // Expose the split assignment metrics if Flink has supported.
+        if (metricGroup != null) {
+            metricGroup.setUnassignedSplitsGauge(splitAssigner::getUnassignedSplitCount);

Review Comment:
   It seems this function call is always a no-op in the current codebase.



[GitHub] [flink] syhily commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017119151


##########
flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java:
##########
@@ -34,7 +34,7 @@
 /** Pulsar sink E2E test based on connector testing framework. */
 @SuppressWarnings("unused")
 @Tag("org.apache.flink.testutils.junit.FailsOnJava11")
-public class PulsarSinkE2ECase extends PulsarSinkTestSuiteBase {
+public class PulsarSinkE2ECase extends SinkTestSuiteBase<String> {

Review Comment:
   All of Flink's e2e tests use this naming suffix. I prefer to keep it.



[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017322603


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java:
##########
@@ -96,12 +98,18 @@ public PulsarSourceEnumerator(
         this.sourceConfiguration = sourceConfiguration;
         this.context = context;
         this.splitAssigner = createAssigner(stopCursor, sourceConfiguration, context, enumState);
+        this.metricGroup = context.metricGroup();
     }
 
     @Override
     public void start() {
         rangeGenerator.open(sourceConfiguration);
 
+        // Expose the split assignment metrics if Flink has supported.
+        if (metricGroup != null) {
+            metricGroup.setUnassignedSplitsGauge(splitAssigner::getUnassignedSplitCount);

Review Comment:
   Nope. I think it's OK given this intention.





[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017344322


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/ProducerMetricsInterceptor.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
+
+/** The metric statistic for Pulsar's {@link Producer}. */
+public class ProducerMetricsInterceptor implements ProducerInterceptor {
+
+    private final Counter numRecordsOutErrors;
+    private final Counter numRecordsOutCounter;
+    private final Counter numBytesOutCounter;
+
+    public ProducerMetricsInterceptor(SinkWriterMetricGroup metricGroup) {
+        this.numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
+        this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();

Review Comment:
   Interesting. I think those two methods are worth a deprecation doc line :)
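
   For context, the constructor quoted above caches three counters (records out, bytes out, send errors). The quoted hunk stops before the callback methods, so the following is only a sketch of how an interceptor of this kind might update such counters. `SimpleCounter` and the `onSendAcknowledgement(byte[], Throwable)` signature are hypothetical stand-ins, not the Flink or Pulsar types and not the PR's actual code.

```java
import java.util.concurrent.atomic.AtomicLong;

public class SendMetricsSketch {

    /** Hypothetical stand-in for a metrics counter. */
    static final class SimpleCounter {
        private final AtomicLong value = new AtomicLong();

        void inc(long n) {
            value.addAndGet(n);
        }

        long getCount() {
            return value.get();
        }
    }

    /** Hypothetical interceptor that counts the outcome of each send. */
    static final class SendMetricsInterceptor {
        final SimpleCounter recordsOut = new SimpleCounter();
        final SimpleCounter bytesOut = new SimpleCounter();
        final SimpleCounter recordsOutErrors = new SimpleCounter();

        /** Assumed to be invoked once per send, with a non-null error on failure. */
        void onSendAcknowledgement(byte[] payload, Throwable error) {
            if (error != null) {
                recordsOutErrors.inc(1);
            } else {
                recordsOut.inc(1);
                bytesOut.inc(payload.length);
            }
        }
    }

    public static void main(String[] args) {
        SendMetricsInterceptor interceptor = new SendMetricsInterceptor();
        interceptor.onSendAcknowledgement(new byte[] {1, 2, 3}, null);
        interceptor.onSendAcknowledgement(new byte[] {4}, new RuntimeException("send failed"));

        System.out.println("records=" + interceptor.recordsOut.getCount()
                + " bytes=" + interceptor.bytesOut.getCount()
                + " errors=" + interceptor.recordsOutErrors.getCount());
    }
}
```

   Counting in the acknowledgement callback rather than at send time means a failed send is recorded as an error and never as an emitted record.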





[GitHub] [flink] tisonkun commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017342375


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java:
##########
@@ -222,6 +248,7 @@ protected Consumer<byte[]> createPulsarConsumer(TopicPartition partition) {
                 createConsumerBuilder(pulsarClient, Schema.BYTES, sourceConfiguration);
 
         consumerBuilder.topic(partition.getFullTopicName());
+        consumerBuilder.intercept(new ConsumerMetricsInterceptor(metricGroup));

Review Comment:
   ditto



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java:
##########
@@ -150,7 +181,13 @@ private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) {
                     createProducerBuilder(pulsarClient, schema, sinkConfiguration);
             // Set the required topic name.
             builder.topic(topic);
+            // Set the sending counter for metrics.
+            builder.intercept(new ProducerMetricsInterceptor(metricGroup));

Review Comment:
   Do we also exclude this interceptor if metrics are disabled?
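
   One way to read the question is as a guard around the `intercept(...)` call. The sketch below shows that shape with hypothetical stand-in types; `Builder`, `MetricsInterceptor`, and the `metricsEnabled` flag are assumptions made for illustration and are not the connector's or Pulsar's API.

```java
import java.util.ArrayList;
import java.util.List;

public class ConditionalInterceptorSketch {

    /** Hypothetical marker interface for interceptors. */
    interface Interceptor {}

    /** Hypothetical metrics interceptor with no behavior of its own. */
    static final class MetricsInterceptor implements Interceptor {}

    /** Hypothetical builder that only records the interceptors it was given. */
    static final class Builder {
        final List<Interceptor> interceptors = new ArrayList<>();
        String topic;

        Builder topic(String topic) {
            this.topic = topic;
            return this;
        }

        Builder intercept(Interceptor interceptor) {
            interceptors.add(interceptor);
            return this;
        }
    }

    /** Attaches the metrics interceptor only when metrics are enabled. */
    static Builder configure(String topic, boolean metricsEnabled) {
        Builder builder = new Builder().topic(topic);
        if (metricsEnabled) {
            builder.intercept(new MetricsInterceptor());
        }
        return builder;
    }

    public static void main(String[] args) {
        System.out.println(configure("topic-a", true).interceptors.size());
        System.out.println(configure("topic-a", false).interceptors.size());
    }
}
```

   With the guard in place, a disabled-metrics configuration pays no per-message interceptor cost; without it, the interceptor would still run and update counters nobody reads.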





[GitHub] [flink] syhily commented on a diff in pull request #21249: [FLINK-26027][Connector/Pulsar] Expose Pulsar metrics

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21249:
URL: https://github.com/apache/flink/pull/21249#discussion_r1017429944


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/metrics/ProducerMetricsInterceptor.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
+
+/** The metric statistic for Pulsar's {@link Producer}. */
+public class ProducerMetricsInterceptor implements ProducerInterceptor {
+
+    private final Counter numRecordsOutErrors;
+    private final Counter numRecordsOutCounter;
+    private final Counter numBytesOutCounter;
+
+    public ProducerMetricsInterceptor(SinkWriterMetricGroup metricGroup) {
+        this.numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
+        this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();

Review Comment:
   I don't know why they didn't add a `Deprecated` annotation.


