You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:03 UTC

[47/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
deleted file mode 100644
index 42a5951..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.common.TopicAndPartition;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.zookeeper.data.Stat;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Option;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class ZookeeperOffsetHandler implements OffsetHandler {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
-	
-	private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
-	
-	
-	private final ZkClient zkClient;
-	
-	private final String groupId;
-
-	
-	public ZookeeperOffsetHandler(Properties props) {
-		this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-		
-		if (this.groupId == null) {
-			throw new IllegalArgumentException("Required property '"
-					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
-		}
-		
-		String zkConnect = props.getProperty("zookeeper.connect");
-		if (zkConnect == null) {
-			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
-		}
-		
-		zkClient = new ZkClient(zkConnect,
-				Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
-				Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
-				new ZooKeeperStringSerializer());
-	}
-
-
-	@Override
-	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
-		for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
-			TopicPartition tp = entry.getKey();
-			long offset = entry.getValue();
-			
-			if (offset >= 0) {
-				setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
-			}
-		}
-	}
-
-	@Override
-	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
-		for (TopicPartition tp : partitions) {
-			long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
-
-			if (offset != OFFSET_NOT_SET) {
-				LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
-						tp.partition(), offset);
-
-				// the offset in Zookeeper was the last read offset, seek is accepting the next-to-read-offset.
-				fetcher.seek(tp, offset + 1);
-			}
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		zkClient.close();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Communication with Zookeeper
-	// ------------------------------------------------------------------------
-	
-	public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
-	}
-
-	public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-
-		scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
-				topicDirs.consumerOffsetDir() + "/" + tap.partition());
-
-		if (data._1().isEmpty()) {
-			return OFFSET_NOT_SET;
-		} else {
-			return Long.valueOf(data._1().get());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index 346a7d5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- * 	# More Flink partitions than kafka partitions
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	---------------->	1
- * 			2   --------------/
- * 			3   -------------/
- * 			4	------------/
- * </pre>
- * 	--> Some (or all) kafka partitions contain the output of more than one flink partition
- *
- *# Fewer Flink partitions than Kafka
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	---------------->	1
- * 			2	---------------->	2
- * 									3
- * 									4
- * 									5
- * </pre>
- *
- *  --> Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- *  cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *
- *
- */
-public class FixedPartitioner extends KafkaPartitioner implements Serializable {
-	private static final long serialVersionUID = 1627268846962918126L;
-
-	int targetPartition = -1;
-
-	@Override
-	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
-		int p = 0;
-		for(int i = 0; i < parallelInstances; i++) {
-			if(i == parallelInstanceId) {
-				targetPartition = partitions[p];
-				return;
-			}
-			if(++p == partitions.length) {
-				p = 0;
-			}
-		}
-	}
-
-	@Override
-	public int partition(Object element, int numPartitions) {
-		if(targetPartition == -1) {
-			throw new RuntimeException("The partitioner has not been initialized properly");
-		}
-		return targetPartition;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
deleted file mode 100644
index 55519f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-
-import kafka.producer.Partitioner;
-
-import java.io.Serializable;
-
-/**
- * Extended Kafka Partitioner.
- * It contains a open() method which is called on each parallel instance.
- * Partitioners have to be serializable!
- */
-public abstract class KafkaPartitioner implements Partitioner, Serializable {
-
-	private static final long serialVersionUID = -1974260817778593473L;
-
-	/**
-	 * Initializer for the Partitioner.
-	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
-	 * @param parallelInstances the total number of parallel instances
-	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
-	 */
-	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
-		// overwrite this method if needed.
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index 3d392aa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
-	@Test
-	public void testPartitionsEqualConsumers() {
-		try {
-			int[] partitions = {4, 52, 17, 1};
-			
-			for (int i = 0; i < partitions.length; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", partitions.length, i);
-				
-				assertNotNull(parts);
-				assertEquals(1, parts.size());
-				assertTrue(contains(partitions, parts.get(0).partition()));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultiplePartitionsPerConsumers() {
-		try {
-			final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-
-			final Set<Integer> allPartitions = new HashSet<>();
-			for (int i : partitions) {
-				allPartitions.add(i);
-			}
-			
-			final int numConsumers = 3;
-			final int minPartitionsPerConsumer = partitions.length / numConsumers;
-			final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
-			
-			for (int i = 0; i < numConsumers; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() >= minPartitionsPerConsumer);
-				assertTrue(parts.size() <= maxPartitionsPerConsumer);
-
-				for (TopicPartition p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p.partition()));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPartitionsFewerThanConsumers() {
-		try {
-			final int[] partitions = {4, 52, 17, 1};
-
-			final Set<Integer> allPartitions = new HashSet<>();
-			for (int i : partitions) {
-				allPartitions.add(i);
-			}
-
-			final int numConsumers = 2 * partitions.length + 3;
-			
-			for (int i = 0; i < numConsumers; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() <= 1);
-				
-				for (TopicPartition p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p.partition()));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testAssignEmptyPartitions() {
-		try {
-			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
-			assertNotNull(parts1);
-			assertTrue(parts1.isEmpty());
-
-			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
-			assertNotNull(parts2);
-			assertTrue(parts2.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testGrowingPartitionsRemainsStable() {
-		try {
-			final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-			final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
-
-			final Set<Integer> allNewPartitions = new HashSet<>();
-			final Set<Integer> allInitialPartitions = new HashSet<>();
-			for (int i : newPartitions) {
-				allNewPartitions.add(i);
-			}
-			for (int i : initialPartitions) {
-				allInitialPartitions.add(i);
-			}
-
-			final int numConsumers = 3;
-			final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
-			final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
-			final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
-			final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
-			
-			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 0);
-			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 1);
-			List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 2);
-
-			assertNotNull(parts1);
-			assertNotNull(parts2);
-			assertNotNull(parts3);
-			
-			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
-
-			for (TopicPartition p : parts1) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts2) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts3) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
-			}
-			
-			// all partitions must have been assigned
-			assertTrue(allInitialPartitions.isEmpty());
-			
-			// grow the set of partitions and distribute anew
-			
-			List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 0);
-			List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 1);
-			List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 2);
-
-			// new partitions must include all old partitions
-			
-			assertTrue(parts1new.size() > parts1.size());
-			assertTrue(parts2new.size() > parts2.size());
-			assertTrue(parts3new.size() > parts3.size());
-			
-			assertTrue(parts1new.containsAll(parts1));
-			assertTrue(parts2new.containsAll(parts2));
-			assertTrue(parts3new.containsAll(parts3));
-
-			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
-
-			for (TopicPartition p : parts1new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts2new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts3new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allNewPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	private static boolean contains(int[] array, int value) {
-		for (int i : array) {
-			if (i == value) {
-				return true;
-			}
-		}
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
deleted file mode 100644
index e35fcfb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class KafkaConsumerTest {
-
-	@Test
-	public void testValidateZooKeeperConfig() {
-		try {
-			// empty
-			Properties emptyProperties = new Properties();
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(emptyProperties);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no connect string (only group string)
-			Properties noConnect = new Properties();
-			noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(noConnect);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no group string (only connect string)
-			Properties noGroup = new Properties();
-			noGroup.put("zookeeper.connect", "localhost:47574");
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(noGroup);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSnapshot() {
-		try {
-			Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
-			Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
-			Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-			
-			offsetsField.setAccessible(true);
-			runningField.setAccessible(true);
-			mapField.setAccessible(true);
-
-			FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
-			when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
-			
-			long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
-			LinkedMap map = new LinkedMap();
-			
-			offsetsField.set(consumer, testOffsets);
-			runningField.set(consumer, true);
-			mapField.set(consumer, map);
-			
-			assertTrue(map.isEmpty());
-
-			// make multiple checkpoints
-			for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
-				long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
-				assertArrayEquals(testOffsets, checkpoint);
-				
-				// change the offsets, make sure the snapshot did not change
-				long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
-				
-				for (int i = 0; i < testOffsets.length; i++) {
-					testOffsets[i] += 1L;
-				}
-				
-				assertArrayEquals(checkpointCopy, checkpoint);
-				
-				assertTrue(map.size() > 0);
-				assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	@Ignore("Kafka consumer internally makes an infinite loop")
-	public void testCreateSourceWithoutCluster() {
-		try {
-			Properties props = new Properties();
-			props.setProperty("zookeeper.connect", "localhost:56794");
-			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
-			props.setProperty("group.id", "non-existent-group");
-
-			new FlinkKafkaConsumer<>("no op topic", new JavaDefaultStringSchema(), props,
-					FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
-					FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
deleted file mode 100644
index e9a5728..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ /dev/null
@@ -1,1124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
-import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
-import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
-import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.testutils.junit.RetryOnException;
-import org.apache.flink.testutils.junit.RetryRule;
-import org.apache.flink.util.Collector;
-
-import org.apache.flink.util.NetUtils;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.junit.Assert;
-
-import org.junit.Rule;
-import scala.collection.Seq;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-	
-	@Rule
-	public RetryRule retryRule = new RetryRule();
-	
-	// ------------------------------------------------------------------------
-	//  Required methods by the abstract test base
-	// ------------------------------------------------------------------------
-
-	protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
-			String topic, DeserializationSchema<T> deserializationSchema, Properties props);
-
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	//
-	//  The tests here are all not activated (by an @Test tag), but need
-	//  to be invoked from the extending classes. That way, the classes can
-	//  select which tests to run.
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Test that validates that checkpointing and checkpoint notification works properly
-	 */
-	public void runCheckpointingTest() throws Exception {
-		createTestTopic("testCheckpointing", 1, 1);
-
-		FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
-		Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-		pendingCheckpointsField.setAccessible(true);
-		LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
-
-		Assert.assertEquals(0, pendingCheckpoints.size());
-		source.setRuntimeContext(new MockRuntimeContext(1, 0));
-
-		final long[] initialOffsets = new long[] { 1337 };
-
-		// first restore
-		source.restoreState(initialOffsets);
-
-		// then open
-		source.open(new Configuration());
-		long[] state1 = source.snapshotState(1, 15);
-
-		assertArrayEquals(initialOffsets, state1);
-
-		long[] state2 = source.snapshotState(2, 30);
-		Assert.assertArrayEquals(initialOffsets, state2);
-		Assert.assertEquals(2, pendingCheckpoints.size());
-
-		source.notifyCheckpointComplete(1);
-		Assert.assertEquals(1, pendingCheckpoints.size());
-
-		source.notifyCheckpointComplete(2);
-		Assert.assertEquals(0, pendingCheckpoints.size());
-
-		source.notifyCheckpointComplete(666); // invalid checkpoint
-		Assert.assertEquals(0, pendingCheckpoints.size());
-
-		// create 500 snapshots
-		for (int i = 100; i < 600; i++) {
-			source.snapshotState(i, 15 * i);
-		}
-		Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
-
-		// commit only the second last
-		source.notifyCheckpointComplete(598);
-		Assert.assertEquals(1, pendingCheckpoints.size());
-
-		// access invalid checkpoint
-		source.notifyCheckpointComplete(590);
-
-		// and the last
-		source.notifyCheckpointComplete(599);
-		Assert.assertEquals(0, pendingCheckpoints.size());
-
-		source.close();
-
-		deleteTestTopic("testCheckpointing");
-	}
-
-	/**
-	 * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
-	 *
-	 * This test is only applicable if Teh Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
-	 */
-	public void runOffsetInZookeeperValidationTest() throws Exception {
-		LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
-
-		final String topicName = "testOffsetHacking";
-		final int parallelism = 3;
-
-		createTestTopic(topicName, parallelism, 1);
-
-		StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env1.getConfig().disableSysoutLogging();
-		env1.enableCheckpointing(50);
-		env1.setNumberOfExecutionRetries(0);
-		env1.setParallelism(parallelism);
-
-		StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env2.getConfig().disableSysoutLogging();
-		env2.enableCheckpointing(50);
-		env2.setNumberOfExecutionRetries(0);
-		env2.setParallelism(parallelism);
-
-		StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env3.getConfig().disableSysoutLogging();
-		env3.enableCheckpointing(50);
-		env3.setNumberOfExecutionRetries(0);
-		env3.setParallelism(parallelism);
-
-		// write a sequence from 0 to 99 to each of the 3 partitions.
-		writeSequence(env1, topicName, 100, parallelism);
-
-		readSequence(env2, standardProps, parallelism, topicName, 100, 0);
-
-		ZkClient zkClient = createZookeeperClient();
-
-		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
-		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
-		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
-
-		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
-
-		assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-		assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-		assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-
-		LOG.info("Manipulating offsets");
-
-		// set the offset to 50 for the three partitions
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
-
-		zkClient.close();
-
-		// create new env
-		readSequence(env3, standardProps, parallelism, topicName, 50, 50);
-
-		deleteTestTopic(topicName);
-
-		LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
-	}
-
-	/**
-	 * Ensure Kafka is working on both producer and consumer side.
-	 * This executes a job that contains two Flink pipelines.
-	 *
-	 * <pre>
-	 * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
-	 * </pre>
-	 * 
-	 * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
-	 * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
-	 * cause the test to fail.
-	 */
-	@RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
-	public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
-		LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
-
-		final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
-		final int parallelism = 3;
-		final int elementsPerPartition = 100;
-		final int totalElements = parallelism * elementsPerPartition;
-
-		createTestTopic(topic, parallelism, 2);
-
-		final StreamExecutionEnvironment env =
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(parallelism);
-		env.getConfig().disableSysoutLogging();
-
-		TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
-
-		TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
-				new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
-		TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
-				new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
-		// ----------- add producer dataflow ----------
-
-		DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
-
-			private boolean running = true;
-
-			@Override
-			public void run(SourceContext<Tuple2<Long, String>> ctx) {
-				int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
-				int limit = cnt + elementsPerPartition;
-
-
-				while (running && cnt < limit) {
-					ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
-					cnt++;
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		});
-		stream.addSink(new FlinkKafkaProducer<>(brokerConnectionStrings, topic, sinkSchema));
-
-		// ----------- add consumer dataflow ----------
-
-		FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
-
-		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
-
-		consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
-
-			private int elCnt = 0;
-			private BitSet validator = new BitSet(totalElements);
-
-			@Override
-			public void invoke(Tuple2<Long, String> value) throws Exception {
-				String[] sp = value.f1.split("-");
-				int v = Integer.parseInt(sp[1]);
-
-				assertEquals(value.f0 - 1000, (long) v);
-
-				assertFalse("Received tuple twice", validator.get(v));
-				validator.set(v);
-				elCnt++;
-
-				if (elCnt == totalElements) {
-					// check if everything in the bitset is set to true
-					int nc;
-					if ((nc = validator.nextClearBit(0)) != totalElements) {
-						fail("The bitset was not set to 1 on all elements. Next clear:"
-								+ nc + " Set: " + validator);
-					}
-					throw new SuccessException();
-				}
-			}
-
-			@Override
-			public void close() throws Exception {
-				super.close();
-			}
-		}).setParallelism(1);
-
-		try {
-			tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
-		}
-		catch (ProgramInvocationException | JobExecutionException e) {
-			// look for NotLeaderForPartitionException
-			Throwable cause = e.getCause();
-
-			// search for nested SuccessExceptions
-			int depth = 0;
-			while (cause != null && depth++ < 20) {
-				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
-					throw (Exception) cause;
-				}
-				cause = cause.getCause();
-			}
-			throw e;
-		}
-
-		LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and
-	 * Flink sources.
-	 */
-	public void runOneToOneExactlyOnceTest() throws Exception {
-		LOG.info("Starting runOneToOneExactlyOnceTest()");
-
-		final String topic = "oneToOneTopic";
-		final int parallelism = 5;
-		final int numElementsPerPartition = 1000;
-		final int totalElements = parallelism * numElementsPerPartition;
-		final int failAfterElements = numElementsPerPartition / 3;
-
-		createTestTopic(topic, parallelism, 1);
-
-		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-				brokerConnectionStrings,
-				topic, parallelism, numElementsPerPartition, true);
-
-		// run the topology that fails and recovers
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.enableCheckpointing(500);
-		env.setParallelism(parallelism);
-		env.setNumberOfExecutionRetries(3);
-		env.getConfig().disableSysoutLogging();
-
-		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
-		env
-				.addSource(kafkaSource)
-				.map(new PartitionValidatingMapper(parallelism, 1))
-				.map(new FailingIdentityMapper<Integer>(failAfterElements))
-				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-		FailingIdentityMapper.failedBefore = false;
-		tryExecute(env, "One-to-one exactly once test");
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
-	 * one Flink source will read multiple Kafka partitions.
-	 */
-	public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
-		LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
-
-		final String topic = "oneToManyTopic";
-		final int numPartitions = 5;
-		final int numElementsPerPartition = 1000;
-		final int totalElements = numPartitions * numElementsPerPartition;
-		final int failAfterElements = numElementsPerPartition / 3;
-
-		final int parallelism = 2;
-
-		createTestTopic(topic, numPartitions, 1);
-
-		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-				brokerConnectionStrings,
-				topic, numPartitions, numElementsPerPartition, true);
-
-		// run the topology that fails and recovers
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.enableCheckpointing(500);
-		env.setParallelism(parallelism);
-		env.setNumberOfExecutionRetries(3);
-		env.getConfig().disableSysoutLogging();
-
-		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
-		env
-				.addSource(kafkaSource)
-				.map(new PartitionValidatingMapper(numPartitions, 3))
-				.map(new FailingIdentityMapper<Integer>(failAfterElements))
-				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-		FailingIdentityMapper.failedBefore = false;
-		tryExecute(env, "One-source-multi-partitions exactly once test");
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
-	 * that some Flink sources will read no partitions.
-	 */
-	public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
-		LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
-
-		final String topic = "manyToOneTopic";
-		final int numPartitions = 5;
-		final int numElementsPerPartition = 1000;
-		final int totalElements = numPartitions * numElementsPerPartition;
-		final int failAfterElements = numElementsPerPartition / 3;
-
-		final int parallelism = 8;
-
-		createTestTopic(topic, numPartitions, 1);
-
-		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-				brokerConnectionStrings,
-				topic, numPartitions, numElementsPerPartition, true);
-
-		// run the topology that fails and recovers
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.enableCheckpointing(500);
-		env.setParallelism(parallelism);
-		env.setNumberOfExecutionRetries(3);
-		env.getConfig().disableSysoutLogging();
-		env.setBufferTimeout(0);
-
-		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
-		env
-			.addSource(kafkaSource)
-			.map(new PartitionValidatingMapper(numPartitions, 1))
-			.map(new FailingIdentityMapper<Integer>(failAfterElements))
-			.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-		FailingIdentityMapper.failedBefore = false;
-		tryExecute(env, "multi-source-one-partitions exactly once test");
-
-
-		deleteTestTopic(topic);
-	}
-	
-	
-	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
-	 */
-	public void runCancelingOnFullInputTest() throws Exception {
-		final String topic = "cancelingOnFullTopic";
-
-		final int parallelism = 3;
-		createTestTopic(topic, parallelism, 1);
-
-		// launch a producer thread
-		DataGenerators.InfiniteStringsGenerator generator =
-				new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
-		generator.start();
-
-		// launch a consumer asynchronously
-
-		final AtomicReference<Throwable> jobError = new AtomicReference<>();
-
-		final Runnable jobRunner = new Runnable() {
-			@Override
-			public void run() {
-				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute();
-				}
-				catch (Throwable t) {
-					jobError.set(t);
-				}
-			}
-		};
-
-		Thread runnerThread = new Thread(jobRunner, "program runner thread");
-		runnerThread.start();
-
-		// wait a bit before canceling
-		Thread.sleep(2000);
-
-		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-
-		// wait for the program to be done and validate that we failed with the right exception
-		runnerThread.join();
-
-		Throwable failueCause = jobError.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
-
-		if (generator.isAlive()) {
-			generator.shutdown();
-			generator.join();
-		}
-		else {
-			Throwable t = generator.getError();
-			if (t != null) {
-				t.printStackTrace();
-				fail("Generator failed: " + t.getMessage());
-			} else {
-				fail("Generator failed with no exception");
-			}
-		}
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests that the source can be properly canceled when reading empty partitions. 
-	 */
-	public void runCancelingOnEmptyInputTest() throws Exception {
-		final String topic = "cancelingOnEmptyInputTopic";
-
-		final int parallelism = 3;
-		createTestTopic(topic, parallelism, 1);
-
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final Runnable jobRunner = new Runnable() {
-			@Override
-			public void run() {
-				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute();
-				}
-				catch (Throwable t) {
-					error.set(t);
-				}
-			}
-		};
-
-		Thread runnerThread = new Thread(jobRunner, "program runner thread");
-		runnerThread.start();
-
-		// wait a bit before canceling
-		Thread.sleep(2000);
-
-		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-
-		// wait for the program to be done and validate that we failed with the right exception
-		runnerThread.join();
-
-		Throwable failueCause = error.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
-	 */
-	public void runFailOnDeployTest() throws Exception {
-		final String topic = "failOnDeployTopic";
-
-		createTestTopic(topic, 2, 1);
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(12); // needs to be more that the mini cluster has slots
-		env.getConfig().disableSysoutLogging();
-
-		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
-		env
-				.addSource(kafkaSource)
-				.addSink(new DiscardingSink<Integer>());
-
-		try {
-			env.execute();
-			fail("this test should fail with an exception");
-		}
-		catch (ProgramInvocationException e) {
-
-			// validate that we failed due to a NoResourceAvailableException
-			Throwable cause = e.getCause();
-			int depth = 0;
-			boolean foundResourceException = false;
-
-			while (cause != null && depth++ < 20) {
-				if (cause instanceof NoResourceAvailableException) {
-					foundResourceException = true;
-					break;
-				}
-				cause = cause.getCause();
-			}
-
-			assertTrue("Wrong exception", foundResourceException);
-		}
-
-		deleteTestTopic(topic);
-	}
-
-	public void runInvalidOffsetTest() throws Exception {
-		final String topic = "invalidOffsetTopic";
-		final int parallelism = 1;
-
-		// create topic
-		createTestTopic(topic, parallelism, 1);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-
-		// write 20 messages into topic:
-		writeSequence(env, topic, 20, parallelism);
-
-		// set invalid offset:
-		ZkClient zkClient = createZookeeperClient();
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topic, 0, 1234);
-
-		// read from topic
-		final int valuesCount = 20;
-		final int startFrom = 0;
-		readSequence(env, standardCC.props().props(), parallelism, topic, valuesCount, startFrom);
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Test Flink's Kafka integration also with very big records (30MB)
-	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
-	 */
-	public void runBigRecordTestTopology() throws Exception {
-		LOG.info("Starting runBigRecordTestTopology()");
-
-		final String topic = "bigRecordTestTopic";
-		final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
-
-		createTestTopic(topic, parallelism, 1);
-
-		final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
-
-		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
-				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
-		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
-				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setNumberOfExecutionRetries(0);
-		env.getConfig().disableSysoutLogging();
-		env.enableCheckpointing(100);
-		env.setParallelism(parallelism);
-
-		// add consuming topology:
-		Properties consumerProps = new Properties();
-		consumerProps.putAll(standardProps);
-		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
-		consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
-		consumerProps.setProperty("queued.max.message.chunks", "1");
-
-		FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
-		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
-
-		consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
-
-			private int elCnt = 0;
-
-			@Override
-			public void invoke(Tuple2<Long, byte[]> value) throws Exception {
-				elCnt++;
-				if (value.f0 == -1) {
-					// we should have seen 11 elements now.
-					if(elCnt == 11) {
-						throw new SuccessException();
-					} else {
-						throw new RuntimeException("There have been "+elCnt+" elements");
-					}
-				}
-				if(elCnt > 10) {
-					throw new RuntimeException("More than 10 elements seen: "+elCnt);
-				}
-			}
-		});
-
-		// add producing topology
-		Properties producerProps = new Properties();
-		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
-		producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
-
-		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
-
-			private boolean running;
-
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				super.open(parameters);
-				running = true;
-			}
-
-			@Override
-			public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
-				Random rnd = new Random();
-				long cnt = 0;
-				int fifteenMb = 1024 * 1024 * 15;
-
-				while (running) {
-					byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
-					ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
-
-					Thread.sleep(100);
-
-					if (cnt == 10) {
-						// signal end
-						ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
-						break;
-					}
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		});
-
-		stream.addSink(new FlinkKafkaProducer<>(topic, deserSchema, producerProps));
-
-		tryExecute(env, "big topology test");
-
-		deleteTestTopic(topic);
-
-		LOG.info("Finished runBigRecordTestTopology()");
-	}
-
-	
-	public void runBrokerFailureTest() throws Exception {
-		LOG.info("starting runBrokerFailureTest()");
-
-		final String topic = "brokerFailureTestTopic";
-
-		final int parallelism = 2;
-		final int numElementsPerPartition = 1000;
-		final int totalElements = parallelism * numElementsPerPartition;
-		final int failAfterElements = numElementsPerPartition / 3;
-
-
-		createTestTopic(topic, parallelism, 2);
-
-		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-				brokerConnectionStrings,
-				topic, parallelism, numElementsPerPartition, true);
-
-		// find leader to shut down
-		ZkClient zkClient = createZookeeperClient();
-		PartitionMetadata firstPart = null;
-		do {
-			if (firstPart != null) {
-				LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
-				// not the first try. Sleep a bit
-				Thread.sleep(150);
-			}
-
-			Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
-			firstPart = partitionMetadata.head();
-		}
-		while (firstPart.errorCode() != 0);
-		zkClient.close();
-
-		final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
-		final String leaderToShutDownConnection = 
-				NetUtils.hostAndPortToUrlString(leaderToShutDown.host(), leaderToShutDown.port());
-		
-		
-		final int leaderIdToShutDown = firstPart.leader().get().id();
-		LOG.info("Leader to shutdown {}", leaderToShutDown);
-
-
-		// run the topology that fails and recovers
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(parallelism);
-		env.enableCheckpointing(500);
-		env.setNumberOfExecutionRetries(3);
-		env.getConfig().disableSysoutLogging();
-
-
-		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
-		env
-				.addSource(kafkaSource)
-				.map(new PartitionValidatingMapper(parallelism, 1))
-				.map(new BrokerKillingMapper<Integer>(leaderToShutDownConnection, failAfterElements))
-				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-		BrokerKillingMapper.killedLeaderBefore = false;
-		tryExecute(env, "One-to-one exactly once test");
-
-		// start a new broker:
-		brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));
-
-		LOG.info("finished runBrokerFailureTest()");
-	}
-
-	// ------------------------------------------------------------------------
-	//  Reading writing test data sets
-	// ------------------------------------------------------------------------
-
-	private void readSequence(StreamExecutionEnvironment env, Properties cc,
-								final int sourceParallelism,
-								final String topicName,
-								final int valuesCount, final int startFrom) throws Exception {
-
-		final int finalCount = valuesCount * sourceParallelism;
-
-		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
-				new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
-
-		// create the consumer
-		FlinkKafkaConsumer<Tuple2<Integer, Integer>> consumer = getConsumer(topicName, deser, cc);
-
-		DataStream<Tuple2<Integer, Integer>> source = env
-				.addSource(consumer).setParallelism(sourceParallelism)
-				.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
-
-		// verify data
-		source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
-
-			private int[] values = new int[valuesCount];
-			private int count = 0;
-
-			@Override
-			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
-				values[value.f1 - startFrom]++;
-				count++;
-
-				// verify if we've seen everything
-				if (count == finalCount) {
-					for (int i = 0; i < values.length; i++) {
-						int v = values[i];
-						if (v != sourceParallelism) {
-							printTopic(topicName, valuesCount, deser);
-							throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
-						}
-					}
-					// test has passed
-					throw new SuccessException();
-				}
-			}
-
-		}).setParallelism(1);
-
-		tryExecute(env, "Read data from Kafka");
-
-		LOG.info("Successfully read sequence for verification");
-	}
-
-	private static void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
-
-		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-			private boolean running = true;
-
-			@Override
-			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-				int cnt = 0;
-				int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-				while (running && cnt < numElements) {
-					ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
-					cnt++;
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		}).setParallelism(parallelism);
-		
-		stream.addSink(new FlinkKafkaProducer<>(topicName,
-				new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
-				FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings),
-				new Tuple2Partitioner(parallelism)
-		)).setParallelism(parallelism);
-
-		env.execute("Write sequence");
-
-		LOG.info("Finished writing sequence");
-	}
-
-	// ------------------------------------------------------------------------
-	//  Debugging utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Read topic to list, only using Kafka code.
-	 */
-	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
-		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
-		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
-		// will see each message only once.
-		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
-		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
-		if(streams.size() != 1) {
-			throw new RuntimeException("Expected only one message stream but got "+streams.size());
-		}
-		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
-		if(kafkaStreams == null) {
-			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
-		}
-		if(kafkaStreams.size() != 1) {
-			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
-		}
-		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
-		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
-
-		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
-		int read = 0;
-		while(iteratorToRead.hasNext()) {
-			read++;
-			result.add(iteratorToRead.next());
-			if(read == stopAfter) {
-				LOG.info("Read "+read+" elements");
-				return result;
-			}
-		}
-		return result;
-	}
-
-	private static void printTopic(String topicName, ConsumerConfig config,
-								DeserializationSchema<?> deserializationSchema,
-								int stopAfter) {
-
-		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
-		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
-
-		for (MessageAndMetadata<byte[], byte[]> message: contents) {
-			Object out = deserializationSchema.deserialize(message.message());
-			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
-		}
-	}
-
-	private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) {
-		// write the sequence to log for debugging purposes
-		Properties stdProps = standardCC.props().props();
-		Properties newProps = new Properties(stdProps);
-		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
-		newProps.setProperty("auto.offset.reset", "smallest");
-		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
-
-		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
-		printTopic(topicName, printerConfig, deserializer, elements);
-	}
-
-
-	public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
-			implements Checkpointed<Integer>, CheckpointNotifier {
-
-		private static final long serialVersionUID = 6334389850158707313L;
-
-		public static volatile boolean killedLeaderBefore;
-		public static volatile boolean hasBeenCheckpointedBeforeFailure;
-		
-		private final String leaderToShutDown;
-		private final int failCount;
-		private int numElementsTotal;
-
-		private boolean failer;
-		private boolean hasBeenCheckpointed;
-
-
-		public BrokerKillingMapper(String leaderToShutDown, int failCount) {
-			this.leaderToShutDown = leaderToShutDown;
-			this.failCount = failCount;
-		}
-
-		@Override
-		public void open(Configuration parameters) {
-			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
-		}
-
-		@Override
-		public T map(T value) throws Exception {
-			numElementsTotal++;
-			
-			if (!killedLeaderBefore) {
-				Thread.sleep(10);
-				
-				if (failer && numElementsTotal >= failCount) {
-					// shut down a Kafka broker
-					KafkaServer toShutDown = null;
-					for (KafkaServer kafkaServer : brokers) {
-						String connectionUrl = 
-								NetUtils.hostAndPortToUrlString(
-										kafkaServer.config().advertisedHostName(),
-										kafkaServer.config().advertisedPort());
-						if (leaderToShutDown.equals(connectionUrl)) {
-							toShutDown = kafkaServer;
-							break;
-						}
-					}
-	
-					if (toShutDown == null) {
-						StringBuilder listOfBrokers = new StringBuilder();
-						for (KafkaServer kafkaServer : brokers) {
-							listOfBrokers.append(
-									NetUtils.hostAndPortToUrlString(
-											kafkaServer.config().advertisedHostName(),
-											kafkaServer.config().advertisedPort()));
-							listOfBrokers.append(" ; ");
-						}
-						
-						throw new Exception("Cannot find broker to shut down: " + leaderToShutDown
-								+ " ; available brokers: " + listOfBrokers.toString());
-					}
-					else {
-						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-						killedLeaderBefore = true;
-						toShutDown.shutdown();
-					}
-				}
-			}
-			return value;
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			hasBeenCheckpointed = true;
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return numElementsTotal;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			this.numElementsTotal = state;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
deleted file mode 100644
index b4511ce..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import org.junit.Test;
-
-import java.util.Properties;
-
-
-public class KafkaITCase extends KafkaConsumerTestBase {
-	
-	@Override
-	protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
-		return new FlinkKafkaConsumer081<>(topic, deserializationSchema, props);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testCheckpointing() throws Exception {
-		runCheckpointingTest();
-	}
-
-	@Test
-	public void testOffsetInZookeeper() throws Exception {
-		runOffsetInZookeeperValidationTest();
-	}
-	
-	@Test
-	public void testConcurrentProducerConsumerTopology() throws Exception {
-		runSimpleConcurrentProducerConsumerTopology();
-	}
-
-	// --- canceling / failures ---
-	
-	@Test
-	public void testCancelingEmptyTopic() throws Exception {
-		runCancelingOnEmptyInputTest();
-	}
-
-	@Test
-	public void testCancelingFullTopic() throws Exception {
-		runCancelingOnFullInputTest();
-	}
-
-	@Test
-	public void testFailOnDeploy() throws Exception {
-		runFailOnDeployTest();
-	}
-
-	@Test
-	public void testInvalidOffset() throws Exception {
-		runInvalidOffsetTest();
-	}
-
-	// --- source to partition mappings and exactly once ---
-	
-	@Test
-	public void testOneToOneSources() throws Exception {
-		runOneToOneExactlyOnceTest();
-	}
-
-	@Test
-	public void testOneSourceMultiplePartitions() throws Exception {
-		runOneSourceMultiplePartitionsExactlyOnceTest();
-	}
-
-	@Test
-	public void testMultipleSourcesOnePartition() throws Exception {
-		runMultipleSourcesOnePartitionExactlyOnceTest();
-	}
-
-	// --- broker failure ---
-
-	@Test
-	public void testBrokerFailure() throws Exception {
-		runBrokerFailureTest();
-	}
-
-	// --- special executions ---
-	
-	@Test
-	public void testBigRecordJob() throws Exception {
-		runBigRecordTestTopology();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
deleted file mode 100644
index 72d2772..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaLocalSystemTime implements Time {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
-	@Override
-	public long milliseconds() {
-		return System.currentTimeMillis();
-	}
-
-	@Override
-	public long nanoseconds() {
-		return System.nanoTime();
-	}
-
-	@Override
-	public void sleep(long ms) {
-		try {
-			Thread.sleep(ms);
-		} catch (InterruptedException e) {
-			LOG.warn("Interruption", e);
-		}
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
deleted file mode 100644
index 5001364..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.junit.Test;
-
-import java.io.Serializable;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class KafkaProducerITCase extends KafkaTestBase {
-
-
-	/**
-	 * 
-	 * <pre>
-	 *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
-	 *            /                  |                                       \
-	 *           /                   |                                        \
-	 * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
-	 *           \                   |                                        /
-	 *            \                  |                                       /
-	 *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
-	 * </pre>
-	 * 
-	 * The mapper validates that the values come consistently from the correct Kafka partition.
-	 * 
-	 * The final sink validates that there are no duplicates and that all partitions are present.
-	 */
-	@Test
-	public void testCustomPartitioning() {
-		try {
-			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
-
-			final String topic = "customPartitioningTestTopic";
-			final int parallelism = 3;
-			
-			createTestTopic(topic, parallelism, 1);
-
-			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setNumberOfExecutionRetries(0);
-			env.getConfig().disableSysoutLogging();
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
-			// ------ producing topology ---------
-			
-			// source has DOP 1 to make sure it generates no duplicates
-			DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-
-				private boolean running = true;
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
-					long cnt = 0;
-					while (running) {
-						ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
-						cnt++;
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			})
-			.setParallelism(1);
-			
-			// sink partitions into 
-			stream.addSink(new FlinkKafkaProducer<>(topic, serSchema, FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings), new CustomPartitioner(parallelism)))
-			.setParallelism(parallelism);
-
-			// ------ consuming topology ---------
-			
-			FlinkKafkaConsumer<Tuple2<Long, String>> source = 
-					new FlinkKafkaConsumer<>(topic, deserSchema, standardProps, 
-							FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
-							FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-			
-			env.addSource(source).setParallelism(parallelism)
-
-					// mapper that validates partitioning and maps to partition
-					.map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-						
-						private int ourPartition = -1;
-						@Override
-						public Integer map(Tuple2<Long, String> value) {
-							int partition = value.f0.intValue() % parallelism;
-							if (ourPartition != -1) {
-								assertEquals("inconsistent partitioning", ourPartition, partition);
-							} else {
-								ourPartition = partition;
-							}
-							return partition;
-						}
-					}).setParallelism(parallelism)
-					
-					.addSink(new SinkFunction<Integer>() {
-						
-						private int[] valuesPerPartition = new int[parallelism];
-						
-						@Override
-						public void invoke(Integer value) throws Exception {
-							valuesPerPartition[value]++;
-							
-							boolean missing = false;
-							for (int i : valuesPerPartition) {
-								if (i < 100) {
-									missing = true;
-									break;
-								}
-							}
-							if (!missing) {
-								throw new SuccessException();
-							}
-						}
-					}).setParallelism(1);
-			
-			tryExecute(env, "custom partitioning test");
-
-			deleteTestTopic(topic);
-			
-			LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	
-	// ------------------------------------------------------------------------
-
-	public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
-
-		private final int expectedPartitions;
-
-		public CustomPartitioner(int expectedPartitions) {
-			this.expectedPartitions = expectedPartitions;
-		}
-
-		@Override
-		public int partition(Object key, int numPartitions) {
-			@SuppressWarnings("unchecked")
-			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
-			
-			assertEquals(expectedPartitions, numPartitions);
-			
-			return (int) (tuple.f0 % numPartitions);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
deleted file mode 100644
index c5c3387..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-
-import static org.mockito.Mockito.*;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import static org.junit.Assert.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FlinkKafkaProducer.class)
-public class KafkaProducerTest extends TestLogger {
-	
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testPropagateExceptions() {
-		try {
-			// mock kafka producer
-			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
-			
-			// partition setup
-			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
-					Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
-
-			// failure when trying to send an element
-			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
-				.thenAnswer(new Answer<Future<RecordMetadata>>() {
-					@Override
-					public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-						Callback callback = (Callback) invocation.getArguments()[1];
-						callback.onCompletion(null, new Exception("Test error"));
-						return null;
-					}
-				});
-			
-			// make sure the FlinkKafkaProducer instantiates our mock producer
-			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
-			
-			// (1) producer that propagates errors
-			
-			FlinkKafkaProducer<String> producerPropagating = new FlinkKafkaProducer<String>(
-					"mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
-
-			producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
-			producerPropagating.open(new Configuration());
-			
-			try {
-				producerPropagating.invoke("value");
-				producerPropagating.invoke("value");
-				fail("This should fail with an exception");
-			}
-			catch (Exception e) {
-				assertNotNull(e.getCause());
-				assertNotNull(e.getCause().getMessage());
-				assertTrue(e.getCause().getMessage().contains("Test error"));
-			}
-
-			// (2) producer that only logs errors
-			
-			FlinkKafkaProducer<String> producerLogging = new FlinkKafkaProducer<String>(
-					"mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
-			producerLogging.setLogFailuresOnly(true);
-			
-			producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
-			producerLogging.open(new Configuration());
-
-			producerLogging.invoke("value");
-			producerLogging.invoke("value");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}