You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/07 08:30:30 UTC

[flink] 01/04: [FLINK-26126][metrics] Introduce new counter metrics for sending records by SinkWriter.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91c2b8d56bae03c2ce4a50b5c014cea842df9f74
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Thu Feb 17 23:33:35 2022 +0100

    [FLINK-26126][metrics] Introduce new counter metrics for sending records by SinkWriter.
    
    We found that the new sink v2 interface reports a wrong numRecordsOut metric for the sink writers. We send a fixed number of records to the source, but the numRecordsOut of the sink continues to increase over time.
    
    The problem is that both the SinkWriterOperator and the KafkaWriter use the same counter metric for counting outgoing records. The same records that are sent by the SinkWriterOperator to the post topology and written by the KafkaWriter to the downstream system are counted twice in the same counter metric.
---
 .../metrics/groups/SinkWriterMetricGroup.java      | 14 ++++++++++
 .../apache/flink/runtime/metrics/MetricNames.java  |  2 ++
 .../groups/InternalSinkWriterMetricGroup.java      | 14 ++++++++++
 .../testframe/testsuites/SinkTestSuiteBase.java    | 14 +++++++---
 .../test/streaming/runtime/SinkMetricsITCase.java  | 31 +++++++++++++---------
 5 files changed, 60 insertions(+), 15 deletions(-)

diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
index 6a2e576..22e7c77 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
@@ -32,6 +32,20 @@ public interface SinkWriterMetricGroup extends OperatorMetricGroup {
     Counter getNumRecordsOutErrorsCounter();
 
     /**
+     * The total number of records that have been sent to the downstream system.
+     *
+     * <p>Note: this counter counts all records the SinkWriter sent. From the SinkWriter's
+     * perspective, these records have been sent to the downstream system, but the downstream system
+     * may have issues performing the persistence action within its scope. Therefore, this count may
+     * include the number of records that the downstream system failed to write, which should
+     * be counted by {@link #getNumRecordsOutErrorsCounter()}.
+     */
+    Counter getNumRecordsSendCounter();
+
+    /** The total number of bytes sent to the downstream system since the task started. */
+    Counter getNumBytesSendCounter();
+
+    /**
      * Sets an optional gauge for the time it takes to send the last record.
      *
      * <p>This metric is an instantaneous value recorded for the last processed record.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 81228b2..800cf2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -86,6 +86,8 @@ public class MetricNames {
     // FLIP-33 sink
     public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
     public static final String CURRENT_SEND_TIME = "currentSendTime";
+    public static final String NUM_RECORDS_SEND = "numRecordsSend";
+    public static final String NUM_BYTES_SEND = "numBytesSend";
 
     // FLIP-33 source
     public static final String NUM_RECORDS_IN_ERRORS = "numRecordsInErrors";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
index 1ef995f..8bc99f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
@@ -35,12 +35,16 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
         implements SinkWriterMetricGroup {
 
     private final Counter numRecordsOutErrors;
+    private final Counter numRecordsWritten;
+    private final Counter numBytesWritten;
     private final OperatorIOMetricGroup operatorIOMetricGroup;
 
     private InternalSinkWriterMetricGroup(
             MetricGroup parentMetricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
         super(parentMetricGroup);
         numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
+        numRecordsWritten = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND);
+        numBytesWritten = parentMetricGroup.counter(MetricNames.NUM_BYTES_SEND);
         this.operatorIOMetricGroup = operatorIOMetricGroup;
     }
 
@@ -72,6 +76,16 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
     }
 
     @Override
+    public Counter getNumRecordsSendCounter() {
+        return numRecordsWritten;
+    }
+
+    @Override
+    public Counter getNumBytesSendCounter() {
+        return numBytesWritten;
+    }
+
+    @Override
     public void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge) {
         parentMetricGroup.gauge(MetricNames.CURRENT_SEND_TIME, currentSendTimeGauge);
     }
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
index 2c13e83..e958e70 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -405,6 +405,7 @@ public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
                                     externalContext,
                                     jobClient.getJobID(),
                                     sinkName,
+                                    MetricNames.NUM_RECORDS_SEND,
                                     testRecords.size());
                         } catch (Exception e) {
                             // skip failed assert try
@@ -531,16 +532,23 @@ public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
             DataStreamSinkExternalContext<T> context,
             JobID jobId,
             String sinkName,
-            long allRecordSize)
+            String metricsName,
+            long expectedSize)
             throws Exception {
         double sumNumRecordsOut =
                 metricQuerier.getAggregatedMetricsByRestAPI(
                         testEnv.getRestEndpoint(),
                         jobId,
                         sinkName,
-                        MetricNames.IO_NUM_RECORDS_OUT,
+                        metricsName,
                         getSinkMetricFilter(context));
-        return Precision.equals(allRecordSize, sumNumRecordsOut);
+
+        if (Precision.equals(expectedSize, sumNumRecordsOut)) {
+            return true;
+        } else {
+            LOG.info("expected:<{}> but was <{}>({})", expectedSize, sumNumRecordsOut, metricsName);
+            return false;
+        }
     }
 
     /** Sort the list. */
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index e2e4203..c39abe0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -53,7 +52,12 @@ import static org.hamcrest.Matchers.hasSize;
 
 /** Tests whether all provided metrics of a {@link Sink} are of the expected values (FLIP-33). */
 public class SinkMetricsITCase extends TestLogger {
+
+    private static final String TEST_SINK_NAME = "MetricTestSink";
+    // please refer to SinkTransformationTranslator#WRITER_NAME
+    private static final String DEFAULT_WRITER_NAME = "Writer";
     private static final int DEFAULT_PARALLELISM = 4;
+
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
     private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
 
@@ -96,8 +100,12 @@ public class SinkMetricsITCase extends TestLogger {
                             }
                             return i;
                         })
