You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/15 14:37:03 UTC

flink git commit: [FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer

Repository: flink
Updated Branches:
  refs/heads/release-1.1 a34559d20 -> e296acae5


[FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer

Previously, the Kafka consumer queried the partition list on restore,
and then used it to filter out the restored state of partitions that
don't exist in the list.

If for any reason the returned partition list is incomplete (i.e. missing
partitions that existed before, perhaps due to temporary ZK / broker
downtime), then the state of the missing partitions is dropped and
cannot be recovered anymore.

This commit fixes this by always restoring the complete state, without
any sort of filtering. We simply let the consumer fail if the partitions
assigned to the consuming threads / Kafka clients are unreachable when
the consumer starts running.

This closes #3507.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e296acae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e296acae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e296acae

Branch: refs/heads/release-1.1
Commit: e296acae5269d3da7af87514804c06b865e40d3e
Parents: a34559d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Mar 10 14:47:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Mar 15 22:35:22 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           | 44 ++++++----
 .../kafka/internals/AbstractFetcher.java        | 33 ++++---
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 92 ++++++++++++++++++--
 .../KafkaConsumerPartitionAssignmentTest.java   | 51 ++++++++---
 .../kafka/KafkaShortRetentionTestBase.java      |  1 -
 5 files changed, 175 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 2b2c527..bab7639 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -210,8 +210,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		}
 		
 		// figure out which partitions this subtask should process
-		final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(allSubscribedPartitions,
-				getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
+		final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(
+				restoreToOffset,
+				allSubscribedPartitions,
+				getRuntimeContext().getNumberOfParallelSubtasks(),
+				getRuntimeContext().getIndexOfThisSubtask());
 		
 		// we need only do work, if we actually have partitions assigned
 		if (!thisSubtaskPartitions.isEmpty()) {
@@ -416,29 +419,38 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Selects which of the given partitions should be handled by a specific consumer,
-	 * given a certain number of consumers.
-	 * 
-	 * @param allPartitions The partitions to select from
+	 * Determines which partitions the consumer should subscribe to.
+	 *
+	 * If we have restored offsets, we simply use that as the subscribed partitions for the subtask.
+	 * Otherwise, we select from the given complete list of partitions, given a certain number of consumer subtasks.
+	 *
+	 * @param restoredPartitionOffsets The restored partition offsets, if any
+	 * @param completeKafkaPartitionsList The complete list of kafka partitions
 	 * @param numConsumers The number of consumers
 	 * @param consumerIndex The index of the specific consumer
 	 * 
 	 * @return The sublist of partitions to be handled by that consumer.
 	 */
 	protected static List<KafkaTopicPartition> assignPartitions(
-			List<KafkaTopicPartition> allPartitions,
-			int numConsumers, int consumerIndex)
+			Map<KafkaTopicPartition, Long> restoredPartitionOffsets,
+			List<KafkaTopicPartition> completeKafkaPartitionsList,
+			int numConsumers,
+			int consumerIndex)
 	{
-		final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
-				allPartitions.size() / numConsumers + 1);
-
-		for (int i = 0; i < allPartitions.size(); i++) {
-			if (i % numConsumers == consumerIndex) {
-				thisSubtaskPartitions.add(allPartitions.get(i));
+		if (restoredPartitionOffsets != null) {
+			return new ArrayList<>(restoredPartitionOffsets.keySet());
+		} else {
+			final List<KafkaTopicPartition> thisSubtaskPartitions =
+				new ArrayList<>(completeKafkaPartitionsList.size() / numConsumers + 1);
+
+			for (int i = 0; i < completeKafkaPartitionsList.size(); i++) {
+				if (i % numConsumers == consumerIndex) {
+					thisSubtaskPartitions.add(completeKafkaPartitionsList.get(i));
+				}
 			}
+
+			return thisSubtaskPartitions;
 		}
-		
-		return thisSubtaskPartitions;
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 8ec26cc..979dccc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -181,23 +181,36 @@ public abstract class AbstractFetcher<T, KPH> {
 
 		HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
 		for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
-			if (partition.isOffsetDefined()) {
-				state.put(partition.getKafkaTopicPartition(), partition.getOffset());
-			}
+			state.put(partition.getKafkaTopicPartition(), partition.getOffset());
 		}
 		return state;
 	}
 
 	/**
 	 * Restores the partition offsets.
-	 * 
-	 * @param snapshotState The offsets for the partitions 
+	 * The partitions in the provided map of restored partitions to offsets must completely match
+	 * the fetcher's subscribed partitions.
+	 *
+	 * @param restoredOffsets The restored offsets for the partitions
+	 *
+	 * @throws IllegalStateException if the partitions in the provided restored offsets map
+	 * cannot completely match the fetcher's subscribed partitions.
 	 */
-	public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
-		for (KafkaTopicPartitionState<?> partition : allPartitions) {
-			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-			if (offset != null) {
-				partition.setOffset(offset);
+	public void restoreOffsets(Map<KafkaTopicPartition, Long> restoredOffsets) {
+		if (restoredOffsets.size() != allPartitions.length) {
+			throw new IllegalStateException(
+				"The fetcher was restored with partition offsets that do not " +
+					"match with the subscribed partitions: " + restoredOffsets);
+		} else {
+			for (KafkaTopicPartitionState<?> partition : allPartitions) {
+				Long offset = restoredOffsets.get(partition.getKafkaTopicPartition());
+				if (offset != null) {
+					partition.setOffset(offset);
+				} else {
+					throw new IllegalStateException(
+						"The fetcher was restored with partition offsets that do not " +
+							"contain offsets for subscribed partition " + partition.getKafkaTopicPartition());
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 9b517df..3926513 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -19,8 +19,11 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -30,6 +33,7 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
@@ -44,12 +48,14 @@ public class FlinkKafkaConsumerBaseTest {
 	@Test
 	public void testEitherWatermarkExtractor() {
 		try {
-			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
+			new DummyFlinkKafkaConsumer<>(null, null, 1, 0)
+				.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 
 		try {
-			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
+			new DummyFlinkKafkaConsumer<>(null, null, 1, 0)
+				.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
 			fail();
 		} catch (NullPointerException ignored) {}
 		
@@ -58,14 +64,14 @@ public class FlinkKafkaConsumerBaseTest {
 		@SuppressWarnings("unchecked")
 		final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
 		
-		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
+		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>(null, null, 1, 0);
 		c1.assignTimestampsAndWatermarks(periodicAssigner);
 		try {
 			c1.assignTimestampsAndWatermarks(punctuatedAssigner);
 			fail();
 		} catch (IllegalStateException ignored) {}
 
-		DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
+		DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>(null, null, 1, 0);
 		c2.assignTimestampsAndWatermarks(punctuatedAssigner);
 		try {
 			c2.assignTimestampsAndWatermarks(periodicAssigner);
@@ -109,6 +115,45 @@ public class FlinkKafkaConsumerBaseTest {
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
 		assertNull(consumer.snapshotState(17L, 23L));
 	}
+
+	/**
+	 * Tests that the fetcher is restored with all partitions in the restored state,
+	 * regardless of the queried complete list of Kafka partitions.
+	 */
+	@Test
+	public void testStateIntactOnRestore() throws Exception {
+		@SuppressWarnings("unchecked")
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+		HashMap<KafkaTopicPartition, Long> restoreState = new HashMap<>();
+		restoreState.put(new KafkaTopicPartition("test-topic", 0), 23L);
+		restoreState.put(new KafkaTopicPartition("test-topic", 2), 42L);
+
+		List<KafkaTopicPartition> mockCompletePartitions = new ArrayList<>(6);
+		mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 0));
+		mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 1));
+		mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 2));
+		mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 3));
+		mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 4));
+
+		List<KafkaTopicPartition> expectedFetcherSubscribedPartitions = new ArrayList<>(3);
+		expectedFetcherSubscribedPartitions.add(new KafkaTopicPartition("test-topic", 0));
+		expectedFetcherSubscribedPartitions.add(new KafkaTopicPartition("test-topic", 2));
+
+		FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
+			fetcher,
+			expectedFetcherSubscribedPartitions,
+			2,
+			0);
+
+		consumer.setSubscribedPartitions(mockCompletePartitions);
+		consumer.restoreState(restoreState);
+		consumer.open(new Configuration());
+
+		consumer.run(mock(SourceFunction.SourceContext.class));
+
+		verify(fetcher).restoreOffsets(restoreState);
+	}
 	
 	@Test
 	@SuppressWarnings("unchecked")
@@ -187,7 +232,7 @@ public class FlinkKafkaConsumerBaseTest {
 	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
 			AbstractFetcher<T, ?> fetcher, LinkedMap pendingCheckpoints, boolean running) throws Exception
 	{
-		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>(fetcher, null, 1, 0);
 
 		Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
 		fetcherField.setAccessible(true);
@@ -209,14 +254,45 @@ public class FlinkKafkaConsumerBaseTest {
 	private static final class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
 		private static final long serialVersionUID = 1L;
 
+		private final AbstractFetcher<T, ?> mockFetcher;
+		private final List<KafkaTopicPartition> expectedThisSubtaskPartitions;
+		private final int numConsumerSubtasks;
+		private final int thisConsumerSubtaskIndex;
+
 		@SuppressWarnings("unchecked")
-		public DummyFlinkKafkaConsumer() {
+		public DummyFlinkKafkaConsumer(
+				AbstractFetcher<T, ?> mockFetcher,
+				List<KafkaTopicPartition> expectedThisSubtaskPartitions,
+				int numConsumerSubtasks,
+				int thisConsumerSubtaskIndex) {
 			super((KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+			this.mockFetcher = mockFetcher;
+			this.expectedThisSubtaskPartitions = expectedThisSubtaskPartitions;
+			this.numConsumerSubtasks = numConsumerSubtasks;
+			this.thisConsumerSubtaskIndex = thisConsumerSubtaskIndex;
+		}
+
+		@Override
+		protected AbstractFetcher<T, ?> createFetcher(
+				SourceContext<T> sourceContext,
+				List<KafkaTopicPartition> thisSubtaskPartitions,
+				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+				StreamingRuntimeContext runtimeContext) throws Exception {
+
+			assertEquals(expectedThisSubtaskPartitions.size(), thisSubtaskPartitions.size());
+			for (KafkaTopicPartition expectedPartition : expectedThisSubtaskPartitions) {
+				thisSubtaskPartitions.contains(expectedPartition);
+			}
+			return mockFetcher;
 		}
 
 		@Override
-		protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
-			return null;
+		public RuntimeContext getRuntimeContext() {
+			RuntimeContext context = mock(StreamingRuntimeContext.class);
+			when(context.getNumberOfParallelSubtasks()).thenReturn(numConsumerSubtasks);
+			when(context.getIndexOfThisSubtask()).thenReturn(thisConsumerSubtaskIndex);
+			return context;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 9beed22..d580a37 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -24,8 +24,10 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.*;
@@ -46,7 +48,7 @@ public class KafkaConsumerPartitionAssignmentTest {
 
 			for (int i = 0; i < inPartitions.size(); i++) {
 				List<KafkaTopicPartition> parts = 
-						FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
+						FlinkKafkaConsumerBase.assignPartitions(null, inPartitions, inPartitions.size(), i);
 
 				assertNotNull(parts);
 				assertEquals(1, parts.size());
@@ -88,7 +90,7 @@ public class KafkaConsumerPartitionAssignmentTest {
 
 			for (int i = 0; i < numConsumers; i++) {
 				List<KafkaTopicPartition> parts = 
-						FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+						FlinkKafkaConsumerBase.assignPartitions(null, partitions, numConsumers, i);
 
 				assertNotNull(parts);
 				assertTrue(parts.size() >= minPartitionsPerConsumer);
@@ -124,7 +126,7 @@ public class KafkaConsumerPartitionAssignmentTest {
 			final int numConsumers = 2 * inPartitions.size() + 3;
 
 			for (int i = 0; i < numConsumers; i++) {
-				List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+				List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(null, inPartitions, numConsumers, i);
 
 				assertNotNull(parts);
 				assertTrue(parts.size() <= 1);
@@ -148,11 +150,11 @@ public class KafkaConsumerPartitionAssignmentTest {
 	public void testAssignEmptyPartitions() {
 		try {
 			List<KafkaTopicPartition> ep = new ArrayList<>();
-			List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
+			List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(null, ep, 4, 2);
 			assertNotNull(parts1);
 			assertTrue(parts1.isEmpty());
 
-			List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
+			List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(null, ep, 1, 0);
 			assertNotNull(parts2);
 			assertTrue(parts2.isEmpty());
 		}
@@ -163,6 +165,33 @@ public class KafkaConsumerPartitionAssignmentTest {
 	}
 
 	@Test
+	public void testAssignOnRestoreOffsets() {
+		Map<KafkaTopicPartition, Long> restoredOffsets = new HashMap<>();
+		restoredOffsets.put(new KafkaTopicPartition("test-topic", 0), 23L);
+		restoredOffsets.put(new KafkaTopicPartition("test-topic", 2), 42L);
+
+		List<KafkaTopicPartition> completePartitionsList = Arrays.asList(
+			new KafkaTopicPartition("test-topic", 0),
+			new KafkaTopicPartition("test-topic", 1),
+			new KafkaTopicPartition("test-topic", 2),
+			new KafkaTopicPartition("test-topic", 3),
+			new KafkaTopicPartition("test-topic", 4));
+
+		List<KafkaTopicPartition> expectedAssignedPartitions = new ArrayList<>(2);
+		expectedAssignedPartitions.add(new KafkaTopicPartition("test-topic", 0));
+		expectedAssignedPartitions.add(new KafkaTopicPartition("test-topic", 2));
+
+		List<KafkaTopicPartition> assignedPartitions = FlinkKafkaConsumerBase.assignPartitions(restoredOffsets, completePartitionsList, 1, 0);
+
+		// regardless of the complete partitions list, number of consumers, and consumer index, we should be
+		// assigned all partitions in the restored offsets
+		assertEquals(expectedAssignedPartitions.size(), assignedPartitions.size());
+		for (KafkaTopicPartition expectedPartition : expectedAssignedPartitions) {
+			assignedPartitions.contains(expectedPartition);
+		}
+	}
+
+	@Test
 	public void testGrowingPartitionsRemainsStable() {
 		try {
 			final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
@@ -185,11 +214,11 @@ public class KafkaConsumerPartitionAssignmentTest {
 			final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
 
 			List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(
-					initialPartitions, numConsumers, 0);
+					null, initialPartitions, numConsumers, 0);
 			List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(
-					initialPartitions, numConsumers, 1);
+					null, initialPartitions, numConsumers, 1);
 			List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions(
-					initialPartitions, numConsumers, 2);
+					null, initialPartitions, numConsumers, 2);
 
 			assertNotNull(parts1);
 			assertNotNull(parts2);
@@ -221,11 +250,11 @@ public class KafkaConsumerPartitionAssignmentTest {
 			// grow the set of partitions and distribute anew
 
 			List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions(
-					newPartitions, numConsumers, 0);
+					null, newPartitions, numConsumers, 0);
 			List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions(
-					newPartitions, numConsumers, 1);
+					null, newPartitions, numConsumers, 1);
 			List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions(
-					newPartitions, numConsumers, 2);
+					null, newPartitions, numConsumers, 2);
 
 			// new partitions must include all old partitions
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9e3c33b..d24579c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -233,7 +233,6 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		try {
 			env.execute("Test auto offset reset none");
 		} catch(Throwable e) {
-			System.out.println("MESSAGE: " + e.getCause().getCause().getMessage());
 			// check if correct exception has been thrown
 			if(!e.getCause().getCause().getMessage().contains("Unable to find previous offset")  // kafka 0.8
 			 && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9