You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/28 14:49:04 UTC

[1/3] flink git commit: [FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer

Repository: flink
Updated Branches:
  refs/heads/release-1.3 09caa9ffd -> 452f5d103


[FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1a173ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1a173ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1a173ad

Branch: refs/heads/release-1.3
Commit: f1a173addd99e5df00921b924352a39810d8d180
Parents: 09caa9f
Author: Zhenzhong Xu <zx...@netflix.com>
Authored: Mon Jun 26 15:51:09 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 22:04:20 2017 +0800

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      | 24 ++++++++++++
 .../connectors/kafka/Kafka010FetcherTest.java   |  6 +--
 .../kafka/internals/Kafka08Fetcher.java         |  8 +++-
 .../kafka/internal/Kafka09Fetcher.java          |  5 ++-
 .../kafka/internal/KafkaConsumerThread.java     | 30 +++++++++++++++
 .../connectors/kafka/Kafka09FetcherTest.java    |  6 +--
 .../kafka/FlinkKafkaConsumerBase.java           | 40 +++++++++++++++++++-
 .../kafka/internals/AbstractFetcher.java        |  5 ++-
 .../kafka/internals/KafkaCommitCallback.java    | 39 +++++++++++++++++++
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 13 ++++---
 .../kafka/internals/AbstractFetcherTest.java    |  2 +-
 11 files changed, 159 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index fc7b595..8b2f26d 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -867,6 +867,30 @@ Thus, in order to infer the metric identifier:
   </tbody>
 </table>
 
+#### Connectors:
+
+##### Kafka Connectors
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Scope</th>
+      <th class="text-left" style="width: 30%">Metrics</th>
+      <th class="text-left" style="width: 50%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>commitsSucceeded</td>
+      <td>The total number of successful Kafka offset commits, reported if Kafka offset committing is turned on and checkpointing is enabled.</td>
+    </tr>
+    <tr>
+       <th rowspan="1">Operator</th>
+       <td>commitsFailed</td>
+       <td>The total number of failed Kafka offset commits, reported if Kafka offset committing is turned on and checkpointing is enabled.</td>
+    </tr>
+  </tbody>
+</table>
 
 ### Latency tracking
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 2d0551d..f00f513 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -163,7 +163,7 @@ public class Kafka010FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -291,7 +291,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -308,7 +308,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index de201e5..04cafd9 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -356,15 +356,21 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
 		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
 		if (zkHandler != null) {
 			try {
 				// the ZK handler takes care of incrementing the offsets by 1 before committing
 				zkHandler.prepareAndCommitOffsets(offsets);
+				if (commitCallback != null) {
+					commitCallback.onSuccess();
+				}
 			}
 			catch (Exception e) {
 				if (running) {
+					if (commitCallback != null) {
+						commitCallback.onException(e);
+					}
 					throw e;
 				} else {
 					return;

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 1c87542..96e0b91 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -211,7 +212,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	}
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
 		KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitionStates();
 		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
 
@@ -228,6 +229,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 		}
 
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
-		consumerThread.setOffsetsToCommit(offsetsToCommit);
+		consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index cbe1551..ae0db85 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
@@ -92,6 +93,8 @@ public class KafkaConsumerThread extends Thread {
 	/** Flag tracking whether the latest commit request has completed */
 	private volatile boolean commitInProgress;
 
+	/** User callback to be invoked when a commit completes. */
+	private volatile KafkaCommitCallback callerCommitCallback;
 
 	public KafkaConsumerThread(
 			Logger log,
@@ -283,17 +286,38 @@ public class KafkaConsumerThread extends Thread {
 	 * 
 	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
 	 * the frequency with which this method is called, then some commits may be skipped due to being
+<<<<<<< HEAD
 	 * superseded  by newer ones.
 	 * 
+=======
+	 * superseded by newer ones.
+	 *
+>>>>>>> 8828648b4f... [FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer
 	 * @param offsetsToCommit The offsets to commit
 	 */
 	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+		setOffsetsToCommit(offsetsToCommit, null);
+	}
+
+	/**
+	 * Tells this thread to commit a set of offsets. This method does not block, the committing
+	 * operation will happen asynchronously.
+	 *
+	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
+	 * the frequency with which this method is called, then some commits may be skipped due to being
+	 * superseded by newer ones.
+	 *
+	 * @param offsetsToCommit The offsets to commit
+	 * @param commitCallback The callback to invoke when the Kafka commit completes
+	 */
+	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, KafkaCommitCallback commitCallback) {
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
 		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
 			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
 					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
 					"This does not compromise Flink's checkpoint integrity.");
 		}
+		this.callerCommitCallback = commitCallback;
 
 		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
 		handover.wakeupProducer();
@@ -324,6 +348,12 @@ public class KafkaConsumerThread extends Thread {
 
 			if (ex != null) {
 				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+				if (callerCommitCallback != null) {
+					callerCommitCallback.onException(ex);
+				}
+			}
+			else if (callerCommitCallback != null) {
+				callerCommitCallback.onSuccess();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 6e13db2..63885aa 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -163,7 +163,7 @@ public class Kafka09FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -291,7 +291,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -308,7 +308,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 5a440e0..6f2e538 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -41,6 +42,7 @@ import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -146,6 +148,24 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	private transient boolean restored = false;
 
 	// ------------------------------------------------------------------------
+	//  internal metrics
+	// ------------------------------------------------------------------------
+
+	/** Counter for successful Kafka offset commits. */
+	private transient Counter successfulCommits;
+
+	/** Counter for failed Kafka offset commits. */
+	private transient Counter failedCommits;
+
+	/** Callback interface that will be invoked upon async Kafka commit completion.
+	 *  Please be aware that the default callback implementation in the base class does not
+	 *  provide any guarantees on thread-safety. This is sufficient for now because the currently
+	 *  supported Kafka connectors guarantee no more than 1 concurrent async pending offset
+	 *  commit.
+	 */
+	private transient KafkaCommitCallback offsetCommitCallback;
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Base constructor.
@@ -421,6 +441,23 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			throw new Exception("The partitions were not set for the consumer");
 		}
 
+		// initialize commit metrics and default offset callback method
+		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
+		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
+
+		this.offsetCommitCallback = new KafkaCommitCallback() {
+			@Override
+			public void onSuccess() {
+				successfulCommits.inc();
+			}
+
+			@Override
+			public void onException(Throwable cause) {
+				LOG.error("Async Kafka commit failed.", cause);
+				failedCommits.inc();
+			}
+		};
+
 		// we need only do work, if we actually have partitions assigned
 		if (!subscribedPartitionsToStartOffsets.isEmpty()) {
 
@@ -623,7 +660,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 					LOG.debug("Checkpoint state was empty.");
 					return;
 				}
-				fetcher.commitInternalOffsetsToKafka(offsets);
+
+				fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
 			} catch (Exception e) {
 				if (running) {
 					throw e;

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 0b311a9..a553ed6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -176,10 +176,11 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * committing them, so that committed offsets to Kafka represent "the next record to process".
 	 *
 	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
+	 * @param commitCallback The callback that the user can implement to trigger custom actions when a commit request completes.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
-	
+	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception;
+
 	// ------------------------------------------------------------------------
 	//  snapshot and restore the state
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
new file mode 100644
index 0000000..aca7ae5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
+ * which should normally be triggered by a checkpoint completion event.
+ */
+public interface KafkaCommitCallback {
+
+	/**
+	 * A callback method the user can implement to provide asynchronous handling of commit request completion.
+	 * This method will be called when the commit request sent to the server has been acknowledged without error.
+	 */
+	void onSuccess();
+
+	/**
+	 * A callback method the user can implement to provide asynchronous handling of commit request failure.
+	 * This method will be called when the commit request failed.
+	 * @param cause Kafka commit failure cause returned by kafka client
+	 */
+	void onException(Throwable cause);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 04508dc..f336b1c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
@@ -496,7 +497,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -513,11 +514,11 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		listState = new TestingListState<>();
@@ -532,15 +533,15 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// commit only the second last
 		consumer.notifyCheckpointComplete(598);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		// access invalid checkpoint
 		consumer.notifyCheckpointComplete(590);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		// and the last
 		consumer.notifyCheckpointComplete(599);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/f1a173ad/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index c1a64c4..8612ca3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -358,7 +358,7 @@ public class AbstractFetcherTest {
 		}
 
 		@Override
-		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback callback) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 	}


[2/3] flink git commit: [FLINK-6998] [kafka] Remove unfruitful null checks on provided KafkaCommitCallbacks

Posted by tz...@apache.org.
[FLINK-6998] [kafka] Remove unfruitful null checks on provided KafkaCommitCallbacks

This closes #4187.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4382464f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4382464f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4382464f

Branch: refs/heads/release-1.3
Commit: 4382464fd140aae70767a3c9776cf27f3b57355b
Parents: f1a173a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jul 28 18:59:11 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 22:47:26 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010FetcherTest.java   | 17 +++++----
 .../kafka/internals/Kafka08Fetcher.java         | 15 ++++----
 .../kafka/internal/Kafka09Fetcher.java          |  7 +++-
 .../kafka/internal/KafkaConsumerThread.java     | 39 ++++++--------------
 .../connectors/kafka/Kafka09FetcherTest.java    | 17 +++++----
 .../kafka/internals/AbstractFetcher.java        |  8 +++-
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 17 ++++++---
 .../kafka/internals/AbstractFetcherTest.java    |  5 ++-
 8 files changed, 64 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index f00f513..3ce12b3 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internal.Handover;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -163,7 +164,7 @@ public class Kafka010FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, mock(KafkaCommitCallback.class));
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -291,7 +292,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -308,7 +309,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -344,9 +345,9 @@ public class Kafka010FetcherTest {
 		final byte[] payload = new byte[] {1, 2, 3, 4};
 
 		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+				new ConsumerRecord<>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<>(topic, partition, 17, payload, payload));
 
 		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
 		data.put(new TopicPartition(topic, partition), records);
@@ -452,11 +453,11 @@ public class Kafka010FetcherTest {
 		@Override
 		public void close() {}
 
-		public void waitTillHasBlocker() throws InterruptedException {
+		void waitTillHasBlocker() throws InterruptedException {
 			inBlocking.await();
 		}
 
-		public boolean isStillBlocking() {
+		boolean isStillBlocking() {
 			return lock.isLocked();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 04cafd9..37bdb23 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -37,6 +37,8 @@ import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -356,21 +358,20 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
+	public void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
 		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
 		if (zkHandler != null) {
 			try {
 				// the ZK handler takes care of incrementing the offsets by 1 before committing
 				zkHandler.prepareAndCommitOffsets(offsets);
-				if (commitCallback != null) {
-					commitCallback.onSuccess();
-				}
+				commitCallback.onSuccess();
 			}
 			catch (Exception e) {
 				if (running) {
-					if (commitCallback != null) {
-						commitCallback.onException(e);
-					}
+					commitCallback.onException(e);
 					throw e;
 				} else {
 					return;

http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 96e0b91..8bd61ed 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -212,7 +214,10 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	}
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
+	public void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
 		KafkaTopicPartitionState<TopicPartition>[] partitions = subscribedPartitionStates();
 		Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.length);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index ae0db85..56281a9 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -34,6 +34,8 @@ import org.apache.kafka.common.errors.WakeupException;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -94,7 +96,7 @@ public class KafkaConsumerThread extends Thread {
 	private volatile boolean commitInProgress;
 
 	/** User callback to be invoked when commits completed. */
-	private volatile KafkaCommitCallback callerCommitCallback;
+	private volatile KafkaCommitCallback offsetCommitCallback;
 
 	public KafkaConsumerThread(
 			Logger log,
@@ -286,38 +288,22 @@ public class KafkaConsumerThread extends Thread {
 	 * 
 	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
 	 * the frequency with which this method is called, then some commits may be skipped due to being
-<<<<<<< HEAD
-	 * superseded  by newer ones.
-	 * 
-=======
 	 * superseded by newer ones.
 	 *
->>>>>>> 8828648b4f... [FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer
 	 * @param offsetsToCommit The offsets to commit
+	 * @param commitCallback The callback to invoke when the Kafka commit completes, either successfully or with an error
 	 */
-	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
-		setOffsetsToCommit(offsetsToCommit, null);
-	}
+	void setOffsetsToCommit(
+			Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
+			@Nonnull KafkaCommitCallback commitCallback) {
 
-	/**
-	 * Tells this thread to commit a set of offsets. This method does not block, the committing
-	 * operation will happen asynchronously.
-	 *
-	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
-	 * the frequency with which this method is called, then some commits may be skipped due to being
-	 * superseded by newer ones.
-	 *
-	 * @param offsetsToCommit The offsets to commit
-	 * @param commitCallback callback when kafka commit completes
-	 */
-	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, KafkaCommitCallback commitCallback) {
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
 		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
 			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
 					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
 					"This does not compromise Flink's checkpoint integrity.");
 		}
-		this.callerCommitCallback = commitCallback;
+		this.offsetCommitCallback = commitCallback;
 
 		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
 		handover.wakeupProducer();
@@ -348,12 +334,9 @@ public class KafkaConsumerThread extends Thread {
 
 			if (ex != null) {
 				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
-				if (callerCommitCallback != null) {
-					callerCommitCallback.onException(ex);
-				}
-			}
-			else if (callerCommitCallback != null) {
-				callerCommitCallback.onSuccess();
+				offsetCommitCallback.onException(ex);
+			} else {
+				offsetCommitCallback.onSuccess();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 63885aa..e61359f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internal.Handover;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -163,7 +164,7 @@ public class Kafka09FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, mock(KafkaCommitCallback.class));
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -291,7 +292,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -308,7 +309,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -344,9 +345,9 @@ public class Kafka09FetcherTest {
 		final byte[] payload = new byte[] {1, 2, 3, 4};
 
 		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+				new ConsumerRecord<>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<>(topic, partition, 17, payload, payload));
 
 		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
 		data.put(new TopicPartition(topic, partition), records);
@@ -453,11 +454,11 @@ public class Kafka09FetcherTest {
 		@Override
 		public void close() {}
 
-		public void waitTillHasBlocker() throws InterruptedException {
+		void waitTillHasBlocker() throws InterruptedException {
 			inBlocking.await();
 		}
 
-		public boolean isStillBlocking() {
+		boolean isStillBlocking() {
 			return lock.isLocked();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index a553ed6..10b7530 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -176,10 +178,12 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * committing them, so that committed offsets to Kafka represent "the next record to process".
 	 *
 	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
-	 * @param commitCallback The callback that the user can implement to trigger custom actions when a commit request completes.
+	 * @param commitCallback The callback that implementations must invoke when a commit request completes or fails.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception;
+	public abstract void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception;
 
 	// ------------------------------------------------------------------------
 	//  snapshot and restore the state

http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index f336b1c..c6af119 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -212,6 +213,10 @@ public class FlinkKafkaConsumerBaseTest {
 		partitionList.add(new KafkaTopicPartition("test", 0));
 		consumer.setSubscribedPartitions(partitionList);
 
+		StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
+		when(runtimeContext.getMetricGroup()).thenReturn(mock(MetricGroup.class));
+		consumer.setRuntimeContext(runtimeContext);
+
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState<Serializable> listState = new TestingListState<>();
 		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
@@ -497,7 +502,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -514,11 +519,11 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		listState = new TestingListState<>();
@@ -533,15 +538,15 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// commit only the second last
 		consumer.notifyCheckpointComplete(598);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		// access invalid checkpoint
 		consumer.notifyCheckpointComplete(590);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		// and the last
 		consumer.notifyCheckpointComplete(599);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/4382464f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 8612ca3..3a5f83c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
@@ -358,7 +359,9 @@ public class AbstractFetcherTest {
 		}
 
 		@Override
-		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback callback) throws Exception {
+		public void commitInternalOffsetsToKafka(
+				Map<KafkaTopicPartition, Long> offsets,
+				@Nonnull KafkaCommitCallback callback) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 	}


[3/3] flink git commit: [FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase

Posted by tz...@apache.org.
[FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase

This closes #4414.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/452f5d10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/452f5d10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/452f5d10

Branch: refs/heads/release-1.3
Commit: 452f5d1032f61a29726dd484453a256c7d57d052
Parents: 4382464
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jul 27 18:31:13 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 22:47:46 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 21 ++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/452f5d10/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 1de8e6f..3467237 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -216,7 +216,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.execute();
 				}
 				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
+					if (!(t instanceof JobCancellationException)) {
 						errorRef.set(t);
 					}
 				}
@@ -242,8 +242,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 		while (System.nanoTime() < deadline);
 
-		// cancel the job
+		// cancel the job & wait for the job to finish
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		runner.join();
 
 		final Throwable t = errorRef.get();
 		if (t != null) {
@@ -302,7 +303,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.execute();
 				}
 				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
+					if (!(t instanceof JobCancellationException)) {
 						errorRef.set(t);
 					}
 				}
@@ -327,8 +328,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 		while (System.nanoTime() < deadline);
 
-		// cancel the job
+		// cancel the job & wait for the job to finish
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		runner.join();
 
 		final Throwable t = errorRef.get();
 		if (t != null) {
@@ -1611,9 +1613,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
 
 					env1.execute("Metrics test job");
-				} catch(Throwable t) {
-					LOG.warn("Got exception during execution", t);
-					if(!(t instanceof JobCancellationException)) { // we'll cancel the job
+				} catch (Throwable t) {
+					if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+						LOG.warn("Got exception during execution", t);
 						error.f0 = t;
 					}
 				}
@@ -1664,11 +1666,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		} finally {
 			// cancel
 			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			// wait for the job to finish (it should due to the cancel command above)
+			jobThread.join();
 		}
 
-		while (jobThread.isAlive()) {
-			Thread.sleep(50);
-		}
 		if (error.f0 != null) {
 			throw error.f0;
 		}