You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/07 08:30:31 UTC
[flink] 02/04: [FLINK-26126][kafka] Use the record/byte send counter metrics from SinkWriterMetricGroup directly, fixing the bug, and re-enable the metric tests in KafkaSinkITCase and KafkaSinkE2ECase.
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 339cd3c98e4783e3c8225c5495be40a0c04ff386
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Thu Feb 17 23:35:47 2022 +0100
[FLINK-26126][kafka] Use the record/byte send counter metrics from SinkWriterMetricGroup directly, fixing the bug, and re-enable the metric tests in KafkaSinkITCase and KafkaSinkE2ECase.
---
.../apache/flink/connector/kafka/sink/KafkaWriter.java | 15 ++++++++-------
.../flink/connector/kafka/sink/KafkaSinkITCase.java | 13 -------------
.../flink/connector/kafka/sink/KafkaWriterITCase.java | 15 ++++++++++-----
.../apache/flink/tests/util/kafka/KafkaSinkE2ECase.java | 13 -------------
4 files changed, 18 insertions(+), 38 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 0e0a874..8102ee8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -89,12 +89,13 @@ class KafkaWriter<IN>
private final KafkaRecordSerializationSchema<IN> recordSerializer;
private final Callback deliveryCallback;
private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext;
+
private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
private final SinkWriterMetricGroup metricGroup;
- private final Counter numBytesOutCounter;
- private final ProcessingTimeService timeService;
private final boolean disabledMetrics;
- private final Counter numRecordsOutCounter;
+ private final Counter numRecordsSendCounter;
+ private final Counter numBytesSendCounter;
+ private final ProcessingTimeService timeService;
// Number of outgoing bytes at the latest metric sync
private long latestOutgoingByteTotal;
@@ -151,8 +152,8 @@ class KafkaWriter<IN>
checkNotNull(sinkInitContext, "sinkInitContext");
this.timeService = sinkInitContext.getProcessingTimeService();
this.metricGroup = sinkInitContext.metricGroup();
- this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
- this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+ this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
+ this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
this.kafkaSinkContext =
new DefaultKafkaSinkContext(
sinkInitContext.getSubtaskId(),
@@ -192,7 +193,7 @@ class KafkaWriter<IN>
final ProducerRecord<byte[], byte[]> record =
recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
currentProducer.send(record, deliveryCallback);
- numRecordsOutCounter.inc();
+ numRecordsSendCounter.inc();
}
@Override
@@ -385,7 +386,7 @@ class KafkaWriter<IN>
long outgoingBytesUntilNow = ((Number) byteOutMetric.metricValue()).longValue();
long outgoingBytesSinceLastUpdate =
outgoingBytesUntilNow - latestOutgoingByteTotal;
- numBytesOutCounter.inc(outgoingBytesSinceLastUpdate);
+ numBytesSendCounter.inc(outgoingBytesSinceLastUpdate);
latestOutgoingByteTotal = outgoingBytesUntilNow;
lastSync = time;
registerMetricSync();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 32210c6..be7a928 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -34,9 +34,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
-import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -81,9 +79,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.TestTemplate;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -205,15 +201,6 @@ public class KafkaSinkITCase extends TestLogger {
@TestContext
KafkaSinkExternalContextFactory sinkContext =
new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList());
-
- @Disabled("Skip metric test until FLINK-26126 fixed")
- @TestTemplate
- @Override
- public void testMetrics(
- TestEnvironment testEnv,
- DataStreamSinkExternalContext<String> externalContext,
- CheckpointingMode semantic)
- throws Exception {}
}
@Test
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index d68a813..b8447a1 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -147,13 +147,18 @@ public class KafkaWriterITCase {
try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(
getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
- final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
- final Counter numRecordsOut = operatorIOMetricGroup.getNumRecordsOutCounter();
- assertEquals(numBytesOut.getCount(), 0L);
+ final Counter numBytesSend = metricGroup.getNumBytesSendCounter();
+ final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();
+ final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter();
+ assertEquals(numBytesSend.getCount(), 0L);
+ assertEquals(numRecordsSend.getCount(), 0);
+ assertEquals(numRecordsWrittenErrors.getCount(), 0);
+
writer.write(1, SINK_WRITER_CONTEXT);
timeService.trigger();
- assertEquals(numRecordsOut.getCount(), 1);
- assertThat(numBytesOut.getCount(), greaterThan(0L));
+ assertEquals(numRecordsSend.getCount(), 1);
+ assertEquals(numRecordsWrittenErrors.getCount(), 0);
+ assertThat(numBytesSend.getCount(), greaterThan(0L));
}
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
index ff845f4..0c75a00 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -19,9 +19,7 @@
package org.apache.flink.tests.util.kafka;
import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
-import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -32,8 +30,6 @@ import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
import org.apache.flink.util.DockerImageVersions;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
@@ -84,13 +80,4 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
.toURL()));
public KafkaSinkE2ECase() throws Exception {}
-
- @Disabled("Skip metric test until FLINK-26126 fixed")
- @TestTemplate
- @Override
- public void testMetrics(
- TestEnvironment testEnv,
- DataStreamSinkExternalContext<String> externalContext,
- CheckpointingMode semantic)
- throws Exception {}
}