-                .sinkTo(TestSink.newBuilder().setWriter(new MetricWriter()).build())
-                .name("MetricTestSink");
+                .sinkTo(
+                        TestSink.newBuilder()
+                                .setDefaultCommitter()
+                                .setWriter(new MetricWriter())
+                                .build())
+                .name(TEST_SINK_NAME);
         JobClient jobClient = env.executeAsync();
         final JobID jobId = jobClient.getJobID();
 
@@ -115,23 +123,24 @@ public class SinkMetricsITCase extends TestLogger {
     private void assertSinkMetrics(
             JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
         List<OperatorMetricGroup> groups =
-                reporter.findOperatorMetricGroups(jobId, "MetricTestSink");
+                reporter.findOperatorMetricGroups(
+                        jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
         assertThat(groups, hasSize(parallelism));
 
         int subtaskWithMetrics = 0;
         for (OperatorMetricGroup group : groups) {
             Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
             // there are only 2 splits assigned; so two groups will not update metrics
-            if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
+            if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() != 0) {
                 continue;
             }
             subtaskWithMetrics++;
-            // I/O metrics
+            // SinkWriterMetricGroup metrics
             assertThat(
-                    group.getIOMetricGroup().getNumRecordsOutCounter(),
+                    metrics.get(MetricNames.NUM_RECORDS_SEND),
                     isCounter(equalTo(processedRecordsPerSubtask)));
             assertThat(
-                    group.getIOMetricGroup().getNumBytesOutCounter(),
+                    metrics.get(MetricNames.NUM_BYTES_SEND),
                     isCounter(
                             equalTo(
                                     processedRecordsPerSubtask
@@ -156,12 +165,10 @@ public class SinkMetricsITCase extends TestLogger {
         static final long RECORD_SIZE_IN_BYTES = 10;
         private SinkWriterMetricGroup metricGroup;
         private long sendTime;
-        private Counter recordsOutCounter;
 
         @Override
         public void init(Sink.InitContext context) {
             this.metricGroup = context.metricGroup();
-            this.recordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
             metricGroup.setCurrentSendTimeGauge(() -> sendTime);
         }
 
@@ -169,11 +176,11 @@ public class SinkMetricsITCase extends TestLogger {
         public void write(Long element, Context context) {
             super.write(element, context);
             sendTime = element * BASE_SEND_TIME;
-            recordsOutCounter.inc();
+            metricGroup.getNumRecordsSendCounter().inc();
             if (element % 2 == 0) {
                 metricGroup.getNumRecordsOutErrorsCounter().inc();
             }
-            metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
+            metricGroup.getNumBytesSendCounter().inc(RECORD_SIZE_IN_BYTES);
         }
     }
 }