Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/07 08:30:29 UTC

[flink] branch master updated (fb05fab -> 9725933)

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from fb05fab  [FLINK-25969][table-runtime] Clean up caches in CompileUtils more aggressively
     new 91c2b8d  [FLINK-26126][metrics] Introduce new counter metrics for records sent by the SinkWriter.
     new 339cd3c  [FLINK-26126][kafka] Use the record/byte send counter metrics from SinkWriterMetricGroup directly; fix the double-counting bug and re-enable the metric tests in KafkaSinkITCase and KafkaSinkE2ECase.
     new d07df57  [FLINK-26126][kafka] Develop the record-out-errors counter metric
     new 9725933  [FLINK-26126][test] Migrate KafkaWriterITCase to AssertJ

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.


Summary of changes:
 .../flink/connector/kafka/sink/KafkaWriter.java    |  22 ++--
 .../connector/kafka/sink/KafkaSinkITCase.java      |  13 ---
 .../connector/kafka/sink/KafkaWriterITCase.java    | 120 +++++++++++++--------
 .../flink/tests/util/kafka/KafkaSinkE2ECase.java   |  13 ---
 .../metrics/groups/SinkWriterMetricGroup.java      |  14 +++
 .../apache/flink/runtime/metrics/MetricNames.java  |   2 +
 .../groups/InternalSinkWriterMetricGroup.java      |  14 +++
 .../testframe/testsuites/SinkTestSuiteBase.java    |  14 ++-
 .../test/streaming/runtime/SinkMetricsITCase.java  |  31 +++---
 9 files changed, 148 insertions(+), 95 deletions(-)

[flink] 03/04: [FLINK-26126][kafka] Develop the record-out-errors counter metric

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d07df5791f42b64891c5396f912841c9c7133d7a
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Wed Mar 2 21:06:30 2022 +0100

    [FLINK-26126][kafka] Develop the record-out-errors counter metric
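    
    Note on the diff below: the Kafka delivery callback runs on a producer
    I/O thread, so the counter increment is deferred to the task's mailbox
    thread together with the exception rethrow. A minimal, self-contained
    sketch of that pattern (plain Java; SimpleCounter and the BiConsumer
    callback are stand-ins for the Flink Counter and the Kafka Callback,
    not the real APIs):
    
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.atomic.AtomicLong;
        import java.util.function.BiConsumer;
        
        public class ErrorCounterCallbackSketch {
        
            /** Stand-in for org.apache.flink.metrics.Counter. */
            static final class SimpleCounter {
                private final AtomicLong count = new AtomicLong();
                void inc() { count.incrementAndGet(); }
                long getCount() { return count.get(); }
            }
        
            public static void main(String[] args) throws Exception {
                // A single-threaded executor plays the role of the task mailbox:
                // everything submitted here runs on one thread, as in Flink's mailbox model.
                ExecutorService mailbox = Executors.newSingleThreadExecutor();
                SimpleCounter numRecordsOutErrors = new SimpleCounter();
        
                // The callback itself only *enqueues* the metric update and error handling.
                BiConsumer<Object, Exception> deliveryCallback =
                        (metadata, exception) -> {
                            if (exception != null) {
                                mailbox.execute(
                                        () -> {
                                            numRecordsOutErrors.inc();
                                            // the real writer rethrows the exception here
                                        });
                            }
                        };
        
                deliveryCallback.accept(null, new RuntimeException("broker unavailable"));
                mailbox.shutdown();
                mailbox.awaitTermination(5, TimeUnit.SECONDS);
                System.out.println(numRecordsOutErrors.getCount()); // prints 1
            }
        }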
---
 .../flink/connector/kafka/sink/KafkaWriter.java    |  7 ++++-
 .../connector/kafka/sink/KafkaWriterITCase.java    | 36 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 8102ee8..ecd62a0 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -95,6 +95,7 @@ class KafkaWriter<IN>
     private final boolean disabledMetrics;
     private final Counter numRecordsSendCounter;
     private final Counter numBytesSendCounter;
+    private final Counter numRecordsOutErrorsCounter;
     private final ProcessingTimeService timeService;
 
     // Number of outgoing bytes at the latest metric sync
@@ -154,6 +155,7 @@ class KafkaWriter<IN>
         this.metricGroup = sinkInitContext.metricGroup();
         this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
         this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
+        this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
         this.kafkaSinkContext =
                 new DefaultKafkaSinkContext(
                         sinkInitContext.getSubtaskId(),
@@ -410,7 +412,10 @@ class KafkaWriter<IN>
                 FlinkKafkaInternalProducer<byte[], byte[]> producer =
                         KafkaWriter.this.currentProducer;
                 mailboxExecutor.execute(
-                        () -> throwException(metadata, exception, producer),
+                        () -> {
+                            numRecordsOutErrorsCounter.inc();
+                            throwException(metadata, exception, producer);
+                        },
                         "Failed to send data to Kafka");
             }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index b8447a1..ee21d04 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -193,6 +193,42 @@ public class KafkaWriterITCase {
     }
 
     @Test
+    void testNumRecordsOutErrorsCounterMetric() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
+        final InternalSinkWriterMetricGroup metricGroup =
+                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());
+
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
+            final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
+            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+                    .isEqualTo(0L);
+
+            writer.write(1, SINK_WRITER_CONTEXT);
+            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+                    .isEqualTo(0L);
+
+            final String transactionalId = writer.getCurrentProducer().getTransactionalId();
+
+            try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                    new FlinkKafkaInternalProducer<>(properties, transactionalId)) {
+
+                producer.initTransactions();
+                producer.beginTransaction();
+                producer.send(new ProducerRecord<byte[], byte[]>(topic, "2".getBytes()));
+                producer.commitTransaction();
+            }
+
+            writer.write(3, SINK_WRITER_CONTEXT);
+            writer.flush(false);
+            writer.prepareCommit();
+            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
+                    .isEqualTo(1L);
+        }
+    }
+
+    @Test
     public void testMetadataPublisher() throws Exception {
         List<String> metadataList = new ArrayList<>();
         try (final KafkaWriter<Integer> writer =

[flink] 01/04: [FLINK-26126][metrics] Introduce new counter metrics for records sent by the SinkWriter.

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91c2b8d56bae03c2ce4a50b5c014cea842df9f74
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Thu Feb 17 23:33:35 2022 +0100

    [FLINK-26126][metrics] Introduce new counter metrics for records sent by the SinkWriter.
    
    We found that the new sink v2 interface reports a wrong numRecordsOut metric for sink writers: even though a fixed number of records is sent to the source, the sink's numRecordsOut keeps increasing over time.
    
    The root cause is that the SinkWriterOperator and the KafkaWriter use the same counter metric for outgoing records. A record that is both emitted by the SinkWriterOperator to the post topology and written by the KafkaWriter to the downstream system is therefore counted twice in the same counter.
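    
    To make the double counting concrete, a hedged sketch (plain Java, not
    Flink code; the two AtomicLongs stand in for the operator-level
    numRecordsOut counter and the new writer-level numRecordsSend counter):
    
        import java.util.concurrent.atomic.AtomicLong;
        
        public class DoubleCountingSketch {
            public static void main(String[] args) {
                AtomicLong numRecordsOut = new AtomicLong();  // operator-level counter
                AtomicLong numRecordsSend = new AtomicLong(); // writer-level counter
                int records = 100;
        
                // Before this change: operator and writer shared the same counter,
                // so every record written to Kafka was counted a second time.
                for (int i = 0; i < records; i++) {
                    numRecordsOut.incrementAndGet(); // SinkWriterOperator emits downstream
                    numRecordsOut.incrementAndGet(); // KafkaWriter writes the same record
                }
                System.out.println(numRecordsOut.get()); // 200, not 100
        
                // After this change: the writer reports to its own counter.
                numRecordsOut.set(0);
                for (int i = 0; i < records; i++) {
                    numRecordsOut.incrementAndGet();  // records emitted to the post topology
                    numRecordsSend.incrementAndGet(); // records sent to the external system
                }
                System.out.println(numRecordsOut.get() + " out, "
                        + numRecordsSend.get() + " send"); // 100 out, 100 send
            }
        }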
---
 .../metrics/groups/SinkWriterMetricGroup.java      | 14 ++++++++++
 .../apache/flink/runtime/metrics/MetricNames.java  |  2 ++
 .../groups/InternalSinkWriterMetricGroup.java      | 14 ++++++++++
 .../testframe/testsuites/SinkTestSuiteBase.java    | 14 +++++++---
 .../test/streaming/runtime/SinkMetricsITCase.java  | 31 +++++++++++++---------
 5 files changed, 60 insertions(+), 15 deletions(-)

diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
index 6a2e576..22e7c77 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java
@@ -32,6 +32,20 @@ public interface SinkWriterMetricGroup extends OperatorMetricGroup {
     Counter getNumRecordsOutErrorsCounter();
 
     /**
+     * The total number of records that have been sent to the downstream system.
+     *
+     * <p>Note: this counter counts every record the SinkWriter has sent. From the SinkWriter's
+     * perspective these records have been sent to the downstream system, but the downstream system
+     * may still fail to persist them within its own scope. This count may therefore include
+     * records that the downstream system failed to write, which should be counted by {@link
+     * #getNumRecordsOutErrorsCounter()}.
+     */
+    Counter getNumRecordsSendCounter();
+
+    /** The total number of bytes sent to the downstream system since the task started. */
+    Counter getNumBytesSendCounter();
+
+    /**
      * Sets an optional gauge for the time it takes to send the last record.
      *
      * <p>This metric is an instantaneous value recorded for the last processed record.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index 81228b2..800cf2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -86,6 +86,8 @@ public class MetricNames {
     // FLIP-33 sink
     public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
     public static final String CURRENT_SEND_TIME = "currentSendTime";
+    public static final String NUM_RECORDS_SEND = "numRecordsSend";
+    public static final String NUM_BYTES_SEND = "numBytesSend";
 
     // FLIP-33 source
     public static final String NUM_RECORDS_IN_ERRORS = "numRecordsInErrors";
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
index 1ef995f..8bc99f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
@@ -35,12 +35,16 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
         implements SinkWriterMetricGroup {
 
     private final Counter numRecordsOutErrors;
+    private final Counter numRecordsWritten;
+    private final Counter numBytesWritten;
     private final OperatorIOMetricGroup operatorIOMetricGroup;
 
     private InternalSinkWriterMetricGroup(
             MetricGroup parentMetricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
         super(parentMetricGroup);
         numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
+        numRecordsWritten = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND);
+        numBytesWritten = parentMetricGroup.counter(MetricNames.NUM_BYTES_SEND);
         this.operatorIOMetricGroup = operatorIOMetricGroup;
     }
 
@@ -72,6 +76,16 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
     }
 
     @Override
+    public Counter getNumRecordsSendCounter() {
+        return numRecordsWritten;
+    }
+
+    @Override
+    public Counter getNumBytesSendCounter() {
+        return numBytesWritten;
+    }
+
+    @Override
     public void setCurrentSendTimeGauge(Gauge<Long> currentSendTimeGauge) {
         parentMetricGroup.gauge(MetricNames.CURRENT_SEND_TIME, currentSendTimeGauge);
     }
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
index 2c13e83..e958e70 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
@@ -405,6 +405,7 @@ public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
                                     externalContext,
                                     jobClient.getJobID(),
                                     sinkName,
+                                    MetricNames.NUM_RECORDS_SEND,
                                     testRecords.size());
                         } catch (Exception e) {
                             // skip failed assert try
@@ -531,16 +532,23 @@ public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
             DataStreamSinkExternalContext<T> context,
             JobID jobId,
             String sinkName,
-            long allRecordSize)
+            String metricsName,
+            long expectedSize)
             throws Exception {
         double sumNumRecordsOut =
                 metricQuerier.getAggregatedMetricsByRestAPI(
                         testEnv.getRestEndpoint(),
                         jobId,
                         sinkName,
-                        MetricNames.IO_NUM_RECORDS_OUT,
+                        metricsName,
                         getSinkMetricFilter(context));
-        return Precision.equals(allRecordSize, sumNumRecordsOut);
+
+        if (Precision.equals(expectedSize, sumNumRecordsOut)) {
+            return true;
+        } else {
+            LOG.info("expected:<{}> but was <{}>({})", expectedSize, sumNumRecordsOut, metricsName);
+            return false;
+        }
     }
 
     /** Sort the list. */
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
index e2e4203..c39abe0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -53,7 +52,12 @@ import static org.hamcrest.Matchers.hasSize;
 
 /** Tests whether all provided metrics of a {@link Sink} are of the expected values (FLIP-33). */
 public class SinkMetricsITCase extends TestLogger {
+
+    private static final String TEST_SINK_NAME = "MetricTestSink";
+    // please refer to SinkTransformationTranslator#WRITER_NAME
+    private static final String DEFAULT_WRITER_NAME = "Writer";
     private static final int DEFAULT_PARALLELISM = 4;
+
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
     private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
 
@@ -96,8 +100,12 @@ public class SinkMetricsITCase extends TestLogger {
                             }
                             return i;
                         })
-                .sinkTo(TestSink.newBuilder().setWriter(new MetricWriter()).build())
-                .name("MetricTestSink");
+                .sinkTo(
+                        TestSink.newBuilder()
+                                .setDefaultCommitter()
+                                .setWriter(new MetricWriter())
+                                .build())
+                .name(TEST_SINK_NAME);
         JobClient jobClient = env.executeAsync();
         final JobID jobId = jobClient.getJobID();
 
@@ -115,23 +123,24 @@ public class SinkMetricsITCase extends TestLogger {
     private void assertSinkMetrics(
             JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
         List<OperatorMetricGroup> groups =
-                reporter.findOperatorMetricGroups(jobId, "MetricTestSink");
+                reporter.findOperatorMetricGroups(
+                        jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
         assertThat(groups, hasSize(parallelism));
 
         int subtaskWithMetrics = 0;
         for (OperatorMetricGroup group : groups) {
             Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
             // there are only 2 splits assigned; so two groups will not update metrics
-            if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
+            if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() != 0) {
                 continue;
             }
             subtaskWithMetrics++;
-            // I/O metrics
+            // SinkWriterMetricGroup metrics
             assertThat(
-                    group.getIOMetricGroup().getNumRecordsOutCounter(),
+                    metrics.get(MetricNames.NUM_RECORDS_SEND),
                     isCounter(equalTo(processedRecordsPerSubtask)));
             assertThat(
-                    group.getIOMetricGroup().getNumBytesOutCounter(),
+                    metrics.get(MetricNames.NUM_BYTES_SEND),
                     isCounter(
                             equalTo(
                                     processedRecordsPerSubtask
@@ -156,12 +165,10 @@ public class SinkMetricsITCase extends TestLogger {
         static final long RECORD_SIZE_IN_BYTES = 10;
         private SinkWriterMetricGroup metricGroup;
         private long sendTime;
-        private Counter recordsOutCounter;
 
         @Override
         public void init(Sink.InitContext context) {
             this.metricGroup = context.metricGroup();
-            this.recordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
             metricGroup.setCurrentSendTimeGauge(() -> sendTime);
         }
 
@@ -169,11 +176,11 @@ public class SinkMetricsITCase extends TestLogger {
         public void write(Long element, Context context) {
             super.write(element, context);
             sendTime = element * BASE_SEND_TIME;
-            recordsOutCounter.inc();
+            metricGroup.getNumRecordsSendCounter().inc();
             if (element % 2 == 0) {
                 metricGroup.getNumRecordsOutErrorsCounter().inc();
             }
-            metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
+            metricGroup.getNumBytesSendCounter().inc(RECORD_SIZE_IN_BYTES);
         }
     }
 }
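
For a compact view of how a writer is expected to use the two new counters, here is a
self-contained sketch (the stand-in types below only mirror the accessor names this commit
adds to SinkWriterMetricGroup; the record payloads are illustrative, not Flink API):

    public class SendMetricsUsageSketch {

        /** Stand-in for org.apache.flink.metrics.Counter. */
        static final class SimpleCounter {
            private long count;
            void inc() { count++; }
            void inc(long n) { count += n; }
            long getCount() { return count; }
        }

        /** Stand-in exposing the two accessors added to SinkWriterMetricGroup. */
        static final class WriterMetrics {
            final SimpleCounter numRecordsSend = new SimpleCounter();
            final SimpleCounter numBytesSend = new SimpleCounter();
            SimpleCounter getNumRecordsSendCounter() { return numRecordsSend; }
            SimpleCounter getNumBytesSendCounter() { return numBytesSend; }
        }

        public static void main(String[] args) {
            WriterMetrics metrics = new WriterMetrics();
            byte[][] records = {"a".getBytes(), "bb".getBytes(), "ccc".getBytes()};

            // What a writer's write() path reports per record sent downstream:
            for (byte[] record : records) {
                // ... hand the record to the external system here ...
                metrics.getNumRecordsSendCounter().inc();
                metrics.getNumBytesSendCounter().inc(record.length);
            }

            System.out.println(metrics.getNumRecordsSendCounter().getCount() + " records, "
                    + metrics.getNumBytesSendCounter().getCount() + " bytes sent"); // 3 records, 6 bytes
        }
    }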

[flink] 02/04: [FLINK-26126][kafka] Use the record/byte send counter metrics from SinkWriterMetricGroup directly; fix the double-counting bug and re-enable the metric tests in KafkaSinkITCase and KafkaSinkE2ECase.

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 339cd3c98e4783e3c8225c5495be40a0c04ff386
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Thu Feb 17 23:35:47 2022 +0100

    [FLINK-26126][kafka] Use the record/byte send counter metrics from SinkWriterMetricGroup directly; fix the double-counting bug and re-enable the metric tests in KafkaSinkITCase and KafkaSinkE2ECase.
---
 .../apache/flink/connector/kafka/sink/KafkaWriter.java    | 15 ++++++++-------
 .../flink/connector/kafka/sink/KafkaSinkITCase.java       | 13 -------------
 .../flink/connector/kafka/sink/KafkaWriterITCase.java     | 15 ++++++++++-----
 .../apache/flink/tests/util/kafka/KafkaSinkE2ECase.java   | 13 -------------
 4 files changed, 18 insertions(+), 38 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 0e0a874..8102ee8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -89,12 +89,13 @@ class KafkaWriter<IN>
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
     private final Callback deliveryCallback;
     private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext;
+
     private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
     private final SinkWriterMetricGroup metricGroup;
-    private final Counter numBytesOutCounter;
-    private final ProcessingTimeService timeService;
     private final boolean disabledMetrics;
-    private final Counter numRecordsOutCounter;
+    private final Counter numRecordsSendCounter;
+    private final Counter numBytesSendCounter;
+    private final ProcessingTimeService timeService;
 
     // Number of outgoing bytes at the latest metric sync
     private long latestOutgoingByteTotal;
@@ -151,8 +152,8 @@ class KafkaWriter<IN>
         checkNotNull(sinkInitContext, "sinkInitContext");
         this.timeService = sinkInitContext.getProcessingTimeService();
         this.metricGroup = sinkInitContext.metricGroup();
-        this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
-        this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
+        this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
         this.kafkaSinkContext =
                 new DefaultKafkaSinkContext(
                         sinkInitContext.getSubtaskId(),
@@ -192,7 +193,7 @@ class KafkaWriter<IN>
         final ProducerRecord<byte[], byte[]> record =
                 recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
         currentProducer.send(record, deliveryCallback);
-        numRecordsOutCounter.inc();
+        numRecordsSendCounter.inc();
     }
 
     @Override
@@ -385,7 +386,7 @@ class KafkaWriter<IN>
                     long outgoingBytesUntilNow = ((Number) byteOutMetric.metricValue()).longValue();
                     long outgoingBytesSinceLastUpdate =
                             outgoingBytesUntilNow - latestOutgoingByteTotal;
-                    numBytesOutCounter.inc(outgoingBytesSinceLastUpdate);
+                    numBytesSendCounter.inc(outgoingBytesSinceLastUpdate);
                     latestOutgoingByteTotal = outgoingBytesUntilNow;
                     lastSync = time;
                     registerMetricSync();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 32210c6..be7a928 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -34,9 +34,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
 import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
 import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
-import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -81,9 +79,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.TestTemplate;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -205,15 +201,6 @@ public class KafkaSinkITCase extends TestLogger {
         @TestContext
         KafkaSinkExternalContextFactory sinkContext =
                 new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList());
-
-        @Disabled("Skip metric test until FLINK-26126 fixed")
-        @TestTemplate
-        @Override
-        public void testMetrics(
-                TestEnvironment testEnv,
-                DataStreamSinkExternalContext<String> externalContext,
-                CheckpointingMode semantic)
-                throws Exception {}
     }
 
     @Test
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index d68a813..b8447a1 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -147,13 +147,18 @@ public class KafkaWriterITCase {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
                         getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
-            final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
-            final Counter numRecordsOut = operatorIOMetricGroup.getNumRecordsOutCounter();
-            assertEquals(numBytesOut.getCount(), 0L);
+            final Counter numBytesSend = metricGroup.getNumBytesSendCounter();
+            final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();
+            final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter();
+            assertEquals(numBytesSend.getCount(), 0L);
+            assertEquals(numRecordsSend.getCount(), 0);
+            assertEquals(numRecordsWrittenErrors.getCount(), 0);
+
             writer.write(1, SINK_WRITER_CONTEXT);
             timeService.trigger();
-            assertEquals(numRecordsOut.getCount(), 1);
-            assertThat(numBytesOut.getCount(), greaterThan(0L));
+            assertEquals(numRecordsSend.getCount(), 1);
+            assertEquals(numRecordsWrittenErrors.getCount(), 0);
+            assertThat(numBytesSend.getCount(), greaterThan(0L));
         }
     }
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
index ff845f4..0c75a00 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -19,9 +19,7 @@
 package org.apache.flink.tests.util.kafka;
 
 import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
 import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
-import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -32,8 +30,6 @@ import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
 import org.apache.flink.util.DockerImageVersions;
 
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
@@ -84,13 +80,4 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
                                     .toURL()));
 
     public KafkaSinkE2ECase() throws Exception {}
-
-    @Disabled("Skip metric test until FLINK-26126 fixed")
-    @TestTemplate
-    @Override
-    public void testMetrics(
-            TestEnvironment testEnv,
-            DataStreamSinkExternalContext<String> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {}
 }
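
One detail worth noting in the KafkaWriter hunk above: numBytesSend is not incremented per
record; it is synced periodically from the Kafka client's own byte-out metric by adding only
the delta accumulated since the last sync. A hedged sketch of that delta pattern (plain Java;
the LongSupplier stands in for Kafka's byteOutMetric, and the long field for the Flink counter):

    import java.util.function.LongSupplier;

    public class ByteDeltaSyncSketch {

        private long latestOutgoingByteTotal; // producer total observed at the last sync
        private long numBytesSend;            // Flink-side counter (stand-in)

        /** Folds the producer's monotonically increasing byte total into a counter. */
        void syncBytesSend(LongSupplier producerByteTotal) {
            long outgoingBytesUntilNow = producerByteTotal.getAsLong();
            long outgoingBytesSinceLastUpdate = outgoingBytesUntilNow - latestOutgoingByteTotal;
            numBytesSend += outgoingBytesSinceLastUpdate; // counter.inc(delta) in the real code
            latestOutgoingByteTotal = outgoingBytesUntilNow;
        }

        public static void main(String[] args) {
            ByteDeltaSyncSketch sync = new ByteDeltaSyncSketch();
            long[] producerTotals = {0L, 120L, 120L, 450L}; // as if read at successive syncs
            for (long total : producerTotals) {
                sync.syncBytesSend(() -> total);
            }
            System.out.println(sync.numBytesSend); // 450: the deltas sum to the producer total
        }
    }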

[flink] 04/04: [FLINK-26126][test] Migrate KafkaWriterITCase to AssertJ

Posted by fp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9725933fc0a09274801d2acb52a6a5256afa10f6
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Fri Mar 4 14:17:01 2022 +0100

    [FLINK-26126][test] Migrate KafkaWriterITCase to AssertJ
---
 .../connector/kafka/sink/KafkaWriterITCase.java    | 93 ++++++++++------------
 1 file changed, 40 insertions(+), 53 deletions(-)
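
For orientation, the migration below follows the usual Hamcrest/JUnit-to-AssertJ mapping.
A few representative pairs as a sketch (assumes AssertJ on the classpath; the list and
values are illustrative, not taken from the test):

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.Arrays;
    import java.util.List;

    public class AssertJMappingSketch {
        public static void main(String[] args) {
            List<String> committables = Arrays.asList("txn-0");
            long count = 0L;
            Object first = committables;
            Object second = committables;

            // Hamcrest: assertThat(committables, hasSize(1));
            assertThat(committables).hasSize(1);

            // JUnit:    assertEquals(count, 0L);
            assertThat(count).isEqualTo(0L);

            // Hamcrest: assertThat("Expected same producer", first, sameInstance(second));
            // (the diff below rewrites these as boolean assertions)
            assertThat(first == second).as("Expected same producer").isTrue();
        }
    }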

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index ee21d04..f972de4 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -75,14 +75,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the standalone KafkaWriter. */
 @ExtendWith(TestLoggerExtension.class)
@@ -126,7 +119,7 @@ public class KafkaWriterITCase {
     public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception {
         try (final KafkaWriter<Integer> ignored =
                 createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
-            assertTrue(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent());
+            assertThat(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent()).isTrue();
         }
     }
 
@@ -150,15 +143,15 @@ public class KafkaWriterITCase {
             final Counter numBytesSend = metricGroup.getNumBytesSendCounter();
             final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();
             final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter();
-            assertEquals(numBytesSend.getCount(), 0L);
-            assertEquals(numRecordsSend.getCount(), 0);
-            assertEquals(numRecordsWrittenErrors.getCount(), 0);
+            assertThat(numBytesSend.getCount()).isEqualTo(0L);
+            assertThat(numRecordsSend.getCount()).isEqualTo(0);
+            assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
 
             writer.write(1, SINK_WRITER_CONTEXT);
             timeService.trigger();
-            assertEquals(numRecordsSend.getCount(), 1);
-            assertEquals(numRecordsWrittenErrors.getCount(), 0);
-            assertThat(numBytesSend.getCount(), greaterThan(0L));
+            assertThat(numRecordsSend.getCount()).isEqualTo(1);
+            assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
+            assertThat(numBytesSend.getCount()).isGreaterThan(0L);
         }
     }
 
@@ -173,8 +166,8 @@ public class KafkaWriterITCase {
                         metricGroup)) {
             final Optional<Gauge<Long>> currentSendTime =
                     metricListener.getGauge("currentSendTime");
-            assertTrue(currentSendTime.isPresent());
-            assertEquals(currentSendTime.get().getValue(), 0L);
+            assertThat(currentSendTime.isPresent()).isTrue();
+            assertThat(currentSendTime.get().getValue()).isEqualTo(0L);
             IntStream.range(0, 100)
                     .forEach(
                             (run) -> {
@@ -188,7 +181,7 @@ public class KafkaWriterITCase {
                                     throw new RuntimeException("Failed writing Kafka record.");
                                 }
                             });
-            assertThat(currentSendTime.get().getValue(), greaterThan(0L));
+            assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);
         }
     }
 
@@ -202,12 +195,10 @@ public class KafkaWriterITCase {
                 createWriterWithConfiguration(
                         properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
             final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
-            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
-                    .isEqualTo(0L);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
 
             writer.write(1, SINK_WRITER_CONTEXT);
-            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
-                    .isEqualTo(0L);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
 
             final String transactionalId = writer.getCurrentProducer().getTransactionalId();
 
@@ -223,8 +214,7 @@ public class KafkaWriterITCase {
             writer.write(3, SINK_WRITER_CONTEXT);
             writer.flush(false);
             writer.prepareCommit();
-            org.assertj.core.api.Assertions.assertThat(numRecordsOutErrors.getCount())
-                    .isEqualTo(1L);
+            assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
         }
     }
 
@@ -243,7 +233,7 @@ public class KafkaWriterITCase {
                 expected.add("testMetadataPublisher-0@" + i);
             }
             writer.prepareCommit();
-            org.assertj.core.api.Assertions.assertThat(metadataList).isEqualTo(expected);
+            assertThat(metadataList).usingRecursiveComparison().isEqualTo(expected);
         }
     }
 
@@ -270,15 +260,15 @@ public class KafkaWriterITCase {
             recoveredWriter.flush(false);
             Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit();
             recoveredWriter.snapshotState(1);
-            assertThat(committables, hasSize(1));
+            assertThat(committables).hasSize(1);
             final KafkaCommittable committable = committables.stream().findFirst().get();
-            assertThat(committable.getProducer().isPresent(), equalTo(true));
+            assertThat(committable.getProducer().isPresent()).isTrue();
 
             committable.getProducer().get().getObject().commitTransaction();
 
             List<ConsumerRecord<byte[], byte[]>> records =
                     drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
-            assertThat(records, hasSize(1));
+            assertThat(records).hasSize(1);
         }
 
         failedWriter.close();
@@ -293,19 +283,18 @@ public class KafkaWriterITCase {
     void useSameProducerForNonTransactional(DeliveryGuarantee guarantee) throws Exception {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) {
-            assertThat(writer.getProducerPool(), hasSize(0));
+            assertThat(writer.getProducerPool()).hasSize(0);
 
             FlinkKafkaInternalProducer<byte[], byte[]> firstProducer = writer.getCurrentProducer();
             writer.flush(false);
             Collection<KafkaCommittable> committables = writer.prepareCommit();
             writer.snapshotState(0);
-            assertThat(committables, hasSize(0));
+            assertThat(committables).hasSize(0);
 
-            assertThat(
-                    "Expected same producer",
-                    writer.getCurrentProducer(),
-                    sameInstance(firstProducer));
-            assertThat(writer.getProducerPool(), hasSize(0));
+            assertThat(writer.getCurrentProducer() == firstProducer)
+                    .as("Expected same producer")
+                    .isTrue();
+            assertThat(writer.getProducerPool()).hasSize(0);
         }
     }
 
@@ -315,39 +304,37 @@ public class KafkaWriterITCase {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
                         getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
-            assertThat(writer.getProducerPool(), hasSize(0));
+            assertThat(writer.getProducerPool()).hasSize(0);
 
             writer.flush(false);
             Collection<KafkaCommittable> committables0 = writer.prepareCommit();
             writer.snapshotState(1);
-            assertThat(committables0, hasSize(1));
+            assertThat(committables0).hasSize(1);
             final KafkaCommittable committable = committables0.stream().findFirst().get();
-            assertThat(committable.getProducer().isPresent(), equalTo(true));
+            assertThat(committable.getProducer().isPresent()).isTrue();
 
             FlinkKafkaInternalProducer<?, ?> firstProducer =
                     committable.getProducer().get().getObject();
-            assertThat(
-                    "Expected different producer",
-                    firstProducer,
-                    not(sameInstance(writer.getCurrentProducer())));
+            assertThat(firstProducer != writer.getCurrentProducer())
+                    .as("Expected different producer")
+                    .isTrue();
 
             // recycle first producer, KafkaCommitter would commit it and then return it
-            assertThat(writer.getProducerPool(), hasSize(0));
+            assertThat(writer.getProducerPool()).hasSize(0);
             firstProducer.commitTransaction();
             committable.getProducer().get().close();
-            assertThat(writer.getProducerPool(), hasSize(1));
+            assertThat(writer.getProducerPool()).hasSize(1);
 
             writer.flush(false);
             Collection<KafkaCommittable> committables1 = writer.prepareCommit();
             writer.snapshotState(2);
-            assertThat(committables1, hasSize(1));
+            assertThat(committables1).hasSize(1);
             final KafkaCommittable committable1 = committables1.stream().findFirst().get();
-            assertThat(committable1.getProducer().isPresent(), equalTo(true));
+            assertThat(committable1.getProducer().isPresent()).isTrue();
 
-            assertThat(
-                    "Expected recycled producer",
-                    firstProducer,
-                    sameInstance(writer.getCurrentProducer()));
+            assertThat(firstProducer == writer.getCurrentProducer())
+                    .as("Expected recycled producer")
+                    .isTrue();
         }
     }
 
@@ -361,7 +348,7 @@ public class KafkaWriterITCase {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) {
             writer.write(1, SINK_WRITER_CONTEXT);
-            assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(0));
+            assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(0);
         }
 
         try (final KafkaWriter<Integer> writer =
@@ -372,7 +359,7 @@ public class KafkaWriterITCase {
             writer.snapshotState(1L);
 
             // manually commit here, which would only succeed if the first transaction was aborted
-            assertThat(committables, hasSize(1));
+            assertThat(committables).hasSize(1);
             final KafkaCommittable committable = committables.stream().findFirst().get();
             String transactionalId = committable.getTransactionalId();
             try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
@@ -381,7 +368,7 @@ public class KafkaWriterITCase {
                 producer.commitTransaction();
             }
 
-            assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(1));
+            assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(1);
         }
     }