You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/04/13 10:31:06 UTC

[08/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
new file mode 100644
index 0000000..99c5d69
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * Per-partition state that, on top of the base Kafka partition state, carries its own
+ * periodic watermark generator (which is also used as the timestamp extractor).
+ *
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+
+	/** Assigns timestamps to records and produces the watermarks for this partition */
+	private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
+
+	/** The highest watermark timestamp seen so far for this partition; starts at Long.MIN_VALUE */
+	private long partitionWatermark = Long.MIN_VALUE;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the state for one partition.
+	 *
+	 * @param partition The partition this state belongs to (handed to the base class)
+	 * @param kafkaPartitionHandle The Kafka-version-specific partition descriptor (handed to the base class)
+	 * @param timestampsAndWatermarks The timestamp assigner / watermark generator used for this partition
+	 */
+	public KafkaTopicPartitionStateWithPeriodicWatermarks(
+			KafkaTopicPartition partition,
+			KPH kafkaPartitionHandle,
+			AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
+	{
+		super(partition, kafkaPartitionHandle);
+		this.timestampsAndWatermarks = timestampsAndWatermarks;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Asks the assigner for the timestamp of the given record. Long.MIN_VALUE is
+	 * passed as the assigner's second argument.
+	 *
+	 * @param record The record to extract the timestamp from
+	 * @return The timestamp returned by the assigner
+	 */
+	public long getTimestampForRecord(T record) {
+		final long secondArgument = Long.MIN_VALUE;
+		return timestampsAndWatermarks.extractTimestamp(record, secondArgument);
+	}
+
+	/**
+	 * Polls the assigner for its current watermark and advances the stored partition
+	 * watermark if the polled one is larger. The stored value never decreases; a
+	 * {@code null} watermark from the assigner leaves it untouched.
+	 *
+	 * @return The (possibly advanced) watermark timestamp of this partition
+	 */
+	public long getCurrentWatermarkTimestamp() {
+		final Watermark latest = timestampsAndWatermarks.getCurrentWatermark();
+		if (latest == null) {
+			return partitionWatermark;
+		}
+
+		final long candidate = latest.getTimestamp();
+		if (candidate > partitionWatermark) {
+			partitionWatermark = candidate;
+		}
+		return partitionWatermark;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		final String prefix = "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=";
+		return prefix + getKafkaTopicPartition()
+				+ ", offset=" + getOffset()
+				+ ", watermark=" + partitionWatermark;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
new file mode 100644
index 0000000..b265990
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+/**
+ * Per-partition state that, on top of the base Kafka partition state, carries its own
+ * punctuated watermark generator (which is also used as the timestamp extractor).
+ *
+ * <p>This class is not thread safe, but the current partition watermark is kept in a
+ * volatile field and can be read through {@link #getCurrentPartitionWatermark()}.
+ *
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
+ */
+public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+
+	/** Assigns timestamps to records and produces the watermarks for this partition */
+	private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
+
+	/** The highest watermark timestamp emitted so far for this partition */
+	private volatile long partitionWatermark;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the state for one partition; the partition watermark starts at Long.MIN_VALUE.
+	 *
+	 * @param partition The partition this state belongs to (handed to the base class)
+	 * @param kafkaPartitionHandle The Kafka-version-specific partition descriptor (handed to the base class)
+	 * @param timestampsAndWatermarks The timestamp assigner / watermark generator used for this partition
+	 */
+	public KafkaTopicPartitionStateWithPunctuatedWatermarks(
+			KafkaTopicPartition partition,
+			KPH kafkaPartitionHandle,
+			AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks)
+	{
+		super(partition, kafkaPartitionHandle);
+
+		this.partitionWatermark = Long.MIN_VALUE;
+		this.timestampsAndWatermarks = timestampsAndWatermarks;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Asks the assigner for the timestamp of the given record. Long.MIN_VALUE is
+	 * passed as the assigner's second argument.
+	 *
+	 * @param record The record to extract the timestamp from
+	 * @return The timestamp returned by the assigner
+	 */
+	public long getTimestampForRecord(T record) {
+		final long secondArgument = Long.MIN_VALUE;
+		return timestampsAndWatermarks.extractTimestamp(record, secondArgument);
+	}
+
+	/**
+	 * Lets the assigner decide whether the given record triggers a watermark and
+	 * accepts that watermark only if it is strictly larger than the partition's
+	 * current one. An accepted watermark becomes the new partition watermark.
+	 *
+	 * @param record The record that was just read
+	 * @param timestamp The timestamp to hand to the assigner together with the record
+	 * @return The new watermark, or {@code null} if the assigner produced none or it
+	 *         did not advance the partition watermark
+	 */
+	@Nullable
+	public Watermark checkAndGetNewWatermark(T record, long timestamp) {
+		final Watermark candidate = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp);
+		if (candidate == null) {
+			return null;
+		}
+
+		final long candidateTimestamp = candidate.getTimestamp();
+		if (candidateTimestamp <= partitionWatermark) {
+			// not an advance over what this partition already emitted
+			return null;
+		}
+
+		partitionWatermark = candidateTimestamp;
+		return candidate;
+	}
+
+	/**
+	 * @return The timestamp of the last watermark accepted for this partition
+	 */
+	public long getCurrentPartitionWatermark() {
+		return partitionWatermark;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		final String prefix = "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=";
+		return prefix + getKafkaTopicPartition()
+				+ ", offset=" + getOffset()
+				+ ", watermark=" + partitionWatermark;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 038f414..37e2ef6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
index 1be6b00..bda90bd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kafka.util;
 
+package org.apache.flink.streaming.connectors.kafka.util;
 
 import java.util.Properties;
 
@@ -38,10 +38,9 @@ public class KafkaUtils {
 					"Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'");
 		}
 	}
-
-	public static void checkArgument(boolean arg) {
-		if(!arg) {
-			throw new IllegalArgumentException();
-		}
-	}
+	
+	// ------------------------------------------------------------------------
+	
+	/** Private default constructor to prevent instantiation */
+	private KafkaUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
new file mode 100644
index 0000000..f4ef995
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for the watermark-emitter setters and the checkpoint bookkeeping
+ * (snapshot / restore / notify-complete) of {@link FlinkKafkaConsumerBase}.
+ */
+public class FlinkKafkaConsumerBaseTest {
+
+	/**
+	 * Tests that not both types of timestamp extractors / watermark generators can be used,
+	 * and that a null extractor is rejected.
+	 */
+	@Test
+	public void testEitherWatermarkExtractor() {
+		try {
+			new DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null);
+			fail();
+		} catch (NullPointerException ignored) {}
+
+		try {
+			new DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null);
+			fail();
+		} catch (NullPointerException ignored) {}
+		
+		@SuppressWarnings("unchecked")
+		final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
+		@SuppressWarnings("unchecked")
+		final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
+		
+		// periodic first, then punctuated must be rejected
+		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
+		c1.setPeriodicWatermarkEmitter(periodicAssigner);
+		try {
+			c1.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+			fail();
+		} catch (IllegalStateException ignored) {}
+
+		// punctuated first, then periodic must be rejected
+		DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
+		c2.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+		try {
+			c2.setPeriodicWatermarkEmitter(periodicAssigner);
+			fail();
+		} catch (IllegalStateException ignored) {}
+	}
+
+	/**
+	 * Tests that no checkpoints happen when the consumer is not running: the snapshot
+	 * is null and a checkpoint-complete notification is accepted without error.
+	 */
+	@Test
+	public void ignoreCheckpointWhenNotRunning() throws Exception {
+		@SuppressWarnings("unchecked")
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
+		assertNull(consumer.snapshotState(17L, 23L));
+		consumer.notifyCheckpointComplete(66L);
+	}
+
+	/**
+	 * Tests that, while the consumer is running but has no fetcher yet (the fetcher field
+	 * is null), a snapshot hands back exactly the state that was previously restored.
+	 */
+	@Test
+	public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
+		HashMap<KafkaTopicPartition, Long> restoreState = new HashMap<>();
+		restoreState.put(new KafkaTopicPartition("abc", 13), 16768L);
+		restoreState.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+		consumer.restoreState(restoreState);
+		
+		assertEquals(restoreState, consumer.snapshotState(17L, 23L));
+	}
+
+	/**
+	 * Tests that, while the consumer is running but has no fetcher yet and no state
+	 * was restored, the snapshot is null.
+	 */
+	@Test
+	public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+		assertNull(consumer.snapshotState(17L, 23L));
+	}
+	
+	/**
+	 * Tests that snapshots return the fetcher's current state, that every snapshot is
+	 * tracked as a pending checkpoint, and that completing a checkpoint removes it and
+	 * all earlier pending ones. Also checks that unknown checkpoint ids are tolerated
+	 * and that the number of pending checkpoints is capped.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSnapshotState() throws Exception {
+		final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+		state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+		state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+		final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
+		state2.put(new KafkaTopicPartition("abc", 13), 16770L);
+		state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+
+		// populate state3 itself; writing these into state2 would leave state3 empty
+		// and make the checkpoint-3 assertions below compare against an empty map
+		final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
+		state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+		state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+		
+		// consecutive calls return state1, state2, state3 and then keep returning state3
+		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+			
+		final LinkedMap pendingCheckpoints = new LinkedMap();
+	
+		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingCheckpoints, true);
+		assertEquals(0, pendingCheckpoints.size());
+		
+		// checkpoint 1
+		HashMap<KafkaTopicPartition, Long> snapshot1 = consumer.snapshotState(138L, 19L);
+		assertEquals(state1, snapshot1);
+		assertEquals(1, pendingCheckpoints.size());
+		assertEquals(state1, pendingCheckpoints.get(138L));
+
+		// checkpoint 2
+		HashMap<KafkaTopicPartition, Long> snapshot2 = consumer.snapshotState(140L, 1578L);
+		assertEquals(state2, snapshot2);
+		assertEquals(2, pendingCheckpoints.size());
+		assertEquals(state2, pendingCheckpoints.get(140L));
+		
+		// ack checkpoint 1
+		consumer.notifyCheckpointComplete(138L);
+		assertEquals(1, pendingCheckpoints.size());
+		assertTrue(pendingCheckpoints.containsKey(140L));
+
+		// checkpoint 3
+		HashMap<KafkaTopicPartition, Long> snapshot3 = consumer.snapshotState(141L, 1578L);
+		assertEquals(state3, snapshot3);
+		assertEquals(2, pendingCheckpoints.size());
+		assertEquals(state3, pendingCheckpoints.get(141L));
+		
+		// ack checkpoint 3, subsumes number 2
+		consumer.notifyCheckpointComplete(141L);
+		assertEquals(0, pendingCheckpoints.size());
+
+
+		consumer.notifyCheckpointComplete(666); // invalid checkpoint
+		assertEquals(0, pendingCheckpoints.size());
+
+		// create 500 snapshots
+		for (int i = 100; i < 600; i++) {
+			consumer.snapshotState(i, 15 * i);
+		}
+		assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+
+		// commit only the second last
+		consumer.notifyCheckpointComplete(598);
+		assertEquals(1, pendingCheckpoints.size());
+
+		// access invalid checkpoint
+		consumer.notifyCheckpointComplete(590);
+
+		// and the last
+		consumer.notifyCheckpointComplete(599);
+		assertEquals(0, pendingCheckpoints.size());
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Builds a dummy consumer and injects the given fetcher, pending-checkpoints map and
+	 * running flag into its private fields via reflection.
+	 *
+	 * @param fetcher The fetcher to inject; may be null to simulate "fetcher not created yet"
+	 * @param pendingCheckpoints The map the consumer should use to track pending checkpoints
+	 * @param running The value for the consumer's "running" flag
+	 * @return The prepared consumer
+	 */
+	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
+			AbstractFetcher<T, ?> fetcher, LinkedMap pendingCheckpoints, boolean running) throws Exception
+	{
+		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+
+		Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
+		fetcherField.setAccessible(true);
+		fetcherField.set(consumer, fetcher);
+
+		Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+		mapField.setAccessible(true);
+		mapField.set(consumer, pendingCheckpoints);
+
+		Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running");
+		runningField.setAccessible(true);
+		runningField.set(consumer, running);
+
+		return consumer;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Minimal consumer subclass for these tests: it uses a mocked deserialization schema
+	 * and never creates a real fetcher.
+	 */
+	private static final class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+		private static final long serialVersionUID = 1L;
+
+		@SuppressWarnings("unchecked")
+		public DummyFlinkKafkaConsumer() {
+			super((KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+		}
+
+		@Override
+		protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index e86d51a..9beed22 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import org.apache.kafka.common.Node;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -32,30 +30,27 @@ import java.util.Set;
 
 import static org.junit.Assert.*;
 
-
 /**
  * Tests that the partition assignment is deterministic and stable.
  */
 public class KafkaConsumerPartitionAssignmentTest {
 
-	private final Node fake = new Node(1337, "localhost", 1337);
-
 	@Test
 	public void testPartitionsEqualConsumers() {
 		try {
-			List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+			List<KafkaTopicPartition> inPartitions = Arrays.asList(
+					new KafkaTopicPartition("test-topic", 4),
+					new KafkaTopicPartition("test-topic", 52),
+					new KafkaTopicPartition("test-topic", 17),
+					new KafkaTopicPartition("test-topic", 1));
 
 			for (int i = 0; i < inPartitions.size(); i++) {
-				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(
-						inPartitions, inPartitions.size(), i);
+				List<KafkaTopicPartition> parts = 
+						FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
 
 				assertNotNull(parts);
 				assertEquals(1, parts.size());
-				assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition()));
+				assertTrue(contains(inPartitions, parts.get(0).getPartition()));
 			}
 		}
 		catch (Exception e) {
@@ -64,9 +59,9 @@ public class KafkaConsumerPartitionAssignmentTest {
 		}
 	}
 
-	private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, int partition) {
-		for (KafkaTopicPartitionLeader ktp: inPartitions) {
-			if (ktp.getTopicPartition().getPartition() == partition) {
+	private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) {
+		for (KafkaTopicPartition ktp : inPartitions) {
+			if (ktp.getPartition() == partition) {
 				return true;
 			}
 		}
@@ -78,11 +73,11 @@ public class KafkaConsumerPartitionAssignmentTest {
 		try {
 			final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
 
-			final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
-			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+			final List<KafkaTopicPartition> partitions = new ArrayList<>();
+			final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
 
 			for (int p : partitionIDs) {
-				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+				KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
 				partitions.add(part);
 				allPartitions.add(part);
 			}
@@ -92,13 +87,14 @@ public class KafkaConsumerPartitionAssignmentTest {
 			final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
 
 			for (int i = 0; i < numConsumers; i++) {
-				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+				List<KafkaTopicPartition> parts = 
+						FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
 
 				assertNotNull(parts);
 				assertTrue(parts.size() >= minPartitionsPerConsumer);
 				assertTrue(parts.size() <= maxPartitionsPerConsumer);
 
-				for (KafkaTopicPartitionLeader p : parts) {
+				for (KafkaTopicPartition p : parts) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
 				}
@@ -116,24 +112,24 @@ public class KafkaConsumerPartitionAssignmentTest {
 	@Test
 	public void testPartitionsFewerThanConsumers() {
 		try {
-			List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+			List<KafkaTopicPartition> inPartitions = Arrays.asList(
+					new KafkaTopicPartition("test-topic", 4),
+					new KafkaTopicPartition("test-topic", 52),
+					new KafkaTopicPartition("test-topic", 17),
+					new KafkaTopicPartition("test-topic", 1));
 
-			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+			final Set<KafkaTopicPartition> allPartitions = new HashSet<>();
 			allPartitions.addAll(inPartitions);
 
 			final int numConsumers = 2 * inPartitions.size() + 3;
 
 			for (int i = 0; i < numConsumers; i++) {
-				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+				List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
 
 				assertNotNull(parts);
 				assertTrue(parts.size() <= 1);
 
-				for (KafkaTopicPartitionLeader p : parts) {
+				for (KafkaTopicPartition p : parts) {
 					// check that the element was actually contained
 					assertTrue(allPartitions.remove(p));
 				}
@@ -151,12 +147,12 @@ public class KafkaConsumerPartitionAssignmentTest {
 	@Test
 	public void testAssignEmptyPartitions() {
 		try {
-			List<KafkaTopicPartitionLeader> ep = new ArrayList<>();
-			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
+			List<KafkaTopicPartition> ep = new ArrayList<>();
+			List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
 			assertNotNull(parts1);
 			assertTrue(parts1.isEmpty());
 
-			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
+			List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
 			assertNotNull(parts2);
 			assertTrue(parts2.isEmpty());
 		}
@@ -170,17 +166,17 @@ public class KafkaConsumerPartitionAssignmentTest {
 	public void testGrowingPartitionsRemainsStable() {
 		try {
 			final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-			List<KafkaTopicPartitionLeader> newPartitions = new ArrayList<>();
+			List<KafkaTopicPartition> newPartitions = new ArrayList<>();
 
 			for (int p : newPartitionIDs) {
-				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+				KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p);
 				newPartitions.add(part);
 			}
 
-			List<KafkaTopicPartitionLeader> initialPartitions = newPartitions.subList(0, 7);
+			List<KafkaTopicPartition> initialPartitions = newPartitions.subList(0, 7);
 
-			final Set<KafkaTopicPartitionLeader> allNewPartitions = new HashSet<>(newPartitions);
-			final Set<KafkaTopicPartitionLeader> allInitialPartitions = new HashSet<>(initialPartitions);
+			final Set<KafkaTopicPartition> allNewPartitions = new HashSet<>(newPartitions);
+			final Set<KafkaTopicPartition> allInitialPartitions = new HashSet<>(initialPartitions);
 
 			final int numConsumers = 3;
 			final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
@@ -188,11 +184,11 @@ public class KafkaConsumerPartitionAssignmentTest {
 			final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
 			final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
 
-			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(
+			List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(
 					initialPartitions, numConsumers, 0);
-			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(
+			List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(
 					initialPartitions, numConsumers, 1);
-			List<KafkaTopicPartitionLeader> parts3 = FlinkKafkaConsumerBase.assignPartitions(
+			List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions(
 					initialPartitions, numConsumers, 2);
 
 			assertNotNull(parts1);
@@ -206,15 +202,15 @@ public class KafkaConsumerPartitionAssignmentTest {
 			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
 			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
 
-			for (KafkaTopicPartitionLeader p : parts1) {
+			for (KafkaTopicPartition p : parts1) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
 			}
-			for (KafkaTopicPartitionLeader p : parts2) {
+			for (KafkaTopicPartition p : parts2) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
 			}
-			for (KafkaTopicPartitionLeader p : parts3) {
+			for (KafkaTopicPartition p : parts3) {
 				// check that the element was actually contained
 				assertTrue(allInitialPartitions.remove(p));
 			}
@@ -224,11 +220,11 @@ public class KafkaConsumerPartitionAssignmentTest {
 
 			// grow the set of partitions and distribute anew
 
-			List<KafkaTopicPartitionLeader> parts1new = FlinkKafkaConsumerBase.assignPartitions(
+			List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions(
 					newPartitions, numConsumers, 0);
-			List<KafkaTopicPartitionLeader> parts2new = FlinkKafkaConsumerBase.assignPartitions(
+			List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions(
 					newPartitions, numConsumers, 1);
-			List<KafkaTopicPartitionLeader> parts3new = FlinkKafkaConsumerBase.assignPartitions(
+			List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions(
 					newPartitions, numConsumers, 2);
 
 			// new partitions must include all old partitions
@@ -248,15 +244,15 @@ public class KafkaConsumerPartitionAssignmentTest {
 			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
 			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
 
-			for (KafkaTopicPartitionLeader p : parts1new) {
+			for (KafkaTopicPartition p : parts1new) {
 				// check that the element was actually contained
 				assertTrue(allNewPartitions.remove(p));
 			}
-			for (KafkaTopicPartitionLeader p : parts2new) {
+			for (KafkaTopicPartition p : parts2new) {
 				// check that the element was actually contained
 				assertTrue(allNewPartitions.remove(p));
 			}
-			for (KafkaTopicPartitionLeader p : parts3new) {
+			for (KafkaTopicPartition p : parts3new) {
 				// check that the element was actually contained
 				assertTrue(allNewPartitions.remove(p));
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 340950b..aa5344b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -26,8 +26,6 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import kafka.server.KafkaServer;
 
-import org.apache.commons.collections.map.LinkedMap;
-
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -66,12 +64,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
@@ -98,7 +94,6 @@ import org.junit.Rule;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -176,70 +171,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		}
 	}
-	/**
-	 * Test that validates that checkpointing and checkpoint notification works properly
-	 */
-	public void runCheckpointingTest() throws Exception {
-		createTestTopic("testCheckpointing", 1, 1);
-
-		FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("testCheckpointing", new SimpleStringSchema(), standardProps);
-		Field pendingCheckpointsField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
-		pendingCheckpointsField.setAccessible(true);
-		LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
-
-		Assert.assertEquals(0, pendingCheckpoints.size());
-		source.setRuntimeContext(new MockRuntimeContext(1, 0));
-
-		final HashMap<KafkaTopicPartition, Long> initialOffsets = new HashMap<>();
-		initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 0), 1337L);
-
-		// first restore
-		source.restoreState(initialOffsets);
-
-		// then open
-		source.open(new Configuration());
-		HashMap<KafkaTopicPartition, Long> state1 = source.snapshotState(1, 15);
-
-		assertEquals(initialOffsets, state1);
-
-		HashMap<KafkaTopicPartition, Long> state2 = source.snapshotState(2, 30);
-		Assert.assertEquals(initialOffsets, state2);
-
-		Assert.assertEquals(2, pendingCheckpoints.size());
-
-		source.notifyCheckpointComplete(1);
-		Assert.assertEquals(1, pendingCheckpoints.size());
-
-		source.notifyCheckpointComplete(2);
-		Assert.assertEquals(0, pendingCheckpoints.size());
-
-		source.notifyCheckpointComplete(666); // invalid checkpoint
-		Assert.assertEquals(0, pendingCheckpoints.size());
-
-		// create 500 snapshots
-		for (int i = 100; i < 600; i++) {
-			source.snapshotState(i, 15 * i);
-		}
-		Assert.assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
-
-		// commit only the second last
-		source.notifyCheckpointComplete(598);
-		Assert.assertEquals(1, pendingCheckpoints.size());
-
-		// access invalid checkpoint
-		source.notifyCheckpointComplete(590);
-
-		// and the last
-		source.notifyCheckpointComplete(599);
-		Assert.assertEquals(0, pendingCheckpoints.size());
-
-		source.close();
-
-		deleteTestTopic("testCheckpointing");
-	}
-
-
-
+	
 	/**
 	 * Ensure Kafka is working on both producer and consumer side.
 	 * This executes a job that contains two Flink pipelines.
@@ -409,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
 		env.getConfig().disableSysoutLogging();
 
 		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -454,7 +386,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
 		env.getConfig().disableSysoutLogging();
 
 		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -499,7 +431,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
 		env.getConfig().disableSysoutLogging();
 		env.setBufferTimeout(0);
 
@@ -562,7 +494,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		runnerThread.start();
 
 		// wait a bit before canceling
-		Thread.sleep(8000);
+		Thread.sleep(2000);
 
 		Throwable failueCause = jobError.get();
 		if(failueCause != null) {
@@ -634,10 +566,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		runnerThread.start();
 
 		// wait a bit before canceling
-		Thread.sleep(8000);
+		Thread.sleep(2000);
 
 		Throwable failueCause = error.get();
-		if(failueCause != null) {
+		if (failueCause != null) {
 			failueCause.printStackTrace();
 			Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
 		}
@@ -709,7 +641,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final int NUM_ELEMENTS = 20;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
+		env.getConfig().disableSysoutLogging();
+		
 		// create topics with content
 		final List<String> topics = new ArrayList<>();
 		for (int i = 0; i < NUM_TOPICS; i++) {
@@ -745,6 +678,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// run second job consuming from multiple topics
 		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+		
 		stream = env.addSource(kafkaServer.getConsumer(topics, schema, standardProps));
 
 		stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
@@ -1453,50 +1388,50 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 	/////////////			Testing the Kafka consumer with embeded watermark generation functionality			///////////////
 
-	@RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
-	public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
-
-		final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
-		final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
-
-		final Map<String, Boolean> topics = new HashMap<>();
-		topics.put(topic1, false);
-		topics.put(topic2, emptyPartition);
-
-		final int noOfTopcis = topics.size();
-		final int partitionsPerTopic = 1;
-		final int elementsPerPartition = 100 + 1;
-
-		final int totalElements = emptyPartition ?
-			partitionsPerTopic * elementsPerPartition :
-			noOfTopcis * partitionsPerTopic * elementsPerPartition;
-
-		createTestTopic(topic1, partitionsPerTopic, 1);
-		createTestTopic(topic2, partitionsPerTopic, 1);
-
-		final StreamExecutionEnvironment env =
-			StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(partitionsPerTopic);
-		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
-		env.getConfig().disableSysoutLogging();
-
-		TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
-
-		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-		producerProperties.setProperty("retries", "0");
-
-		putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
-
-		List<String> topicTitles = new ArrayList<>(topics.keySet());
-		runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
-
-		executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
-
-		for(String topic: topicTitles) {
-			deleteTestTopic(topic);
-		}
-	}
+//	@RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
+//	public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
+//
+//		final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
+//		final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
+//
+//		final Map<String, Boolean> topics = new HashMap<>();
+//		topics.put(topic1, false);
+//		topics.put(topic2, emptyPartition);
+//
+//		final int noOfTopcis = topics.size();
+//		final int partitionsPerTopic = 1;
+//		final int elementsPerPartition = 100 + 1;
+//
+//		final int totalElements = emptyPartition ?
+//			partitionsPerTopic * elementsPerPartition :
+//			noOfTopcis * partitionsPerTopic * elementsPerPartition;
+//
+//		createTestTopic(topic1, partitionsPerTopic, 1);
+//		createTestTopic(topic2, partitionsPerTopic, 1);
+//
+//		final StreamExecutionEnvironment env =
+//			StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+//		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+//		env.setParallelism(partitionsPerTopic);
+//		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
+//		env.getConfig().disableSysoutLogging();
+//
+//		TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
+//
+//		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+//		producerProperties.setProperty("retries", "0");
+//
+//		putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
+//
+//		List<String> topicTitles = new ArrayList<>(topics.keySet());
+//		runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
+//
+//		executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
+//
+//		for(String topic: topicTitles) {
+//			deleteTestTopic(topic);
+//		}
+//	}
 
 	private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index e251174..14e74f1 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -167,7 +167,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 	// ------------------------------------------------------------------------
 
-	public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
+	public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
 
 		private final int expectedPartitions;
 
@@ -177,12 +177,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 
 		@Override
-		public int partition(Object next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-			Tuple2<Long, String> tuple = (Tuple2<Long, String>) next;
-
+		public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
 			assertEquals(expectedPartitions, numPartitions);
 
-			return (int) (tuple.f0 % numPartitions);
+			return (int) (next.f0 % numPartitions);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9f8159c..9e3c33b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -26,16 +26,16 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,14 +44,18 @@ import java.io.Serializable;
 import java.util.Properties;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
-
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 /**
  * A class containing a special Kafka broker which has a log retention of only 250 ms.
  * This way, we can make sure our consumer is properly handling cases where we run into out of offset
  * errors
  */
+@SuppressWarnings("serial")
 public class KafkaShortRetentionTestBase implements Serializable {
+	
 	protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
+	
 	private static KafkaTestEnvironment kafkaServer;
 	private static Properties standardProps;
 	private static ForkableFlinkMiniCluster flink;
@@ -108,7 +112,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		final String topic = "auto-offset-reset-test";
 
 		final int parallelism = 1;
-		final int elementsPerPartition = 50000; // with a sleep time of 1 ms per element, test should run for 50 s
+		final int elementsPerPartition = 50000;
 
 		Properties tprops = new Properties();
 		tprops.setProperty("retention.ms", "250");
@@ -162,6 +166,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		kafkaServer.deleteTestTopic(topic);
 	}
 
+	
 	private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
 		private int numJumps;
 		long nextExpected = 0;
@@ -205,12 +210,8 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	 */
 	public void runFailOnAutoOffsetResetNone() throws Exception {
 		final String topic = "auto-offset-reset-none-test";
-
 		final int parallelism = 1;
-		final int elementsPerPartition = 50000; // with a sleep time of 1 ms per element, test should run for 50 s
-		final int totalElements = parallelism * elementsPerPartition;
-
-
+		
 		kafkaServer.createTestTopic(topic, parallelism, 1);
 
 		final StreamExecutionEnvironment env =
@@ -218,8 +219,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		env.setParallelism(parallelism);
 		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
 		env.getConfig().disableSysoutLogging();
-
-
+		
 		// ----------- add consumer ----------
 
 		Properties customProps = new Properties();
@@ -245,4 +245,27 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		kafkaServer.deleteTestTopic(topic);
 	}
 
+	/** Checks that creating a consumer with "auto.offset.reset=none" is rejected right away. */
+	public void runFailOnAutoOffsetResetNoneEager() throws Exception {
+		final String topic = "auto-offset-reset-none-test";
+		final int parallelism = 1;
+		kafkaServer.createTestTopic(topic, parallelism, 1);
+
+		Properties customProps = new Properties();
+		customProps.putAll(standardProps);
+		customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
+
+		try {
+			kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
+			fail("should fail with an exception");
+		}
+		catch (IllegalArgumentException e) {
+			// expected; the message has to mention the rejected value
+			assertTrue(e.getMessage().contains("none"));
+		}
+		finally {
+			// always clean up: runFailOnAutoOffsetResetNone() uses the same topic name
+			kafkaServer.deleteTestTopic(topic);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
new file mode 100644
index 0000000..0e16263
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import static org.junit.Assert.*;
+
+/** Checks that {@link KafkaTopicPartition} declares a fixed {@code serialVersionUID}. */
+public class KafkaTopicPartitionTest {
+	@Test
+	public void validateUid() {
+		final Field serialUid;
+		try {
+			serialUid = KafkaTopicPartition.class.getDeclaredField("serialVersionUID");
+		}
+		catch (NoSuchFieldException notDeclared) {
+			fail("serialVersionUID is not defined");
+			return;
+		}
+		serialUid.setAccessible(true);
+		// the field must be declared as 'private static final long'
+		final int mods = serialUid.getModifiers();
+		assertTrue(Modifier.isStatic(mods));
+		assertTrue(Modifier.isFinal(mods));
+		assertTrue(Modifier.isPrivate(mods));
+		assertEquals(long.class, serialUid.getType());
+
+		// pinned value: the UID must stay constant so old checkpoints/savepoints can still be read
+		try {
+			assertEquals(722083576322742325L, serialUid.getLong(null));
+		}
+		catch (Exception readFailure) {
+			readFailure.printStackTrace();
+			fail(readFailure.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index e94adb5..24822ed 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -36,30 +36,39 @@ public class JobManagerCommunicationUtils {
 	
 	
 	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
+		JobStatusMessage status = null;
 		
-		// find the jobID
-		Future<Object> listResponse = jobManager.ask(
-				JobManagerMessages.getRequestRunningJobsStatus(),
-				askTimeout);
-
-		List<JobStatusMessage> jobs;
-		try {
-			Object result = Await.result(listResponse, askTimeout);
-			jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-		}
-		catch (Exception e) {
-			throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
-		}
+		for (int i = 0; i < 200 && status == null; i++) {
+			// poll the JobManager for its running jobs until exactly one is reported
+			Future<Object> listResponse = jobManager.ask(
+					JobManagerMessages.getRequestRunningJobsStatus(),
+					askTimeout);
+	
+			List<JobStatusMessage> jobs;
+			try {
+				Object result = Await.result(listResponse, askTimeout);
+				jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+			}
+			catch (Exception e) {
+				throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
+			}
 		
-		if (jobs.isEmpty()) {
-			throw new Exception("Could not cancel job - no running jobs");
-		}
-		if (jobs.size() != 1) {
-			throw new Exception("Could not cancel job - more than one running job.");
+			if (jobs.isEmpty()) {
+				// no running job reported yet - back off briefly, then poll again
+				Thread.sleep(50);
+			}
+			else if (jobs.size() == 1) {
+				status = jobs.get(0); // found it; the loop condition now ends the polling
+			}
+			else {
+				throw new Exception("Could not cancel job - more than one running job.");
+			}
 		}
 		
-		JobStatusMessage status = jobs.get(0);
-		if (status.getJobState().isTerminalState()) {
+		if (status == null) {
+			throw new Exception("Could not cancel job - no running jobs");	
+		}
+		else if (status.getJobState().isTerminalState()) {
 			throw new Exception("Could not cancel job - job is not running any more");
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 17e2e6f..e74eee4 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -44,6 +43,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+@SuppressWarnings("deprecation")
 public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	private final int numberOfParallelSubtasks;
@@ -57,15 +57,6 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 		this.indexOfThisSubtask = indexOfThisSubtask;
 	}
 
-	private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
-		private static final long serialVersionUID = -1153976702711944427L;
-
-		@Override
-		public ExecutionConfig getExecutionConfig() {
-			return new ExecutionConfig();
-		}
-	}
-
 	@Override
 	public boolean isCheckpointingEnabled() {
 		return true;
@@ -152,12 +143,12 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	}
 
 	@Override
-	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
+	public <S> org.apache.flink.api.common.state.OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
 		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
+	public <S> org.apache.flink.api.common.state.OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
 		throw new UnsupportedOperationException();
 	}
 
@@ -175,4 +166,15 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
 		throw new UnsupportedOperationException();
 	}
+	
+	// ------------------------------------------------------------------------
+
+	/** Operator stub; each call to getExecutionConfig() returns a newly created ExecutionConfig. */
+	private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
+		private static final long serialVersionUID = -1153976702711944427L;
+		@Override
+		public ExecutionConfig getExecutionConfig() {
+			return new ExecutionConfig();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
index 4b17300..4388c9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.functions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
+import javax.annotation.Nullable;
+
 /**
  * The {@code AssignerWithPeriodicWatermarks} assigns event time timestamps to elements,
  * and generates low watermarks that signal event time progress within the stream.
@@ -71,5 +73,6 @@ public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T>
 	 *
 	 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
 	 */
+	@Nullable
 	Watermark getCurrentWatermark();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
index 48f29b2..5b5694c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
 
+import javax.annotation.Nullable;
+
 /**
  * The {@code AssignerWithPunctuatedWatermarks} assigns event time timestamps to elements,
  * and generates low watermarks that signal event time progress within the stream.
@@ -79,5 +81,6 @@ public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T
 	 *
 	 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
 	 */
+	@Nullable
 	Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
 }