Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/28 13:53:31 UTC

[1/7] flink git commit: [hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase

Repository: flink
Updated Branches:
  refs/heads/master b1f37efb3 -> f0d4a772f


[hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase

This prevents concrete Kafka source implementations from accidentally
overriding the checkpointing methods. Such overrides would be problematic
because they would escape test coverage: the checkpoint methods of the
ConsumerBase are tested, but overriding methods in derived classes would
not be.
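
As a quick illustration of the effect (not part of the commit; the class
names below are hypothetical), a final method turns an accidental override
into a compile-time error:

abstract class CheckpointedSourceBase {
	// the canonical, tested checkpointing logic lives only here
	public final void snapshotState() {
	}
}

class MyKafkaSource extends CheckpointedSourceBase {
	// uncommenting this fails to compile:
	// snapshotState() in CheckpointedSourceBase is final and cannot be overridden
	// public void snapshotState() { }
}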


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68395cce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68395cce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68395cce

Branch: refs/heads/master
Commit: 68395cceb58f4936f33af269a73ae968b06e89cb
Parents: e111d77
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jul 18 10:35:54 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 21:52:29 2017 +0800

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumerBase.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68395cce/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index cdcb28b..c331bf0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -648,7 +648,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
+	public final void initializeState(FunctionInitializationContext context) throws Exception {
 
 		OperatorStateStore stateStore = context.getOperatorStateStore();
 
@@ -686,7 +686,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {
 		if (!running) {
 			LOG.debug("snapshotState() called on closed source");
 		} else {
@@ -730,7 +730,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
-	public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
+	public final void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
 		LOG.info("{} (taskIdx={}) restoring offsets from an older version: {}",
 			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredOffsets);
 
@@ -746,7 +746,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+	public final void notifyCheckpointComplete(long checkpointId) throws Exception {
 		if (!running) {
 			LOG.debug("notifyCheckpointComplete() called on closed source");
 			return;


[7/7] flink git commit: [FLINK-7248] [kafka, tests] Remove invalid checkRestoredNullCheckpointWhenFetcherNotReady test

Posted by tz...@apache.org.
[FLINK-7248] [kafka, tests] Remove invalid checkRestoredNullCheckpointWhenFetcherNotReady test

This test is an invalid remnant of the recent major Kafka consumer
refactorings. The actual behaviour is covered by
checkRestoredCheckpointWhenFetcherNotReady: when a checkpoint happens while
the fetcher is not yet ready and exposed, we fall back to using any
restored state as the checkpoint.

This closes #4387.
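
For context, a hedged sketch of the fallback behaviour that the remaining
test covers; the field and method names below are illustrative, not the
exact FlinkKafkaConsumerBase internals:

import java.util.Collections;
import java.util.Map;

class SnapshotFallbackSketch {
	private Map<String, Long> restoredState; // offsets restored on startup, may be null
	private OffsetFetcher fetcher;           // created lazily in run(), may still be null

	Map<String, Long> snapshotOffsets() {
		if (fetcher == null) {
			// fetcher not ready yet: fall back to any restored state
			return restoredState != null ? restoredState : Collections.<String, Long>emptyMap();
		}
		return fetcher.snapshotCurrentState();
	}

	interface OffsetFetcher {
		Map<String, Long> snapshotCurrentState();
	}
}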


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5221c85
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5221c85
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5221c85

Branch: refs/heads/master
Commit: c5221c85a1c532a5234ec0b4331a04cffa04ba5c
Parents: 68395cc
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jul 24 16:34:37 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 21:52:30 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 49 ++++----------------
 1 file changed, 9 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c5221c85/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index fef2820..ad08c4d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -137,15 +137,16 @@ public class FlinkKafkaConsumerBaseTest {
 	}
 
 	/**
-	 * Tests that no checkpoints happen when the fetcher is not running.
+	 * Tests that when taking a checkpoint when the fetcher is not running yet,
+	 * the checkpoint correctly contains the restored state instead.
 	 */
 	@Test
 	public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 
-		TestingListState<Serializable> listState = new TestingListState<>();
-		listState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L));
-		listState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
+		TestingListState<Serializable> restoredListState = new TestingListState<>();
+		restoredListState.add(Tuple2.of(new KafkaTopicPartition("abc", 13), 16768L));
+		restoredListState.add(Tuple2.of(new KafkaTopicPartition("def", 7), 987654321L));
 
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
 		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
@@ -156,7 +157,7 @@ public class FlinkKafkaConsumerBaseTest {
 		// mock old 1.2 state (empty)
 		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
 		// mock 1.3 state
-		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
 
@@ -171,17 +172,17 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ensure that the list was cleared and refilled. while this is an implementation detail, we use it here
 		// to figure out that snapshotState() actually did something.
-		Assert.assertTrue(listState.isClearCalled());
+		Assert.assertTrue(restoredListState.isClearCalled());
 
 		Set<Serializable> expected = new HashSet<>();
 
-		for (Serializable serializable : listState.get()) {
+		for (Serializable serializable : restoredListState.get()) {
 			expected.add(serializable);
 		}
 
 		int counter = 0;
 
-		for (Serializable serializable : listState.get()) {
+		for (Serializable serializable : restoredListState.get()) {
 			assertTrue(expected.contains(serializable));
 			counter++;
 		}
@@ -189,38 +190,6 @@ public class FlinkKafkaConsumerBaseTest {
 		assertEquals(expected.size(), counter);
 	}
 
-	/**
-	 * Tests that no checkpoints happen when the fetcher is not running.
-	 */
-	@Test
-	public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
-		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
-		StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
-		when(runtimeContext.getIndexOfThisSubtask()).thenReturn(0);
-		when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-		consumer.setRuntimeContext(runtimeContext);
-
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-		TestingListState<Serializable> listState = new TestingListState<>();
-		// mock old 1.2 state (empty)
-		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(new TestingListState<Serializable>());
-		// mock 1.3 state
-		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
-
-		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
-		when(initializationContext.isRestored()).thenReturn(false);
-
-		consumer.initializeState(initializationContext);
-
-		consumer.open(new Configuration());
-
-		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(17, 17));
-
-		assertFalse(listState.get().iterator().hasNext());
-	}
-
 	@Test
 	public void testConfigureOnCheckpointsCommitMode() throws Exception {
 


[2/7] flink git commit: [FLINK-7143] [kafka] Add test for Kafka Consumer rescaling

Posted by tz...@apache.org.
[FLINK-7143] [kafka] Add test for Kafka Consumer rescaling

This verifies that the consumer always correctly knows whether it is
restored or not and is not affected by changes in the partitions as
reported by Kafka.

Previously, operator state reshuffling could lead to partitions being
subscribed to multiple times.
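
A hedged sketch of the restore-time filtering this exercises (names are
illustrative; the real code uses KafkaTopicPartition and the
KafkaTopicPartitionAssigner introduced in commit 888fabe9 below): each
subtask sees the full union state and keeps only the partitions the
deterministic assigner maps to its own index, so no partition can end up
subscribed by two subtasks after rescaling.

import java.util.HashMap;
import java.util.Map;

class UnionStateFilterSketch {

	static Map<String, Long> filterForSubtask(
			Map<String, Long> unionOffsets, int subtaskIndex, int numSubtasks) {
		Map<String, Long> mine = new HashMap<>();
		for (Map.Entry<String, Long> entry : unionOffsets.entrySet()) {
			if (assign(entry.getKey(), numSubtasks) == subtaskIndex) {
				mine.put(entry.getKey(), entry.getValue());
			}
		}
		return mine;
	}

	// simplified stand-in for KafkaTopicPartitionAssigner.assign(partition, numSubtasks)
	static int assign(String partitionId, int numSubtasks) {
		return (partitionId.hashCode() & 0x7FFFFFFF) % numSubtasks;
	}
}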


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e111d773
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e111d773
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e111d773

Branch: refs/heads/master
Commit: e111d7730ec6032dc14579bd274e7822f7176e39
Parents: 888fabe
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jul 18 11:57:46 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 21:52:29 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 169 ++++++++++++++++++-
 .../AbstractPartitionDiscovererTest.java        | 115 ++-----------
 .../testutils/TestPartitionDiscoverer.java      | 125 ++++++++++++++
 3 files changed, 307 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e111d773/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index e0508ce..fef2820 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -25,15 +25,22 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+import org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.commons.collections.map.LinkedMap;
@@ -47,14 +54,21 @@ import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.collection.IsIn.isIn;
+import static org.hamcrest.collection.IsMapContaining.hasKey;
+import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyMap;
@@ -181,6 +195,10 @@ public class FlinkKafkaConsumerBaseTest {
 	@Test
 	public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+		StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
+		when(runtimeContext.getIndexOfThisSubtask()).thenReturn(0);
+		when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
+		consumer.setRuntimeContext(runtimeContext);
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState<Serializable> listState = new TestingListState<>();
@@ -299,6 +317,8 @@ public class FlinkKafkaConsumerBaseTest {
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
 		StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
 		when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing
+		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
+		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
 		consumer.setRuntimeContext(mockRuntimeContext);
 
 		assertEquals(0, pendingOffsetsToCommit.size());
@@ -426,6 +446,8 @@ public class FlinkKafkaConsumerBaseTest {
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingOffsetsToCommit, true);
 		StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
 		when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing
+		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
+		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
 		consumer.setRuntimeContext(mockRuntimeContext);
 
 		consumer.setCommitOffsetsOnCheckpoints(false); // disable offset committing
@@ -523,6 +545,120 @@ public class FlinkKafkaConsumerBaseTest {
 		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // no offsets should be committed
 	}
 
+	@Test
+	public void testScaleUp() throws Exception {
+		testRescaling(5, 2, 15, 1000);
+	}
+
+	@Test
+	public void testScaleDown() throws Exception {
+		testRescaling(5, 10, 2, 100);
+	}
+
+	/**
+	 * Tests whether the Kafka consumer behaves correctly when scaling the parallelism up/down,
+	 * which means that operator state is being reshuffled.
+	 *
+	 * <p>This also verifies that a restoring source is always impervious to changes in the list
+	 * of topics fetched from Kafka.
+	 */
+	@SuppressWarnings("unchecked")
+	void testRescaling(
+		final int initialParallelism,
+		final int numPartitions,
+		final int restoredParallelism,
+		final int restoredNumPartitions) throws Exception {
+
+		Preconditions.checkArgument(
+			restoredNumPartitions >= numPartitions,
+			"invalid test case for Kafka repartitioning; Kafka only allows increasing partitions.");
+
+		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = new ArrayList<>();
+		for (int i = 0; i < numPartitions; i++) {
+			mockFetchedPartitionsOnStartup.add(new KafkaTopicPartition("test-topic", i));
+		}
+
+		DummyFlinkKafkaConsumer<String>[] consumers =
+			new DummyFlinkKafkaConsumer[initialParallelism];
+
+		AbstractStreamOperatorTestHarness<String>[] testHarnesses =
+			new AbstractStreamOperatorTestHarness[initialParallelism];
+
+		for (int i = 0; i < initialParallelism; i++) {
+			consumers[i] = new DummyFlinkKafkaConsumer<>(
+				Collections.singletonList("test-topic"), mockFetchedPartitionsOnStartup);
+			testHarnesses[i] = createTestHarness(consumers[i], initialParallelism, i);
+
+			// initializeState() is always called, null signals that we didn't restore
+			testHarnesses[i].initializeState(null);
+			testHarnesses[i].open();
+		}
+
+		Map<KafkaTopicPartition, Long> globalSubscribedPartitions = new HashMap<>();
+
+		for (int i = 0; i < initialParallelism; i++) {
+			Map<KafkaTopicPartition, Long> subscribedPartitions =
+				consumers[i].getSubscribedPartitionsToStartOffsets();
+
+			// make sure that no one else is subscribed to these partitions
+			for (KafkaTopicPartition partition : subscribedPartitions.keySet()) {
+				assertThat(globalSubscribedPartitions, not(hasKey(partition)));
+			}
+			globalSubscribedPartitions.putAll(subscribedPartitions);
+		}
+
+		assertThat(globalSubscribedPartitions.values(), hasSize(numPartitions));
+		assertThat(mockFetchedPartitionsOnStartup, everyItem(isIn(globalSubscribedPartitions.keySet())));
+
+		OperatorStateHandles[] state = new OperatorStateHandles[initialParallelism];
+
+		for (int i = 0; i < initialParallelism; i++) {
+			state[i] = testHarnesses[i].snapshot(0, 0);
+		}
+
+		OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness.repackageState(state);
+
+		// -----------------------------------------------------------------------------------------
+		// restore
+
+		List<KafkaTopicPartition> mockFetchedPartitionsAfterRestore = new ArrayList<>();
+		for (int i = 0; i < restoredNumPartitions; i++) {
+			mockFetchedPartitionsAfterRestore.add(new KafkaTopicPartition("test-topic", i));
+		}
+
+		DummyFlinkKafkaConsumer<String>[] restoredConsumers =
+			new DummyFlinkKafkaConsumer[restoredParallelism];
+
+		AbstractStreamOperatorTestHarness<String>[] restoredTestHarnesses =
+			new AbstractStreamOperatorTestHarness[restoredParallelism];
+
+		for (int i = 0; i < restoredParallelism; i++) {
+			restoredConsumers[i] = new DummyFlinkKafkaConsumer<>(
+				Collections.singletonList("test-topic"), mockFetchedPartitionsAfterRestore);
+			restoredTestHarnesses[i] = createTestHarness(restoredConsumers[i], restoredParallelism, i);
+
+			// initializeState() is always called; this time we restore from the repackaged (merged) state
+			restoredTestHarnesses[i].initializeState(mergedState);
+			restoredTestHarnesses[i].open();
+		}
+
+		Map<KafkaTopicPartition, Long> restoredGlobalSubscribedPartitions = new HashMap<>();
+
+		for (int i = 0; i < restoredParallelism; i++) {
+			Map<KafkaTopicPartition, Long> subscribedPartitions =
+				restoredConsumers[i].getSubscribedPartitionsToStartOffsets();
+
+			// make sure that no one else is subscribed to these partitions
+			for (KafkaTopicPartition partition : subscribedPartitions.keySet()) {
+				assertThat(restoredGlobalSubscribedPartitions, not(hasKey(partition)));
+			}
+			restoredGlobalSubscribedPartitions.putAll(subscribedPartitions);
+		}
+
+		assertThat(restoredGlobalSubscribedPartitions.values(), hasSize(restoredNumPartitions));
+		assertThat(mockFetchedPartitionsOnStartup, everyItem(isIn(restoredGlobalSubscribedPartitions.keySet())));
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
@@ -547,6 +683,19 @@ public class FlinkKafkaConsumerBaseTest {
 		return consumer;
 	}
 
+	private static <T> AbstractStreamOperatorTestHarness<T> createTestHarness(
+		SourceFunction<T> source, int numSubtasks, int subtaskIndex) throws Exception {
+
+		AbstractStreamOperatorTestHarness<T> testHarness =
+			new AbstractStreamOperatorTestHarness<>(
+				new StreamSource<>(source), Short.MAX_VALUE / 2, numSubtasks, subtaskIndex);
+
+		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		return testHarness;
+	}
+
+
 	// ------------------------------------------------------------------------
 
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
@@ -554,9 +703,20 @@ public class FlinkKafkaConsumerBaseTest {
 
 		boolean isAutoCommitEnabled = false;
 
-		@SuppressWarnings("unchecked")
+		private List<String> fixedMockGetAllTopicsReturnSequence;
+		private List<KafkaTopicPartition> fixedMockGetAllPartitionsForTopicsReturnSequence;
+
 		public DummyFlinkKafkaConsumer() {
+			this(Collections.singletonList("dummy-topic"), Collections.singletonList(new KafkaTopicPartition("dummy-topic", 0)));
+		}
+
+		@SuppressWarnings("unchecked")
+		public DummyFlinkKafkaConsumer(
+				List<String> fixedMockGetAllTopicsReturnSequence,
+				List<KafkaTopicPartition> fixedMockGetAllPartitionsForTopicsReturnSequence) {
 			super(Arrays.asList("dummy-topic"), null, (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class), 0);
+			this.fixedMockGetAllTopicsReturnSequence = Preconditions.checkNotNull(fixedMockGetAllTopicsReturnSequence);
+			this.fixedMockGetAllPartitionsForTopicsReturnSequence = Preconditions.checkNotNull(fixedMockGetAllPartitionsForTopicsReturnSequence);
 		}
 
 		@Override
@@ -576,7 +736,12 @@ public class FlinkKafkaConsumerBaseTest {
 				KafkaTopicsDescriptor topicsDescriptor,
 				int indexOfThisSubtask,
 				int numParallelSubtasks) {
-			return mock(AbstractPartitionDiscoverer.class);
+			return new TestPartitionDiscoverer(
+				topicsDescriptor,
+				indexOfThisSubtask,
+				numParallelSubtasks,
+				TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(fixedMockGetAllTopicsReturnSequence),
+				TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(fixedMockGetAllPartitionsForTopicsReturnSequence));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e111d773/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
index 4d3e542..2633b95 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
+import org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,9 +36,6 @@ import java.util.regex.Pattern;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests that the partition assignment in the partition discoverer is
@@ -82,8 +79,8 @@ public class AbstractPartitionDiscovererTest {
 					topicsDescriptor,
 					subtaskIndex,
 					mockGetAllPartitionsForTopicsReturn.size(),
-					createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
-					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
+					TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
+					TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
 			partitionDiscoverer.open();
 
 			List<KafkaTopicPartition> initialDiscovery = partitionDiscoverer.discoverPartitions();
@@ -127,8 +124,8 @@ public class AbstractPartitionDiscovererTest {
 						topicsDescriptor,
 						subtaskIndex,
 						numConsumers,
-						createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
-						createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
+						TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
+						TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
 				partitionDiscoverer.open();
 
 				List<KafkaTopicPartition> initialDiscovery = partitionDiscoverer.discoverPartitions();
@@ -179,8 +176,8 @@ public class AbstractPartitionDiscovererTest {
 						topicsDescriptor,
 						subtaskIndex,
 						numConsumers,
-						createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
-						createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
+						TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
+						TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
 				partitionDiscoverer.open();
 
 				List<KafkaTopicPartition> initialDiscovery = partitionDiscoverer.discoverPartitions();
@@ -240,7 +237,7 @@ public class AbstractPartitionDiscovererTest {
 					topicsDescriptor,
 					0,
 					numConsumers,
-					createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
+					TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
 					deepClone(mockGetAllPartitionsForTopicsReturnSequence));
 			partitionDiscovererSubtask0.open();
 
@@ -248,7 +245,7 @@ public class AbstractPartitionDiscovererTest {
 					topicsDescriptor,
 					1,
 					numConsumers,
-					createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
+					TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
 					deepClone(mockGetAllPartitionsForTopicsReturnSequence));
 			partitionDiscovererSubtask1.open();
 
@@ -256,7 +253,7 @@ public class AbstractPartitionDiscovererTest {
 					topicsDescriptor,
 					2,
 					numConsumers,
-					createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
+					TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Collections.singletonList(TEST_TOPIC)),
 					deepClone(mockGetAllPartitionsForTopicsReturnSequence));
 			partitionDiscovererSubtask2.open();
 
@@ -375,16 +372,16 @@ public class AbstractPartitionDiscovererTest {
 					topicsDescriptor,
 					subtaskIndex,
 					numSubtasks,
-					createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
-					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
+					TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
+					TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
 			partitionDiscoverer.open();
 
 			TestPartitionDiscoverer partitionDiscovererOutOfOrder = new TestPartitionDiscoverer(
 					topicsDescriptor,
 					subtaskIndex,
 					numSubtasks,
-					createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
-					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturnOutOfOrder));
+					TestPartitionDiscoverer.createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
+					TestPartitionDiscoverer.createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturnOutOfOrder));
 			partitionDiscovererOutOfOrder.open();
 
 			List<KafkaTopicPartition> discoveredPartitions = partitionDiscoverer.discoverPartitions();
@@ -397,88 +394,6 @@ public class AbstractPartitionDiscovererTest {
 		}
 	}
 
-	private static class TestPartitionDiscoverer extends AbstractPartitionDiscoverer {
-
-		private final KafkaTopicsDescriptor topicsDescriptor;
-
-		private final List<List<String>> mockGetAllTopicsReturnSequence;
-		private final List<List<KafkaTopicPartition>> mockGetAllPartitionsForTopicsReturnSequence;
-
-		private int getAllTopicsInvokeCount = 0;
-		private int getAllPartitionsForTopicsInvokeCount = 0;
-
-		public TestPartitionDiscoverer(
-				KafkaTopicsDescriptor topicsDescriptor,
-				int indexOfThisSubtask,
-				int numParallelSubtasks,
-				List<List<String>> mockGetAllTopicsReturnSequence,
-				List<List<KafkaTopicPartition>> mockGetAllPartitionsForTopicsReturnSequence) {
-
-			super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks);
-
-			this.topicsDescriptor = topicsDescriptor;
-			this.mockGetAllTopicsReturnSequence = mockGetAllTopicsReturnSequence;
-			this.mockGetAllPartitionsForTopicsReturnSequence = mockGetAllPartitionsForTopicsReturnSequence;
-		}
-
-		@Override
-		protected List<String> getAllTopics() {
-			assertTrue(topicsDescriptor.isTopicPattern());
-			return mockGetAllTopicsReturnSequence.get(getAllTopicsInvokeCount++);
-		}
-
-		@Override
-		protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) {
-			if (topicsDescriptor.isFixedTopics()) {
-				assertEquals(topicsDescriptor.getFixedTopics(), topics);
-			} else {
-				assertEquals(mockGetAllTopicsReturnSequence.get(getAllPartitionsForTopicsInvokeCount - 1), topics);
-			}
-			return mockGetAllPartitionsForTopicsReturnSequence.get(getAllPartitionsForTopicsInvokeCount++);
-		}
-
-		@Override
-		protected void initializeConnections() {
-			// nothing to do
-		}
-
-		@Override
-		protected void wakeupConnections() {
-			// nothing to do
-		}
-
-		@Override
-		protected void closeConnections() {
-			// nothing to do
-		}
-	}
-
-	private static List<List<String>> createMockGetAllTopicsSequenceFromFixedReturn(final List<String> fixed) {
-		@SuppressWarnings("unchecked")
-		List<List<String>> mockSequence = mock(List.class);
-		when(mockSequence.get(anyInt())).thenAnswer(new Answer<List<String>>() {
-			@Override
-			public List<String> answer(InvocationOnMock invocationOnMock) throws Throwable {
-				return new ArrayList<>(fixed);
-			}
-		});
-
-		return mockSequence;
-	}
-
-	private static List<List<KafkaTopicPartition>> createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(final List<KafkaTopicPartition> fixed) {
-		@SuppressWarnings("unchecked")
-		List<List<KafkaTopicPartition>> mockSequence = mock(List.class);
-		when(mockSequence.get(anyInt())).thenAnswer(new Answer<List<KafkaTopicPartition>>() {
-			@Override
-			public List<KafkaTopicPartition> answer(InvocationOnMock invocationOnMock) throws Throwable {
-				return new ArrayList<>(fixed);
-			}
-		});
-
-		return mockSequence;
-	}
-
 	private boolean contains(List<KafkaTopicPartition> partitions, int partition) {
 		for (KafkaTopicPartition ktp : partitions) {
 			if (ktp.getPartition() == partition) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e111d773/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestPartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestPartitionDiscoverer.java
new file mode 100644
index 0000000..1f7c031
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/TestPartitionDiscoverer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Utility {@link AbstractPartitionDiscoverer} for tests that allows
+ * mocking the sequence of consecutive metadata fetch calls to Kafka.
+ */
+public class TestPartitionDiscoverer extends AbstractPartitionDiscoverer {
+
+	private final KafkaTopicsDescriptor topicsDescriptor;
+
+	private final List<List<String>> mockGetAllTopicsReturnSequence;
+	private final List<List<KafkaTopicPartition>> mockGetAllPartitionsForTopicsReturnSequence;
+
+	private int getAllTopicsInvokeCount = 0;
+	private int getAllPartitionsForTopicsInvokeCount = 0;
+
+	public TestPartitionDiscoverer(
+			KafkaTopicsDescriptor topicsDescriptor,
+			int indexOfThisSubtask,
+			int numParallelSubtasks,
+			List<List<String>> mockGetAllTopicsReturnSequence,
+			List<List<KafkaTopicPartition>> mockGetAllPartitionsForTopicsReturnSequence) {
+
+		super(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks);
+
+		this.topicsDescriptor = topicsDescriptor;
+		this.mockGetAllTopicsReturnSequence = mockGetAllTopicsReturnSequence;
+		this.mockGetAllPartitionsForTopicsReturnSequence = mockGetAllPartitionsForTopicsReturnSequence;
+	}
+
+	@Override
+	protected List<String> getAllTopics() {
+		assertTrue(topicsDescriptor.isTopicPattern());
+		return mockGetAllTopicsReturnSequence.get(getAllTopicsInvokeCount++);
+	}
+
+	@Override
+	protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) {
+		if (topicsDescriptor.isFixedTopics()) {
+			assertEquals(topicsDescriptor.getFixedTopics(), topics);
+		} else {
+			assertEquals(mockGetAllTopicsReturnSequence.get(getAllPartitionsForTopicsInvokeCount - 1), topics);
+		}
+		return mockGetAllPartitionsForTopicsReturnSequence.get(getAllPartitionsForTopicsInvokeCount++);
+	}
+
+	@Override
+	protected void initializeConnections() {
+		// nothing to do
+	}
+
+	@Override
+	protected void wakeupConnections() {
+		// nothing to do
+	}
+
+	@Override
+	protected void closeConnections() {
+		// nothing to do
+	}
+
+	// ---------------------------------------------------------------------------------
+	//  Utilities to create mocked, fixed results for a sequence of metadata fetches
+	// ---------------------------------------------------------------------------------
+
+	public static List<List<String>> createMockGetAllTopicsSequenceFromFixedReturn(final List<String> fixed) {
+		@SuppressWarnings("unchecked")
+		List<List<String>> mockSequence = mock(List.class);
+		when(mockSequence.get(anyInt())).thenAnswer(new Answer<List<String>>() {
+			@Override
+			public List<String> answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return new ArrayList<>(fixed);
+			}
+		});
+
+		return mockSequence;
+	}
+
+	public static List<List<KafkaTopicPartition>> createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(final List<KafkaTopicPartition> fixed) {
+		@SuppressWarnings("unchecked")
+		List<List<KafkaTopicPartition>> mockSequence = mock(List.class);
+		when(mockSequence.get(anyInt())).thenAnswer(new Answer<List<KafkaTopicPartition>>() {
+			@Override
+			public List<KafkaTopicPartition> answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return new ArrayList<>(fixed);
+			}
+		});
+
+		return mockSequence;
+	}
+}


[3/7] flink git commit: [FLINK-7143] [kafka] Introduce KafkaTopicPartitionAssigner with stricter assignment contracts

Posted by tz...@apache.org.
[FLINK-7143] [kafka] Introduce KafkaTopicPartitionAssigner with stricter assignment contracts

This commit refactors the local partition assignment logic to be located
in a strict contract-defining method, to make it explicit of the
expected partition to subtask assignment without relying solely on
hashCode's of kafka partitions.
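
A small worked example of the resulting contract (topic name and
parallelism chosen arbitrarily), using the same formula as the assign()
method in the diff below:

public class AssignExample {

	static int assign(String topic, int partition, int numParallelSubtasks) {
		int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
		return (startIndex + partition) % numParallelSubtasks;
	}

	public static void main(String[] args) {
		// with parallelism 4, the partitions of one topic are distributed
		// round-robin from a topic-dependent start index, so consecutive
		// partition ids land on consecutive subtask indices
		for (int p = 0; p < 6; p++) {
			System.out.println("partition " + p + " -> subtask " + assign("test-topic", p, 4));
		}
	}
}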


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/888fabe9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/888fabe9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/888fabe9

Branch: refs/heads/master
Commit: 888fabe97cd15b6d92039a6f073cc92404d5b306
Parents: b1f37ef
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jul 24 15:00:16 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 21:52:29 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           |  9 ++--
 .../internals/AbstractPartitionDiscoverer.java  |  2 +-
 .../internals/KafkaTopicPartitionAssigner.java  | 56 ++++++++++++++++++++
 .../AbstractPartitionDiscovererTest.java        | 52 ++++++++++++------
 4 files changed, 98 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/888fabe9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index fd5c96d..cdcb28b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -407,11 +408,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				if (!restoredFromOldState) {
 					// seed the partition discoverer with the union state while filtering out
 					// restored partitions that should not be subscribed by this subtask
-					if (AbstractPartitionDiscoverer.shouldAssignToThisSubtask(
-							restoredStateEntry.getKey(),
-							getRuntimeContext().getIndexOfThisSubtask(),
-							getRuntimeContext().getNumberOfParallelSubtasks())) {
-
+					if (KafkaTopicPartitionAssigner.assign(
+						restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
+							== getRuntimeContext().getIndexOfThisSubtask()){
 						subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
 					}
 				} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/888fabe9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
index 725092e..39645be 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
@@ -198,7 +198,7 @@ public abstract class AbstractPartitionDiscoverer {
 		if (isUndiscoveredPartition(partition)) {
 			topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
 
-			return shouldAssignToThisSubtask(partition, indexOfThisSubtask, numParallelSubtasks);
+			return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
 		}
 
 		return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/888fabe9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
new file mode 100644
index 0000000..944630f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * Utility for assigning Kafka partitions to consumer subtasks.
+ */
+public class KafkaTopicPartitionAssigner {
+
+	/**
+	 * Returns the index of the target subtask that a specific Kafka partition should be
+	 * assigned to.
+	 *
+	 * <p>The resulting distribution of partitions of a single topic has the following contract:
+	 * <ul>
+	 *     <li>1. Uniformly distributed across subtasks</li>
+	 *     <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
+	 *     subtask indices) by using the partition id as the offset from a starting index
+	 *     (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
+	 *     determined using the topic name).</li>
+	 * </ul>
+	 *
+	 * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
+	 * contract to locally filter out partitions that they should not subscribe to, guaranteeing
+	 * that all partitions of a single topic will always be assigned to some subtask in a
+	 * uniformly distributed manner.
+	 *
+	 * @param partition the Kafka partition
+	 * @param numParallelSubtasks total number of parallel subtasks
+	 *
+	 * @return index of the target subtask that the Kafka partition should be assigned to.
+	 */
+	public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
+		int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
+
+		// here, the assumption is that the id of Kafka partitions are always ascending
+		// starting from 0, and therefore can be used directly as the offset clockwise from the start index
+		return (startIndex + partition.getPartition()) % numParallelSubtasks;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/888fabe9/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
index b00d74d..4d3e542 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java
@@ -72,6 +72,11 @@ public class AbstractPartitionDiscovererTest {
 			new KafkaTopicPartition(TEST_TOPIC, 2),
 			new KafkaTopicPartition(TEST_TOPIC, 3));
 
+		int numSubtasks = mockGetAllPartitionsForTopicsReturn.size();
+
+		// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
+		int startIndex = KafkaTopicPartitionAssigner.assign(mockGetAllPartitionsForTopicsReturn.get(0), numSubtasks);
+
 		for (int subtaskIndex = 0; subtaskIndex < mockGetAllPartitionsForTopicsReturn.size(); subtaskIndex++) {
 			TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 					topicsDescriptor,
@@ -85,7 +90,7 @@ public class AbstractPartitionDiscovererTest {
 			assertEquals(1, initialDiscovery.size());
 			assertTrue(contains(mockGetAllPartitionsForTopicsReturn, initialDiscovery.get(0).getPartition()));
 			assertEquals(
-				getExpectedSubtaskIndex(initialDiscovery.get(0), mockGetAllPartitionsForTopicsReturn.size()),
+				getExpectedSubtaskIndex(initialDiscovery.get(0), startIndex, numSubtasks),
 				subtaskIndex);
 
 			// subsequent discoveries should not find anything
@@ -114,6 +119,9 @@ public class AbstractPartitionDiscovererTest {
 			final int minPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers;
 			final int maxPartitionsPerConsumer = mockGetAllPartitionsForTopicsReturn.size() / numConsumers + 1;
 
+			// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
+			int startIndex = KafkaTopicPartitionAssigner.assign(mockGetAllPartitionsForTopicsReturn.get(0), numConsumers);
+
 			for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
 				TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 						topicsDescriptor,
@@ -130,7 +138,7 @@ public class AbstractPartitionDiscovererTest {
 				for (KafkaTopicPartition p : initialDiscovery) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
-					assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
+					assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), subtaskIndex);
 				}
 
 				// subsequent discoveries should not find anything
@@ -163,6 +171,9 @@ public class AbstractPartitionDiscovererTest {
 
 			final int numConsumers = 2 * mockGetAllPartitionsForTopicsReturn.size() + 3;
 
+			// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
+			int startIndex = KafkaTopicPartitionAssigner.assign(mockGetAllPartitionsForTopicsReturn.get(0), numConsumers);
+
 			for (int subtaskIndex = 0; subtaskIndex < numConsumers; subtaskIndex++) {
 				TestPartitionDiscoverer partitionDiscoverer = new TestPartitionDiscoverer(
 						topicsDescriptor,
@@ -178,7 +189,7 @@ public class AbstractPartitionDiscovererTest {
 				for (KafkaTopicPartition p : initialDiscovery) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
-					assertEquals(getExpectedSubtaskIndex(p, numConsumers), subtaskIndex);
+					assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), subtaskIndex);
 				}
 
 				// subsequent discoveries should not find anything
@@ -222,6 +233,9 @@ public class AbstractPartitionDiscovererTest {
 			final int minNewPartitionsPerConsumer = allPartitions.size() / numConsumers;
 			final int maxNewPartitionsPerConsumer = allPartitions.size() / numConsumers + 1;
 
+			// get the start index; the assertions below will fail if the assignment logic does not meet correct contracts
+			int startIndex = KafkaTopicPartitionAssigner.assign(allPartitions.get(0), numConsumers);
+
 			TestPartitionDiscoverer partitionDiscovererSubtask0 = new TestPartitionDiscoverer(
 					topicsDescriptor,
 					0,
@@ -260,19 +274,19 @@ public class AbstractPartitionDiscovererTest {
 			for (KafkaTopicPartition p : initialDiscoverySubtask0) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask1) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask2) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 2);
 			}
 
 			// all partitions must have been assigned
@@ -299,32 +313,32 @@ public class AbstractPartitionDiscovererTest {
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask0) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask1) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : initialDiscoverySubtask2) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 2);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask0) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 0);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 0);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask1) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 1);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 1);
 			}
 
 			for (KafkaTopicPartition p : secondDiscoverySubtask2) {
 				assertTrue(allNewPartitions.remove(p));
-				assertEquals(getExpectedSubtaskIndex(p, numConsumers), 2);
+				assertEquals(getExpectedSubtaskIndex(p, startIndex, numConsumers), 2);
 			}
 
 			// all partitions must have been assigned
@@ -370,7 +384,7 @@ public class AbstractPartitionDiscovererTest {
 					subtaskIndex,
 					numSubtasks,
 					createMockGetAllTopicsSequenceFromFixedReturn(Arrays.asList("test-topic", "test-topic2")),
-					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturn));
+					createMockGetAllPartitionsFromTopicsSequenceFromFixedReturn(mockGetAllPartitionsForTopicsReturnOutOfOrder));
 			partitionDiscovererOutOfOrder.open();
 
 			List<KafkaTopicPartition> discoveredPartitions = partitionDiscoverer.discoverPartitions();
@@ -487,7 +501,15 @@ public class AbstractPartitionDiscovererTest {
 		return clone;
 	}
 
-	private static int getExpectedSubtaskIndex(KafkaTopicPartition partition, int numTasks) {
-		return Math.abs(partition.hashCode() % numTasks);
+	/**
+	 * Utility method that determines the expected subtask index a partition should be assigned to,
+	 * depending on the start index and using the partition id as the offset from that start index
+	 * in clockwise direction.
+	 *
+	 * <p>The expectation is based on the distribution contract of
+	 * {@link KafkaTopicPartitionAssigner#assign(KafkaTopicPartition, int)}.
+	 */
+	private static int getExpectedSubtaskIndex(KafkaTopicPartition partition, int startIndex, int numSubtasks) {
+		return (startIndex + partition.getPartition()) % numSubtasks;
 	}
 }


[5/7] flink git commit: [FLINK-6998] [kafka] Remove unfruitful null checks on provided KafkaCommitCallbacks

Posted by tz...@apache.org.
[FLINK-6998] [kafka] Remove unfruitful null checks on provided KafkaCommitCallbacks

This closes #4187.
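
The pattern behind this cleanup, sketched with illustrative names: require
a non-null callback up front instead of null-checking it at every call
site, so the callback can be invoked unconditionally; the tests then pass a
mock rather than null, as the diffs below show.

interface CommitCallback {
	void onSuccess();
	void onException(Throwable cause);
}

class OffsetCommitterSketch {

	void commitOffsets(CommitCallback callback) {
		if (callback == null) {
			throw new NullPointerException("callback must not be null");
		}
		// ...perform the asynchronous commit, then notify unconditionally:
		callback.onSuccess();
	}
}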


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ded2d86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ded2d86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ded2d86

Branch: refs/heads/master
Commit: 1ded2d867d9c30f01395494adec3cbaa629bbb5a
Parents: 8828648
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jul 28 18:59:11 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 21:52:30 2017 +0800

----------------------------------------------------------------------
 .../kafka/internal/Kafka010FetcherTest.java     | 17 +++++----
 .../kafka/internals/Kafka08Fetcher.java         | 15 ++++----
 .../kafka/internal/Kafka09Fetcher.java          |  7 +++-
 .../kafka/internal/KafkaConsumerThread.java     | 39 +++++++-------------
 .../kafka/internal/Kafka09FetcherTest.java      | 17 +++++----
 .../kafka/internal/KafkaConsumerThreadTest.java |  7 ++--
 .../kafka/internals/AbstractFetcher.java        |  8 +++-
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 12 +++---
 .../kafka/internals/AbstractFetcherTest.java    |  5 ++-
 9 files changed, 66 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index 22e183d..f895e2f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -157,7 +158,7 @@ public class Kafka010FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, mock(KafkaCommitCallback.class));
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -285,7 +286,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -302,7 +303,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -338,9 +339,9 @@ public class Kafka010FetcherTest {
 		final byte[] payload = new byte[] {1, 2, 3, 4};
 
 		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+				new ConsumerRecord<>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<>(topic, partition, 17, payload, payload));
 
 		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
 		data.put(new TopicPartition(topic, partition), records);
@@ -446,11 +447,11 @@ public class Kafka010FetcherTest {
 		@Override
 		public void close() {}
 
-		public void waitTillHasBlocker() throws InterruptedException {
+		void waitTillHasBlocker() throws InterruptedException {
 			inBlocking.await();
 		}
 
-		public boolean isStillBlocking() {
+		boolean isStillBlocking() {
 			return lock.isLocked();
 		}
 

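Since the callback parameter is now expected to be non-null, the tests substitute a Mockito mock where they previously passed null. A minimal sketch of that idiom, assuming Mockito and the connector classes on the test classpath:

    import static org.mockito.Mockito.mock;

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;

    public class MockCallbackSketch {
        public static void main(String[] args) {
            // a do-nothing stand-in that satisfies the non-null callback contract
            KafkaCommitCallback callback = mock(KafkaCommitCallback.class);
            callback.onSuccess();                           // no-op, invocation is recorded
            callback.onException(new Exception("ignored")); // likewise a no-op
        }
    }
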
http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 6e2a1d4..7359e91 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -346,21 +348,20 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
+	public void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
 		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
 		if (zkHandler != null) {
 			try {
 				// the ZK handler takes care of incrementing the offsets by 1 before committing
 				zkHandler.prepareAndCommitOffsets(offsets);
-				if (commitCallback != null) {
-					commitCallback.onSuccess();
-				}
+				commitCallback.onSuccess();
 			}
 			catch (Exception e) {
 				if (running) {
-					if (commitCallback != null) {
-						commitCallback.onException(e);
-					}
+					commitCallback.onException(e);
 					throw e;
 				} else {
 					return;

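The shape of the resulting control flow is easier to see outside the fetcher. A hedged sketch with hypothetical names, assuming only javax.annotation.Nonnull:

    import javax.annotation.Nonnull;

    // Hypothetical sketch: with a @Nonnull callback, both the success and the
    // failure path can invoke it directly, without the removed null guards.
    public class NonNullCallbackSketch {

        interface CommitCallback {
            void onSuccess();
            void onException(Throwable cause);
        }

        static void commit(Runnable commitAction, @Nonnull CommitCallback callback) throws Exception {
            try {
                commitAction.run();
                callback.onSuccess();
            } catch (Exception e) {
                callback.onException(e);
                throw e;
            }
        }
    }
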
http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index db7d63b..cef70fe 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -210,7 +212,10 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	}
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
+	public void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
 		@SuppressWarnings("unchecked")
 		List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 6ff82d7..fc5f359 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -97,8 +99,8 @@ public class KafkaConsumerThread extends Thread {
 	private boolean hasAssignedPartitions;
 
 	/**
-	 * Flag to indicate whether an external operation ({@link #setOffsetsToCommit(Map)} or {@link #shutdown()})
-	 * had attempted to wakeup the consumer while it was isolated for partition reassignment.
+	 * Flag to indicate whether an external operation ({@link #setOffsetsToCommit(Map, KafkaCommitCallback)}
+	 * or {@link #shutdown()}) had attempted to wakeup the consumer while it was isolated for partition reassignment.
 	 */
 	private volatile boolean hasBufferedWakeup;
 
@@ -109,7 +111,8 @@ public class KafkaConsumerThread extends Thread {
 	private volatile boolean commitInProgress;
 
 	/** User callback to be invoked when a commit completes. */
-	private volatile KafkaCommitCallback callerCommitCallback;
+	private volatile KafkaCommitCallback offsetCommitCallback;
+
 	public KafkaConsumerThread(
 			Logger log,
 			Handover handover,
@@ -314,30 +317,19 @@ public class KafkaConsumerThread extends Thread {
 	 * superseded by newer ones.
 	 *
 	 * @param offsetsToCommit The offsets to commit
+	 * @param commitCallback callback to invoke when the Kafka commit completes
 	 */
-	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
-		setOffsetsToCommit(offsetsToCommit, null);
-	}
+	void setOffsetsToCommit(
+			Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
+			@Nonnull KafkaCommitCallback commitCallback) {
 
-	/**
-	 * Tells this thread to commit a set of offsets. This method does not block, the committing
-	 * operation will happen asynchronously.
-	 *
-	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
-	 * the frequency with which this method is called, then some commits may be skipped due to being
-	 * superseded by newer ones.
-	 *
-	 * @param offsetsToCommit The offsets to commit
-	 * @param commitCallback callback when kafka commit completes
-	 */
-	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, KafkaCommitCallback commitCallback) {
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
 		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
 			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
 					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
 					"This does not compromise Flink's checkpoint integrity.");
 		}
-		this.callerCommitCallback = commitCallback;
+		this.offsetCommitCallback = commitCallback;
 
 		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
 		handover.wakeupProducer();
@@ -501,12 +493,9 @@ public class KafkaConsumerThread extends Thread {
 
 			if (ex != null) {
 				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
-				if (callerCommitCallback != null) {
-					callerCommitCallback.onException(ex);
-				}
-			}
-			else if (callerCommitCallback != null) {
-				callerCommitCallback.onSuccess();
+				offsetCommitCallback.onException(ex);
+			} else {
+				offsetCommitCallback.onSuccess();
 			}
 		}
 	}

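For context, the consumer thread holds at most one pending commit at a time; a newer commit atomically replaces an older one that was never picked up. A minimal sketch of that single-slot hand-off (hypothetical names, simplified types):

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical sketch: getAndSet() atomically swaps in the newest offsets,
    // so a commit that was never picked up is superseded rather than queued.
    public class SingleSlotCommitSketch {

        private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();

        void setOffsetsToCommit(Map<String, Long> offsets) {
            if (nextOffsetsToCommit.getAndSet(offsets) != null) {
                System.out.println("skipping previous offsets; newer ones are available");
            }
        }

        void consumerLoopIteration() {
            // the consumer thread claims the pending offsets, leaving the slot empty
            Map<String, Long> toCommit = nextOffsetsToCommit.getAndSet(null);
            if (toCommit != null) {
                // trigger the asynchronous commit here
            }
        }
    }
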
http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
index f9ec204..33ec17e 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -157,7 +158,7 @@ public class Kafka09FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, mock(KafkaCommitCallback.class));
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -284,7 +285,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -301,7 +302,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, mock(KafkaCommitCallback.class));
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
@@ -337,9 +338,9 @@ public class Kafka09FetcherTest {
 		final byte[] payload = new byte[] {1, 2, 3, 4};
 
 		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 15, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 16, payload, payload),
-				new ConsumerRecord<byte[], byte[]>(topic, partition, 17, payload, payload));
+				new ConsumerRecord<>(topic, partition, 15, payload, payload),
+				new ConsumerRecord<>(topic, partition, 16, payload, payload),
+				new ConsumerRecord<>(topic, partition, 17, payload, payload));
 
 		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
 		data.put(new TopicPartition(topic, partition), records);
@@ -445,11 +446,11 @@ public class Kafka09FetcherTest {
 		@Override
 		public void close() {}
 
-		public void waitTillHasBlocker() throws InterruptedException {
+		void waitTillHasBlocker() throws InterruptedException {
 			inBlocking.await();
 		}
 
-		public boolean isStillBlocking() {
+		boolean isStillBlocking() {
 			return lock.isLocked();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 3a2f84a..2368091 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -492,7 +493,7 @@ public class KafkaConsumerThreadTest {
 		// pause just before the reassignment so we can inject the wakeup
 		testThread.waitPartitionReassignmentInvoked();
 
-		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
+		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>(), mock(KafkaCommitCallback.class));
 		verify(mockConsumer, times(1)).wakeup();
 
 		testThread.startPartitionReassignment();
@@ -578,7 +579,7 @@ public class KafkaConsumerThreadTest {
 		// pause just before the reassignment so we can inject the wakeup
 		testThread.waitPartitionReassignmentInvoked();
 
-		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
+		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>(), mock(KafkaCommitCallback.class));
 
 		// make sure the consumer was actually woken up
 		verify(mockConsumer, times(1)).wakeup();
@@ -664,7 +665,7 @@ public class KafkaConsumerThreadTest {
 		// wait until the reassignment has started
 		midAssignmentLatch.await();
 
-		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>());
+		testThread.setOffsetsToCommit(new HashMap<TopicPartition, OffsetAndMetadata>(), mock(KafkaCommitCallback.class));
 
 		// the wakeup in the setOffsetsToCommit() call should have been buffered, and not called on the consumer
 		verify(mockConsumer, never()).wakeup();

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 7314e14..11f97b2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -29,6 +29,8 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -237,10 +239,12 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * committing them, so that committed offsets to Kafka represent "the next record to process".
 	 *
 	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
-	 * @param commitCallback The callback that the user can implement to trigger custom actions when a commit request completes.
+	 * @param commitCallback The callback to invoke when the commit request completes or fails.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception;
+	public abstract void commitInternalOffsetsToKafka(
+			Map<KafkaTopicPartition, Long> offsets,
+			@Nonnull KafkaCommitCallback commitCallback) throws Exception;
 
 	/**
 	 * Creates the Kafka version specific representation of the given

http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 34158fd..59ce666 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -469,7 +469,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -486,10 +486,10 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		listState = new TestingListState<>();
@@ -504,15 +504,15 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// commit only the second last
 		consumer.notifyCheckpointComplete(598);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		// access invalid checkpoint
 		consumer.notifyCheckpointComplete(590);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 
 		// and the last
 		consumer.notifyCheckpointComplete(599);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // no offsets should be committed
 	}
 
 	@Test

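The updated verifications must now match both arguments. A minimal sketch of the two-argument verify-never idiom against a hypothetical fetcher interface, assuming Mockito 1.x (org.mockito.Matchers):

    import java.util.Map;

    import static org.mockito.Matchers.any;
    import static org.mockito.Matchers.anyMap;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.never;
    import static org.mockito.Mockito.verify;

    public class VerifyNeverSketch {

        interface Fetcher {
            void commitOffsets(Map<String, Long> offsets, Runnable callback) throws Exception;
        }

        public static void main(String[] args) throws Exception {
            Fetcher fetcher = mock(Fetcher.class);
            // (exercise the code under test here without triggering a commit)
            verify(fetcher, never()).commitOffsets(anyMap(), any(Runnable.class));
        }
    }
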
http://git-wip-us.apache.org/repos/asf/flink/blob/1ded2d86/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 8699247..1063102 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.HashMap;
@@ -361,7 +362,9 @@ public class AbstractFetcherTest {
 		}
 
 		@Override
-		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback callback) throws Exception {
+		public void commitInternalOffsetsToKafka(
+				Map<KafkaTopicPartition, Long> offsets,
+				@Nonnull KafkaCommitCallback callback) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 	}


[4/7] flink git commit: [FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer

Posted by tz...@apache.org.
[FLINK-6998] [kafka] Add Kafka offset commit metrics to Flink Kafka Consumer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8828648b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8828648b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8828648b

Branch: refs/heads/master
Commit: 8828648b4f377a97a9aac42365297479054a5ef0
Parents: c5221c8
Author: Zhenzhong Xu <zx...@netflix.com>
Authored: Mon Jun 26 15:51:09 2017 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 21:52:30 2017 +0800

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      | 24 ++++++++++++
 .../kafka/internal/Kafka010FetcherTest.java     |  6 +--
 .../kafka/internals/Kafka08Fetcher.java         |  8 +++-
 .../kafka/internal/Kafka09Fetcher.java          |  5 ++-
 .../kafka/internal/KafkaConsumerThread.java     | 27 ++++++++++++-
 .../kafka/internal/Kafka09FetcherTest.java      |  6 +--
 .../kafka/FlinkKafkaConsumerBase.java           | 40 +++++++++++++++++++-
 .../kafka/internals/AbstractFetcher.java        |  3 +-
 .../kafka/internals/KafkaCommitCallback.java    | 39 +++++++++++++++++++
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 13 ++++---
 .../kafka/internals/AbstractFetcherTest.java    |  2 +-
 11 files changed, 154 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 06ed9ef..b8f4acc 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -896,6 +896,30 @@ Thus, in order to infer the metric identifier:
   </tbody>
 </table>
 
+#### Connectors:
+
+##### Kafka Connectors
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Scope</th>
+      <th class="text-left" style="width: 30%">Metrics</th>
+      <th class="text-left" style="width: 50%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>commitsSucceeded</td>
+      <td>The number of successful Kafka offset commits, if offset committing and checkpointing are enabled.</td>
+    </tr>
+    <tr>
+       <th rowspan="1">Operator</th>
+       <td>commitsFailed</td>
+       <td>The number of failed Kafka offset commits, if offset committing and checkpointing are enabled.</td>
+    </tr>
+  </tbody>
+</table>
 
 ### Latency tracking
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
index 2ea1622..22e183d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
@@ -157,7 +157,7 @@ public class Kafka010FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -285,7 +285,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -302,7 +302,7 @@ public class Kafka010FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index aa7649c..6e2a1d4 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -346,15 +346,21 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
 		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
 		if (zkHandler != null) {
 			try {
 				// the ZK handler takes care of incrementing the offsets by 1 before committing
 				zkHandler.prepareAndCommitOffsets(offsets);
+				if (commitCallback != null) {
+					commitCallback.onSuccess();
+				}
 			}
 			catch (Exception e) {
 				if (running) {
+					if (commitCallback != null) {
+						commitCallback.onException(e);
+					}
 					throw e;
 				} else {
 					return;

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 68f54ed..db7d63b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -209,7 +210,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	}
 
 	@Override
-	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception {
 		@SuppressWarnings("unchecked")
 		List<KafkaTopicPartitionState<TopicPartition>> partitions = subscribedPartitionStates();
 
@@ -228,6 +229,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
 		}
 
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
-		consumerThread.setOffsetsToCommit(offsetsToCommit);
+		consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index de8fb0b..6ff82d7 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
@@ -107,6 +108,8 @@ public class KafkaConsumerThread extends Thread {
 	/** Flag tracking whether the latest commit request has completed. */
 	private volatile boolean commitInProgress;
 
+	/** User callback to be invoked when a commit completes. */
+	private volatile KafkaCommitCallback callerCommitCallback;
 	public KafkaConsumerThread(
 			Logger log,
 			Handover handover,
@@ -308,17 +311,33 @@ public class KafkaConsumerThread extends Thread {
 	 *
 	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
 	 * the frequency with which this method is called, then some commits may be skipped due to being
-	 * superseded  by newer ones.
+	 * superseded by newer ones.
 	 *
 	 * @param offsetsToCommit The offsets to commit
 	 */
 	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
+		setOffsetsToCommit(offsetsToCommit, null);
+	}
+
+	/**
+	 * Tells this thread to commit a set of offsets. This method does not block, the committing
+	 * operation will happen asynchronously.
+	 *
+	 * <p>Only one commit operation may be pending at any time. If the committing takes longer than
+	 * the frequency with which this method is called, then some commits may be skipped due to being
+	 * superseded by newer ones.
+	 *
+	 * @param offsetsToCommit The offsets to commit
+	 * @param commitCallback callback when kafka commit completes
+	 */
+	public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, KafkaCommitCallback commitCallback) {
 		// record the work to be committed by the main consumer thread and make sure the consumer notices that
 		if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
 			log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
 					"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
 					"This does not compromise Flink's checkpoint integrity.");
 		}
+		this.callerCommitCallback = commitCallback;
 
 		// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
 		handover.wakeupProducer();
@@ -482,6 +501,12 @@ public class KafkaConsumerThread extends Thread {
 
 			if (ex != null) {
 				log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
+				if (callerCommitCallback != null) {
+					callerCommitCallback.onException(ex);
+				}
+			}
+			else if (callerCommitCallback != null) {
+				callerCommitCallback.onSuccess();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
index 649092b..f9ec204 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java
@@ -157,7 +157,7 @@ public class Kafka09FetcherTest {
 			@Override
 			public void run() {
 				try {
-					fetcher.commitInternalOffsetsToKafka(testCommitData);
+					fetcher.commitInternalOffsetsToKafka(testCommitData, null);
 				} catch (Throwable t) {
 					commitError.set(t);
 				}
@@ -284,7 +284,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the first offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData1);
+		fetcher.commitInternalOffsetsToKafka(testCommitData1, null);
 		Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
@@ -301,7 +301,7 @@ public class Kafka09FetcherTest {
 
 		// ----- trigger the second offset commit -----
 
-		fetcher.commitInternalOffsetsToKafka(testCommitData2);
+		fetcher.commitInternalOffsetsToKafka(testCommitData2, null);
 		Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
 
 		for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index c331bf0..f3c9e5e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -42,6 +43,7 @@ import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
@@ -186,6 +188,24 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	private volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
+	//  internal metrics
+	// ------------------------------------------------------------------------
+
+	/** Counter for successful Kafka offset commits. */
+	private transient Counter successfulCommits;
+
+	/** Counter for failed Kafka offset commits. */
+	private transient Counter failedCommits;
+
+	/** Callback interface that will be invoked upon completion of an async Kafka commit.
+	 *  Please be aware that the default callback implementation in this base class does not
+	 *  provide any guarantees on thread safety. This is sufficient for now because the
+	 *  currently supported Kafka connectors guarantee at most one concurrent pending async
+	 *  offset commit.
+	 */
+	private transient KafkaCommitCallback offsetCommitCallback;
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Base constructor.
@@ -504,6 +524,23 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			throw new Exception("The partitions were not set for the consumer");
 		}
 
+		// initialize commit metrics and default offset callback method
+		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter("commitsSucceeded");
+		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter("commitsFailed");
+
+		this.offsetCommitCallback = new KafkaCommitCallback() {
+			@Override
+			public void onSuccess() {
+				successfulCommits.inc();
+			}
+
+			@Override
+			public void onException(Throwable cause) {
+				LOG.error("Async Kafka commit failed.", cause);
+				failedCommits.inc();
+			}
+		};
+
 		// mark the subtask as temporarily idle if there are no initial seed partitions;
 		// once this subtask discovers some partitions and starts collecting records, the subtask's
 		// status will automatically be triggered back to be active.
@@ -784,7 +821,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 					LOG.debug("Checkpoint state was empty.");
 					return;
 				}
-				fetcher.commitInternalOffsetsToKafka(offsets);
+
+				fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
 			} catch (Exception e) {
 				if (running) {
 					throw e;

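The wiring above follows the usual Flink metrics pattern: counters are created from the runtime context once the function is running, then incremented from the callback paths. A minimal sketch of that pattern in an unrelated rich function (hypothetical, for illustration only):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    // Hypothetical sketch: metric counters must be registered in open(), after
    // the runtime context is available, and may then be incremented anywhere.
    public class CounterWiringSketch extends RichMapFunction<String, String> {

        private transient Counter processedRecords;

        @Override
        public void open(Configuration parameters) {
            processedRecords = getRuntimeContext().getMetricGroup().counter("processedRecords");
        }

        @Override
        public String map(String value) {
            processedRecords.inc();
            return value;
        }
    }
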
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 3bed0b8..7314e14 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -237,9 +237,10 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * committing them, so that committed offsets to Kafka represent "the next record to process".
 	 *
 	 * @param offsets The offsets to commit to Kafka (implementations must increment offsets by 1 before committing).
+	 * @param commitCallback The callback that the user can implement to trigger custom actions when a commit request completes.
 	 * @throws Exception This method forwards exceptions.
 	 */
-	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception;
+	public abstract void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback commitCallback) throws Exception;
 
 	/**
 	 * Creates the Kafka version specific representation of the given

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
new file mode 100644
index 0000000..aca7ae5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaCommitCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+/**
+ * A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
+ * which is normally triggered by the completion of a checkpoint.
+ */
+public interface KafkaCommitCallback {
+
+	/**
+	 * A callback method the user can implement to provide asynchronous handling of commit request completion.
+	 * This method will be called when the commit request sent to the server has been acknowledged without error.
+	 */
+	void onSuccess();
+
+	/**
+	 * A callback method the user can implement to provide asynchronous handling of commit request failure.
+	 * This method will be called when the commit request failed.
+	 * @param cause Kafka commit failure cause returned by the Kafka client
+	 */
+	void onException(Throwable cause);
+}

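Any implementation of the new interface works with the fetchers; a trivial, purely illustrative one that only logs outcomes (assuming SLF4J on the classpath):

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical sketch: a callback that merely logs each async commit result.
    public class LoggingCommitCallback implements KafkaCommitCallback {

        private static final Logger LOG = LoggerFactory.getLogger(LoggingCommitCallback.class);

        @Override
        public void onSuccess() {
            LOG.info("Kafka offset commit acknowledged.");
        }

        @Override
        public void onException(Throwable cause) {
            LOG.warn("Kafka offset commit failed.", cause);
        }
    }
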
http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index ad08c4d..34158fd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer;
@@ -468,7 +469,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		// checkpoint 3
 		consumer.snapshotState(new StateSnapshotContextSynchronousImpl(141, 141));
@@ -485,10 +486,10 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		listState = new TestingListState<>();
@@ -503,15 +504,15 @@ public class FlinkKafkaConsumerBaseTest {
 
 		// commit only the second last
 		consumer.notifyCheckpointComplete(598);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		// access invalid checkpoint
 		consumer.notifyCheckpointComplete(590);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 
 		// and the last
 		consumer.notifyCheckpointComplete(599);
-		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
+		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap(), Matchers.any(KafkaCommitCallback.class)); // not offsets should be committed
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8828648b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index 768df0f..8699247 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -361,7 +361,7 @@ public class AbstractFetcherTest {
 		}
 
 		@Override
-		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+		public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, KafkaCommitCallback callback) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 	}


[6/7] flink git commit: [FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase

Posted by tz...@apache.org.
[FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase

This closes #4414.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0d4a772
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0d4a772
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0d4a772

Branch: refs/heads/master
Commit: f0d4a772f44e95b52d58284b85074cb04d2796ab
Parents: 1ded2d8
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jul 27 18:31:13 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jul 28 21:52:30 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java    | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

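The gist of the fix is to deterministically wait for the job runner thread instead of polling or racing it. A minimal sketch of the cancel-then-join pattern (hypothetical, with the actual job cancellation elided):

    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical sketch: join() guarantees the runner has finished (and has
    // published any error it caught) before the error reference is inspected.
    public class CancelAndJoinSketch {

        public static void main(String[] args) throws Exception {
            final AtomicReference<Throwable> errorRef = new AtomicReference<>();

            Thread runner = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // run the (hypothetical) job here until it is cancelled
                    } catch (Throwable t) {
                        errorRef.set(t);
                    }
                }
            });
            runner.start();

            // cancel the job & wait for the job to finish
            runner.join();

            final Throwable t = errorRef.get();
            if (t != null) {
                throw new RuntimeException("Job failed", t);
            }
        }
    }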

http://git-wip-us.apache.org/repos/asf/flink/blob/f0d4a772/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index dac45f7..fda6832 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -217,7 +217,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.execute();
 				}
 				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
+					if (!(t instanceof JobCancellationException)) {
 						errorRef.set(t);
 					}
 				}
@@ -243,8 +243,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 		while (System.nanoTime() < deadline);
 
-		// cancel the job
+		// cancel the job & wait for the job to finish
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		runner.join();
 
 		final Throwable t = errorRef.get();
 		if (t != null) {
@@ -303,7 +304,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.execute();
 				}
 				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
+					if (!(t instanceof JobCancellationException)) {
 						errorRef.set(t);
 					}
 				}
@@ -328,8 +329,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 		while (System.nanoTime() < deadline);
 
-		// cancel the job
+		// cancel the job & wait for the job to finish
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		runner.join();
 
 		final Throwable t = errorRef.get();
 		if (t != null) {
@@ -1607,8 +1609,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 					env1.execute("Metrics test job");
 				} catch (Throwable t) {
-					LOG.warn("Got exception during execution", t);
 					if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+						LOG.warn("Got exception during execution", t);
 						error.f0 = t;
 					}
 				}
@@ -1658,11 +1660,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		} finally {
 			// cancel
 			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			// wait for the job to finish (it should, due to the cancel command above)
+			jobThread.join();
 		}
 
-		while (jobThread.isAlive()) {
-			Thread.sleep(50);
-		}
 		if (error.f0 != null) {
 			throw error.f0;
 		}