You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/28 13:53:34 UTC
[4/7] flink git commit: [FLINK-6998] [kafka] Add Kafka offset commit
metrics to Flink Kafka Consumer
[FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8828648b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8828648b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8828648b
Branch: refs/heads/master
Commit: 8828648b4f377a97a9aac42365297479054a5ef0
Parents: c5221c8
Author: Zhenzhong Xu <zx...@netflix.com>
Authored: Mon Jun 26 15:51:09 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 21:52:30 2017 +0800
----------------------------------------------------------------------
docs/monitoring/metrics.md | 24 ++++++++++++
.../kafka/internal/Kafka010FetcherTest.java | 6 +--
.../kafka/internals/Kafka08Fetcher.java | 8 +++-
.../kafka/internal/Kafka09Fetcher.java | 5 ++-
.../kafka/internal/KafkaConsumerThread.java | 27 ++++++++++++-
.../kafka/internal/Kafka09FetcherTest.java | 6 +--
.../kafka/FlinkKafkaConsumerBase.java | 40 +++++++++++++++++++-
.../kafka/internals/AbstractFetcher.java | 3 +-
.../kafka/internals/KafkaCommitCallback.java | 39 +++++++++++++++++++
.../kafka/FlinkKafkaConsumerBaseTest.java | 13 ++++---
.../kafka/internals/AbstractFetcherTest.java | 2 +-
11 files changed, 154 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 06ed9ef..b8f4acc 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -896,6 +896,30 @@ Thus, in order to infer the metric identifier:
</tbody>
</table>
+#### Connectors:
+
+##### Kafka Connectors
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Scope</th>
+ <th class="text-left" style="width: 30%">Metrics</th>
+ <th class="text-left" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <th rowspan="1">Operator</th>
+ <td>commitsSucceeded</td>
+ <td>The total number of successful Kafka offset commits, if Kafka offset committing is turned on and checkpointing is enabled.</td>
+ </tr>
+ <tr>
+ <th rowspan="1">Operator</th>
+ <td>commitsFailed</td>
+ <td>The total number of failed Kafka offset commits, if Kafka offset committing is turned on and checkpointing is enabled.</td>
+ </tr>
+ </tbody>
+</table>
### Latency tracking
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index 2ea1622..22e183d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -157,7 +157,7 @@ public class Kafka010FetcherTest {
@Override
public void run() {
try {
- fetcher.commitInternalOffsetsToKafka(testCommitData);
+ fetcher.commitInternalOffsetsToKafka(testCommitData, null);
} catch (Throwable t) {
commitError.set(t);
}
@@ -285,7 +285,7 @@ public class Kafka010FetcherTest {
// ----- trigger the first offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData1);
+ fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -302,7 +302,7 @@ public class Kafka010FetcherTest {
// ----- trigger the second offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData2);
+ fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index aa7649c..6e2a1d4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -346,15 +346,21 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
// ------------------------------------------------------------------------
@Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
try {
// the ZK handler takes care of incrementing the offsets by 1 before committing
zkHandler.prepareAndCommitOffsets(offsets);
+ if (commitCallback != null) {
+ commitCallback.onSuccess();
+ }
}
catch (Exception e) {
if (running) {
+ if (commitCallback != null) {
+ commitCallback.onException(e);
+ }
throw e;
} else {
return;
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 68f54ed..db7d63b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -209,7 +210,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
}
@Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
@SuppressWarnings("unchecked")
List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();
@@ -228,6 +229,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
}
// record the work to be committed by the main consumer thread and make sure the consumer notices that
- consumerThread.setOffsetsToCommit(offsetsToCommit);
+ consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index de8fb0b..6ff82d7 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
@@ -107,6 +108,8 @@ public class KafkaConsumerThread extends Thread {
/** Flag tracking whether the latest commit request has completed. */
private volatile boolean commitInProgress;
+ /** User callback to be invoked when a commit completes. */
+ private volatile KafkaCommitCallback callerCommitCallback;
public KafkaConsumerThread(
Logger log,
Handover handover,
@@ -308,17 +311,33 @@ public class KafkaConsumerThread extends Thread {
*
* <p>Only one commit operation may be pending at any time. If the committing takes longer than
* the frequency with which this method is called, then some commits may be skipped due to being
- * superseded by newer ones.
+ * superseded by newer ones.
*
* @param offsetsToCommit The offsets to commit
*/
public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+ setOffsetsToCommit(offsetsToCommit, null);
+ }
+
+ /**
+ * Tells this thread to commit a set of offsets. This method does not block, the committing
+ * operation will happen asynchronously.
+ *
+ * <p>Only one commit operation may be pending at any time. If the committing takes longer than
+ * the frequency with which this method is called, then some commits may be skipped due to being
+ * superseded by newer ones.
+ *
+ * @param offsetsToCommit The offsets to commit
+ * @param commitCallback The callback to invoke when the Kafka commit completes (may be null)
+ */
+ public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, KafkaCommitCallback commitCallback) {
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
"This does not compromise Flink's checkpoint integrity.");
}
+ this.callerCommitCallback = commitCallback;
// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
handover.wakeupProducer();
@@ -482,6 +501,12 @@ public class KafkaConsumerThread extends Thread {
if (ex != null) {
log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+ if (callerCommitCallback != null) {
+ callerCommitCallback.onException(ex);
+ }
+ }
+ else if (callerCommitCallback != null) {
+ callerCommitCallback.onSuccess();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
index 649092b..f9ec204 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
@@ -157,7 +157,7 @@ public class Kafka09FetcherTest {
@Override
public void run() {
try {
- fetcher.commitInternalOffsetsToKafka(testCommitData);
+ fetcher.commitInternalOffsetsToKafka(testCommitData, null);
} catch (Throwable t) {
commitError.set(t);
}
@@ -284,7 +284,7 @@ public class Kafka09FetcherTest {
// ----- trigger the first offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData1);
+ fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -301,7 +301,7 @@ public class Kafka09FetcherTest {
// ----- trigger the second offset commit -----
- fetcher.commitInternalOffsetsToKafka(testCommitData2);
+ fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index c331bf0..f3c9e5e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -42,6 +43,7 @@ import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -186,6 +188,24 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
private volatile boolean running = true;
// ------------------------------------------------------------------------
+ // internal metrics
+ // ------------------------------------------------------------------------
+
+ /** Counter for successful Kafka offset commits. */
+ private transient Counter successfulCommits;
+
+ /** Counter for failed Kafka offset commits. */
+ private transient Counter failedCommits;
+
+ /** Callback that will be invoked upon completion of an async Kafka offset commit.
+ * Please be aware that the default callback implementation in this base class does not
+ * provide any guarantees on thread-safety. This is sufficient for now because the currently
+ * supported Kafka connectors guarantee no more than 1 concurrent async pending offset
+ * commit.
+ */
+ private transient KafkaCommitCallback offsetCommitCallback;
+
+ // ------------------------------------------------------------------------
/**
* Base constructor.
@@ -504,6 +524,23 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
throw new Exception("The partitions were not set for the consumer");
}
+ // initialize commit metrics and default offset callback method
+ this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
+ this.failedCommits = this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
+
+ this.offsetCommitCallback = new KafkaCommitCallback() {
+ @Override
+ public void onSuccess() {
+ successfulCommits.inc();
+ }
+
+ @Override
+ public void onException(Throwable cause) {
+ LOG.error("Async Kafka commit failed.", cause);
+ failedCommits.inc();
+ }
+ };
+
// mark the subtask as temporarily idle if there are no initial seed partitions;
// once this subtask discovers some partitions and starts collecting records, the subtask's
// status will automatically be triggered back to be active.
@@ -784,7 +821,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
LOG.debug("Checkpoint state was empty.");
return;
}
- fetcher.commitInternalOffsetsToKafka(offsets);
+
+ fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
} catch (Exception e) {
if (running) {
throw e;
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 3bed0b8..7314e14 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -237,9 +237,10 @@ public abstract class AbstractFetcher<T, KPH> {
* committing them, so that committed offsets to Kafka represent "the next record to process".
*
* @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
+ * @param commitCallback The callback that the user can implement to trigger custom actions when a commit request completes.
* @throws Exception This method forwards exceptions.
*/
- public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+ public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception;
/**
* Creates the Kafka version specific representation of the given
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
new file mode 100644
index 0000000..aca7ae5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
+ * which should normally be triggered from a checkpoint complete event.
+ */
+public interface KafkaCommitCallback {
+
+ /**
+ * A callback method the user can implement to provide asynchronous handling of commit request completion.
+ * This method will be called when the commit request sent to the server has been acknowledged without error.
+ */
+ void onSuccess();
+
+ /**
+ * A callback method the user can implement to provide asynchronous handling of commit request failure.
+ * This method will be called when the commit request failed.
+ * @param cause The cause of the Kafka commit failure, as returned by the Kafka client
+ */
+ void onException(Throwable cause);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index ad08c4d..34158fd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer;
@@ -468,7 +469,7 @@ public class FlinkKafkaConsumerBaseTest {
// ack checkpoint 1
consumer.notifyCheckpointComplete(138L);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
// checkpoint 3
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -485,10 +486,10 @@ public class FlinkKafkaConsumerBaseTest {
// ack checkpoint 3, subsumes number 2
consumer.notifyCheckpointComplete(141L);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
consumer.notifyCheckpointComplete(666); // invalid checkpoint
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
listState = new TestingListState<>();
@@ -503,15 +504,15 @@ public class FlinkKafkaConsumerBaseTest {
// commit only the second last
consumer.notifyCheckpointComplete(598);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
// access invalid checkpoint
consumer.notifyCheckpointComplete(590);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
// and the last
consumer.notifyCheckpointComplete(599);
- verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+ verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 768df0f..8699247 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -361,7 +361,7 @@ public class AbstractFetcherTest {
}
@Override
- public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+ public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback callback) throws Exception {
throw new UnsupportedOperationException();
}
}