You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/28 13:53:34 UTC

[4/7] flink git commit: [FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer

[FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8828648b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8828648b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8828648b

Branch: refs/heads/master
Commit: 8828648b4f377a97a9aac42365297479054a5ef0
Parents: c5221c8
Author: Zhenzhong Xu <zx...@netflix.com>
Authored: Mon Jun 26 15:51:09 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 21:52:30 2017 +0800

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      | 24 ++++++++++++
 .../kafka/internal/Kafka010FetcherTest.java     |  6 +--
 .../kafka/internals/Kafka08Fetcher.java         |  8 +++-
 .../kafka/internal/Kafka09Fetcher.java          |  5 ++-
 .../kafka/internal/KafkaConsumerThread.java     | 27 ++++++++++++-
 .../kafka/internal/Kafka09FetcherTest.java      |  6 +--
 .../kafka/FlinkKafkaConsumerBase.java           | 40 +++++++++++++++++++-
 .../kafka/internals/AbstractFetcher.java        |  3 +-
 .../kafka/internals/KafkaCommitCallback.java    | 39 +++++++++++++++++++
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 13 ++++---
 .../kafka/internals/AbstractFetcherTest.java    |  2 +-
 11 files changed, 154 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 06ed9ef..b8f4acc 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -896,6 +896,30 @@ Thus, in order to infer the metric identifier:
   </tbody>
 </table>
 
+#### Connectors:
+
+##### Kafka Connectors
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Scope</th>
+      <th class="text-left" style="width: 30%">Metrics</th>
+      <th class="text-left" style="width: 50%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>commitsSucceeded</td>
+      <td>Kafka offset commit success count if Kafka commit is turned on and checkpointing is enabled.</td>
+    </tr>
+    <tr>
+       <th rowspan="1">Operator</th>
+       <td>commitsFailed</td>
+       <td>Kafka offset commit failure count if Kafka commit is turned on and checkpointing is enabled.</td>
+    </tr>
+  </tbody>
+</table>
 
 ### Latency tracking
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index 2ea1622..22e183d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -157,7 +157,7 @@ public class Kafka010FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -285,7 +285,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -302,7 +302,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index aa7649c..6e2a1d4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -346,15 +346,21 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
 		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
 		if (zkHandler != null) {
 			try {
 				// the ZK handler takes care of incrementing the offsets by 1 before committing
 				zkHandler.prepareAndCommitOffsets(offsets);
+				if (commitCallback != null) {
+					commitCallback.onSuccess();
+				}
 			}
 			catch (Exception e) {
 				if (running) {
+					if (commitCallback != null) {
+						commitCallback.onException(e);
+					}
 					throw e;
 				} else {
 					return;

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 68f54ed..db7d63b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -209,7 +210,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	}
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
 		@SuppressWarnings("unchecked")
 		List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();
 
@@ -228,6 +229,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 		}
 
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
-		consumerThread.setOffsetsToCommit(offsetsToCommit);
+		consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index de8fb0b..6ff82d7 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
@@ -107,6 +108,8 @@ public class KafkaConsumerThread extends Thread {
 	/** Flag tracking whether the latest commit request has completed. */
 	private volatile boolean commitInProgress;
 
+	/** User callback to be invoked when commits completed. */
+	private volatile KafkaCommitCallback callerCommitCallback;
 	public KafkaConsumerThread(
 			Logger log,
 			Handover handover,
@@ -308,17 +311,33 @@ public class KafkaConsumerThread extends Thread {
 	 *
 	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
 	 * the frequency with which this method is called, then some commits may be skipped due to being
-	 * superseded  by newer ones.
+	 * superseded by newer ones.
 	 *
 	 * @param offsetsToCommit The offsets to commit
 	 */
 	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+		setOffsetsToCommit(offsetsToCommit, null);
+	}
+
+	/**
+	 * Tells this thread to commit a set of offsets. This method does not block, the committing
+	 * operation will happen asynchronously.
+	 *
+	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
+	 * the frequency with which this method is called, then some commits may be skipped due to being
+	 * superseded by newer ones.
+	 *
+	 * @param offsetsToCommit The offsets to commit
+	 * @param commitCallback callback when kafka commit completes
+	 */
+	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, KafkaCommitCallback commitCallback) {
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
 		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
 			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
 					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
 					"This does not compromise Flink's checkpoint integrity.");
 		}
+		this.callerCommitCallback = commitCallback;
 
 		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
 		handover.wakeupProducer();
@@ -482,6 +501,12 @@ public class KafkaConsumerThread extends Thread {
 
 			if (ex != null) {
 				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+				if (callerCommitCallback != null) {
+					callerCommitCallback.onException(ex);
+				}
+			}
+			else if (callerCommitCallback != null) {
+				callerCommitCallback.onSuccess();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
index 649092b..f9ec204 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
@@ -157,7 +157,7 @@ public class Kafka09FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -284,7 +284,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -301,7 +301,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index c331bf0..f3c9e5e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -42,6 +43,7 @@ import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -186,6 +188,24 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	private volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
+	//  internal metrics
+	// ------------------------------------------------------------------------
+
+	/** Counter for successful Kafka offset commits. */
+	private transient Counter successfulCommits;
+
+	/** Counter for failed Kafka offset commits. */
+	private transient Counter failedCommits;
+
+	/** Callback interface that will be invoked upon async Kafka commit completion.
+	 *  Please be aware that default callback implementation in base class does not
+	 *  provide any guarantees on thread-safety. This is sufficient for now because current
+	 *  supported Kafka connectors guarantee no more than 1 concurrent async pending offset
+	 *  commit.
+	 */
+	private transient KafkaCommitCallback offsetCommitCallback;
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Base constructor.
@@ -504,6 +524,23 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			throw new Exception("The partitions were not set for the consumer");
 		}
 
+		// initialize commit metrics and default offset callback method
+		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
+		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
+
+		this.offsetCommitCallback = new KafkaCommitCallback() {
+			@Override
+			public void onSuccess() {
+				successfulCommits.inc();
+			}
+
+			@Override
+			public void onException(Throwable cause) {
+				LOG.error("Async Kafka commit failed.", cause);
+				failedCommits.inc();
+			}
+		};
+
 		// mark the subtask as temporarily idle if there are no initial seed partitions;
 		// once this subtask discovers some partitions and starts collecting records, the subtask's
 		// status will automatically be triggered back to be active.
@@ -784,7 +821,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 					LOG.debug("Checkpoint state was empty.");
 					return;
 				}
-				fetcher.commitInternalOffsetsToKafka(offsets);
+
+				fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
 			} catch (Exception e) {
 				if (running) {
 					throw e;

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 3bed0b8..7314e14 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -237,9 +237,10 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * committing them, so that committed offsets to Kafka represent "the next record to process".
 	 *
 	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
+	 * @param commitCallback The callback that the user can implement to trigger custom actions when a commit request completes.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception;
 
 	/**
 	 * Creates the Kafka version specific representation of the given

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
new file mode 100644
index 0000000..aca7ae5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
+ * which should normally be triggered from checkpoint complete event.
+ */
+public interface KafkaCommitCallback {
+
+	/**
+	 * A callback method the user can implement to provide asynchronous handling of commit request completion.
+	 * This method will be called when the commit request sent to the server has been acknowledged without error.
+	 */
+	void onSuccess();
+
+	/**
+	 * A callback method the user can implement to provide asynchronous handling of commit request failure.
+	 * This method will be called when the commit request failed.
+	 * @param cause Kafka commit failure cause returned by kafka client
+	 */
+	void onException(Throwable cause);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index ad08c4d..34158fd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer;
@@ -468,7 +469,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -485,10 +486,10 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		listState = new TestingListState<>();
@@ -503,15 +504,15 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// commit only the second last
 		consumer.notifyCheckpointComplete(598);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		// access invalid checkpoint
 		consumer.notifyCheckpointComplete(590);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		// and the last
 		consumer.notifyCheckpointComplete(599);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 768df0f..8699247 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -361,7 +361,7 @@ public class AbstractFetcherTest {
 		}
 
 		@Override
-		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback callback) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 	}