You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:53 UTC

[36/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
deleted file mode 100644
index 7b4961d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
+++ /dev/null
@@ -1,1137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.connectors.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.testutils.DataGenerators;
-import org.apache.flink.streaming.connectors.testutils.DiscardingSink;
-import org.apache.flink.streaming.connectors.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.connectors.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.connectors.testutils.PartitionValidatingMapper;
-import org.apache.flink.streaming.connectors.testutils.SuccessException;
-import org.apache.flink.streaming.connectors.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.testutils.Tuple2Partitioner;
-import org.apache.flink.streaming.connectors.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-
-import scala.collection.Seq;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-
-
-	// ------------------------------------------------------------------------
-	//  Required methods by the abstract test base
-	// ------------------------------------------------------------------------
-
-	protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
-			String topic, DeserializationSchema<T> deserializationSchema, Properties props);
-
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	//
-	//  The tests here are all not activated (by an @Test tag), but need
-	//  to be invoked from the extending classes. That way, the classes can
-	//  select which tests to run.
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Test that validates that checkpointing and checkpoint notification works properly
-	 */
-	public void runCheckpointingTest() {
-		try {
-			createTestTopic("testCheckpointing", 1, 1);
-
-			FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
-			Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-			pendingCheckpointsField.setAccessible(true);
-			LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
-
-			Assert.assertEquals(0, pendingCheckpoints.size());
-			source.setRuntimeContext(new MockRuntimeContext(1, 0));
-
-			final long[] initialOffsets = new long[] { 1337 };
-
-			// first restore
-			source.restoreState(initialOffsets);
-
-			// then open
-			source.open(new Configuration());
-			long[] state1 = source.snapshotState(1, 15);
-
-			assertArrayEquals(initialOffsets, state1);
-
-			long[] state2 = source.snapshotState(2, 30);
-			Assert.assertArrayEquals(initialOffsets, state2);
-			Assert.assertEquals(2, pendingCheckpoints.size());
-
-			source.notifyCheckpointComplete(1);
-			Assert.assertEquals(1, pendingCheckpoints.size());
-
-			source.notifyCheckpointComplete(2);
-			Assert.assertEquals(0, pendingCheckpoints.size());
-
-			source.notifyCheckpointComplete(666); // invalid checkpoint
-			Assert.assertEquals(0, pendingCheckpoints.size());
-
-			// create 500 snapshots
-			for (int i = 100; i < 600; i++) {
-				source.snapshotState(i, 15 * i);
-			}
-			Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
-
-			// commit only the second last
-			source.notifyCheckpointComplete(598);
-			Assert.assertEquals(1, pendingCheckpoints.size());
-
-			// access invalid checkpoint
-			source.notifyCheckpointComplete(590);
-
-			// and the last
-			source.notifyCheckpointComplete(599);
-			Assert.assertEquals(0, pendingCheckpoints.size());
-
-			source.close();
-
-			deleteTestTopic("testCheckpointing");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
-	 *
-	 * This test is only applicable if Teh Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
-	 */
-	public void runOffsetInZookeeperValidationTest() {
-		try {
-			LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
-
-			final String topicName = "testOffsetHacking";
-			final int parallelism = 3;
-			
-			createTestTopic(topicName, parallelism, 1);
-
-			StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env1.getConfig().disableSysoutLogging();
-			env1.enableCheckpointing(50);
-			env1.setNumberOfExecutionRetries(0);
-			env1.setParallelism(parallelism);
-
-			StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env2.getConfig().disableSysoutLogging();
-			env2.enableCheckpointing(50);
-			env2.setNumberOfExecutionRetries(0);
-			env2.setParallelism(parallelism);
-
-			StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env3.getConfig().disableSysoutLogging();
-			env3.enableCheckpointing(50);
-			env3.setNumberOfExecutionRetries(0);
-			env3.setParallelism(parallelism);
-
-			// write a sequence from 0 to 99 to each of the 3 partitions.
-			writeSequence(env1, topicName, 100, parallelism);
-
-			readSequence(env2, standardProps, parallelism, topicName, 100, 0);
-
-			ZkClient zkClient = createZookeeperClient();
-			
-			long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
-			long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
-			long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
-
-			LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
-
-			assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-			assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-			assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-
-			LOG.info("Manipulating offsets");
-
-			// set the offset to 50 for the three partitions
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
-
-			zkClient.close();
-			
-			// create new env
-			readSequence(env3, standardProps, parallelism, topicName, 50, 50);
-
-			deleteTestTopic(topicName);
-			
-			LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Ensure Kafka is working on both producer and consumer side.
-	 * This executes a job that contains two Flink pipelines.
-	 *
-	 * <pre>
-	 * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
-	 * </pre>
-	 */
-	public void runSimpleConcurrentProducerConsumerTopology() {
-		try {
-			LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
-
-			final String topic = "concurrentProducerConsumerTopic";
-			final int parallelism = 3;
-			final int elementsPerPartition = 100;
-			final int totalElements = parallelism * elementsPerPartition;
-
-			createTestTopic(topic, parallelism, 2);
-
-			final StreamExecutionEnvironment env =
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(0);
-			env.getConfig().disableSysoutLogging();
-
-			TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
-					new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
-					new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
-			// ----------- add producer dataflow ----------
-
-			DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
-
-				private boolean running = true;
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, String>> ctx) {
-					int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
-					int limit = cnt + elementsPerPartition;
-
-
-					while (running && cnt < limit) {
-						ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
-						cnt++;
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			});
-			stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
-
-			// ----------- add consumer dataflow ----------
-
-			FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
-
-			DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
-
-			consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
-
-				private int elCnt = 0;
-				private BitSet validator = new BitSet(totalElements);
-
-				@Override
-				public void invoke(Tuple2<Long, String> value) throws Exception {
-					String[] sp = value.f1.split("-");
-					int v = Integer.parseInt(sp[1]);
-
-					assertEquals(value.f0 - 1000, (long) v);
-
-					assertFalse("Received tuple twice", validator.get(v));
-					validator.set(v);
-					elCnt++;
-
-					if (elCnt == totalElements) {
-						// check if everything in the bitset is set to true
-						int nc;
-						if ((nc = validator.nextClearBit(0)) != totalElements) {
-							fail("The bitset was not set to 1 on all elements. Next clear:"
-									+ nc + " Set: " + validator);
-						}
-						throw new SuccessException();
-					}
-				}
-
-				@Override
-				public void close() throws Exception {
-					super.close();
-				}
-			}).setParallelism(1);
-
-			tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
-
-			LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
-
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and
-	 * Flink sources.
-	 */
-	public void runOneToOneExactlyOnceTest() {
-		try {
-			LOG.info("Starting runOneToOneExactlyOnceTest()");
-
-			final String topic = "oneToOneTopic";
-			final int parallelism = 5;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = parallelism * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-			
-			createTestTopic(topic, parallelism, 1);
-			
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings, 
-					topic, parallelism, numElementsPerPartition, true);
-			
-			// run the topology that fails and recovers
-
-			DeserializationSchema<Integer> schema = 
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-			
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.enableCheckpointing(500);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-			
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-			
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(parallelism, 1))
-					.map(new FailingIdentityMapper<Integer>(failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-			FailingIdentityMapper.failedBefore = false;
-			tryExecute(env, "One-to-one exactly once test");
-
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-			
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
-	 * one Flink source will read multiple Kafka partitions.
-	 */
-	public void runOneSourceMultiplePartitionsExactlyOnceTest() {
-		try {
-			LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
-
-			final String topic = "oneToManyTopic";
-			final int numPartitions = 5;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = numPartitions * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-			
-			final int parallelism = 2;
-
-			createTestTopic(topic, numPartitions, 1);
-
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings,
-					topic, numPartitions, numElementsPerPartition, true);
-
-			// run the topology that fails and recovers
-
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.enableCheckpointing(500);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(numPartitions, 3))
-					.map(new FailingIdentityMapper<Integer>(failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-			FailingIdentityMapper.failedBefore = false;
-			tryExecute(env, "One-source-multi-partitions exactly once test");
-
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-			
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
-	 * that some Flink sources will read no partitions.
-	 */
-	public void runMultipleSourcesOnePartitionExactlyOnceTest() {
-		try {
-			LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
-
-			final String topic = "manyToOneTopic";
-			final int numPartitions = 5;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = numPartitions * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-
-			final int parallelism = 8;
-
-			createTestTopic(topic, numPartitions, 1);
-
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings,
-					topic, numPartitions, numElementsPerPartition, true);
-
-			// run the topology that fails and recovers
-			
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.enableCheckpointing(500);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-			env.setBufferTimeout(0);
-
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-			
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(numPartitions, 1))
-					.map(new FailingIdentityMapper<Integer>(failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-			
-			FailingIdentityMapper.failedBefore = false;
-			tryExecute(env, "multi-source-one-partitions exactly once test");
-
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-			
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	
-	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
-	 */
-	public void runCancelingOnFullInputTest() {
-		try {
-			final String topic = "cancelingOnFullTopic";
-
-			final int parallelism = 3;
-			createTestTopic(topic, parallelism, 1);
-
-			// launch a producer thread
-			DataGenerators.InfiniteStringsGenerator generator =
-					new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
-			generator.start();
-
-			// launch a consumer asynchronously
-
-			final AtomicReference<Throwable> jobError = new AtomicReference<>();
-
-			final Runnable jobRunner = new Runnable() {
-				@Override
-				public void run() {
-					try {
-						final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-						env.setParallelism(parallelism);
-						env.enableCheckpointing(100);
-						env.getConfig().disableSysoutLogging();
-
-						FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
-						env.addSource(source).addSink(new DiscardingSink<String>());
-
-						env.execute();
-					}
-					catch (Throwable t) {
-						jobError.set(t);
-					}
-				}
-			};
-
-			Thread runnerThread = new Thread(jobRunner, "program runner thread");
-			runnerThread.start();
-
-			// wait a bit before canceling
-			Thread.sleep(2000);
-
-			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
-
-			// wait for the program to be done and validate that we failed with the right exception
-			runnerThread.join();
-
-			Throwable failueCause = jobError.get();
-			assertNotNull("program did not fail properly due to canceling", failueCause);
-			assertTrue(failueCause.getMessage().contains("Job was cancelled"));
-
-			if (generator.isAlive()) {
-				generator.shutdown();
-				generator.join();
-			}
-			else {
-				Throwable t = generator.getError();
-				if (t != null) {
-					t.printStackTrace();
-					fail("Generator failed: " + t.getMessage());
-				} else {
-					fail("Generator failed with no exception");
-				}
-			}
-
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests that the source can be properly canceled when reading empty partitions. 
-	 */
-	public void runCancelingOnEmptyInputTest() {
-		try {
-			final String topic = "cancelingOnEmptyInputTopic";
-
-			final int parallelism = 3;
-			createTestTopic(topic, parallelism, 1);
-
-			final AtomicReference<Throwable> error = new AtomicReference<>();
-
-			final Runnable jobRunner = new Runnable() {
-				@Override
-				public void run() {
-					try {
-						final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-						env.setParallelism(parallelism);
-						env.enableCheckpointing(100);
-						env.getConfig().disableSysoutLogging();
-
-						FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
-
-						env.addSource(source).addSink(new DiscardingSink<String>());
-
-						env.execute();
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-
-			Thread runnerThread = new Thread(jobRunner, "program runner thread");
-			runnerThread.start();
-
-			// wait a bit before canceling
-			Thread.sleep(2000);
-
-			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
-
-			// wait for the program to be done and validate that we failed with the right exception
-			runnerThread.join();
-
-			Throwable failueCause = error.get();
-			assertNotNull("program did not fail properly due to canceling", failueCause);
-			assertTrue(failueCause.getMessage().contains("Job was cancelled"));
-
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
-	 */
-	public void runFailOnDeployTest() {
-		try {
-			final String topic = "failOnDeployTopic";
-			
-			createTestTopic(topic, 2, 1);
-
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setParallelism(12); // needs to be more that the mini cluster has slots
-			env.getConfig().disableSysoutLogging();
-
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-			
-			env
-					.addSource(kafkaSource)
-					.addSink(new DiscardingSink<Integer>());
-			
-			try {
-				env.execute();
-				fail("this test should fail with an exception");
-			}
-			catch (ProgramInvocationException e) {
-				
-				// validate that we failed due to a NoResourceAvailableException
-				Throwable cause = e.getCause();
-				int depth = 0;
-				boolean foundResourceException = false;
-				
-				while (cause != null && depth++ < 20) {
-					if (cause instanceof NoResourceAvailableException) {
-						foundResourceException = true;
-						break;
-					}
-					cause = cause.getCause();
-				}
-				
-				assertTrue("Wrong exception", foundResourceException);
-			}
-
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Test Flink's Kafka integration also with very big records (30MB)
-	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
-	 */
-	public void runBigRecordTestTopology() {
-		try {
-			LOG.info("Starting runBigRecordTestTopology()");
-
-			final String topic = "bigRecordTestTopic";
-			final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
-			
-			createTestTopic(topic, parallelism, 1);
-
-			final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
-
-			final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
-					new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
-			final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
-					new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setNumberOfExecutionRetries(0);
-			env.getConfig().disableSysoutLogging();
-			env.enableCheckpointing(100);
-			env.setParallelism(parallelism);
-
-			// add consuming topology:
-			Properties consumerProps = new Properties();
-			consumerProps.putAll(standardProps);
-			consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
-			consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
-			consumerProps.setProperty("queued.max.message.chunks", "1");
-
-			FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
-			DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
-
-			consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
-
-				private int elCnt = 0;
-
-				@Override
-				public void invoke(Tuple2<Long, byte[]> value) throws Exception {
-					elCnt++;
-					if (value.f0 == -1) {
-						// we should have seen 11 elements now.
-						if(elCnt == 11) {
-							throw new SuccessException();
-						} else {
-							throw new RuntimeException("There have been "+elCnt+" elements");
-						}
-					}
-					if(elCnt > 10) {
-						throw new RuntimeException("More than 10 elements seen: "+elCnt);
-					}
-				}
-			});
-
-			// add producing topology
-			Properties producerProps = new Properties();
-			producerProps.setProperty("max.message.size", Integer.toString(1024 * 1024 * 30));
-			
-			DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
-
-				private boolean running;
-
-				@Override
-				public void open(Configuration parameters) throws Exception {
-					super.open(parameters);
-					running = true;
-				}
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
-					Random rnd = new Random();
-					long cnt = 0;
-					int fifteenMb = 1024 * 1024 * 15;
-
-					while (running) {
-						byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
-						ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
-
-						Thread.sleep(100);
-
-						if (cnt == 10) {
-							// signal end
-							ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
-							break;
-						}
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			});
-
-			stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
-					producerProps, deserSchema));
-
-			tryExecute(env, "big topology test");
-
-			deleteTestTopic(topic);
-			
-			LOG.info("Finished runBigRecordTestTopology()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	
-	public void runBrokerFailureTest() {
-		try {
-			LOG.info("starting runBrokerFailureTest()");
-			
-			final String topic = "brokerFailureTestTopic";
-
-			final int parallelism = 2;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = parallelism * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-			
-
-			createTestTopic(topic, parallelism, 2);
-
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings,
-					topic, parallelism, numElementsPerPartition, true);
-
-			// find leader to shut down
-			ZkClient zkClient = createZookeeperClient();
-			PartitionMetadata firstPart = null;
-			do {
-				if (firstPart != null) {
-					LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
-					// not the first try. Sleep a bit
-					Thread.sleep(150);
-				}
-
-				Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
-				firstPart = partitionMetadata.head();
-			}
-			while (firstPart.errorCode() != 0);
-			zkClient.close();
-
-			final String leaderToShutDown = firstPart.leader().get().connectionString();
-			LOG.info("Leader to shutdown {}", leaderToShutDown);
-			
-			
-			// run the topology that fails and recovers
-
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setParallelism(parallelism);
-			env.enableCheckpointing(500);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-			
-
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(parallelism, 1))
-					.map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-			BrokerKillingMapper.killedLeaderBefore = false;
-			tryExecute(env, "One-to-one exactly once test");
-
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					BrokerKillingMapper.hasBeenCheckpointedBeforeFailure);
-
-			LOG.info("finished runBrokerFailureTest()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Reading writing test data sets
-	// ------------------------------------------------------------------------
-
-	private void readSequence(StreamExecutionEnvironment env, Properties cc,
-								final int sourceParallelism,
-								final String topicName,
-								final int valuesCount, final int startFrom) throws Exception {
-
-		final int finalCount = valuesCount * sourceParallelism;
-
-		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
-				new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
-
-		// create the consumer
-		FlinkKafkaConsumer<Tuple2<Integer, Integer>> consumer = getConsumer(topicName, deser, cc);
-
-		DataStream<Tuple2<Integer, Integer>> source = env
-				.addSource(consumer).setParallelism(sourceParallelism)
-				.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
-
-		// verify data
-		source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
-
-			private int[] values = new int[valuesCount];
-			private int count = 0;
-
-			@Override
-			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
-				values[value.f1 - startFrom]++;
-				count++;
-
-				// verify if we've seen everything
-				if (count == finalCount) {
-					for (int i = 0; i < values.length; i++) {
-						int v = values[i];
-						if (v != sourceParallelism) {
-							printTopic(topicName, valuesCount, deser);
-							throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
-						}
-					}
-					// test has passed
-					throw new SuccessException();
-				}
-			}
-
-		}).setParallelism(1);
-
-		tryExecute(env, "Read data from Kafka");
-
-		LOG.info("Successfully read sequence for verification");
-	}
-
-	private static void writeSequence(StreamExecutionEnvironment env, String topicName,
-									  final int numElements, int parallelism) throws Exception {
-
-		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-			private boolean running = true;
-
-			@Override
-			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-				int cnt = 0;
-				int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-				while (running && cnt < numElements) {
-					ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
-					cnt++;
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		}).setParallelism(parallelism);
-		
-		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
-				topicName,
-				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
-				new Tuple2Partitioner(parallelism)
-		)).setParallelism(parallelism);
-
-		env.execute("Write sequence");
-
-		LOG.info("Finished writing sequence");
-	}
-
-	// ------------------------------------------------------------------------
-	//  Debugging utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Read topic to list, only using Kafka code.
-	 */
-	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
-		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
-		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
-		// will see each message only once.
-		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
-		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
-		if(streams.size() != 1) {
-			throw new RuntimeException("Expected only one message stream but got "+streams.size());
-		}
-		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
-		if(kafkaStreams == null) {
-			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
-		}
-		if(kafkaStreams.size() != 1) {
-			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
-		}
-		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
-		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
-
-		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
-		int read = 0;
-		while(iteratorToRead.hasNext()) {
-			read++;
-			result.add(iteratorToRead.next());
-			if(read == stopAfter) {
-				LOG.info("Read "+read+" elements");
-				return result;
-			}
-		}
-		return result;
-	}
-
-	private static void printTopic(String topicName, ConsumerConfig config,
-								   DeserializationSchema<?> deserializationSchema,
-								   int stopAfter) {
-
-		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
-		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
-
-		for (MessageAndMetadata<byte[], byte[]> message: contents) {
-			Object out = deserializationSchema.deserialize(message.message());
-			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
-		}
-	}
-
-	private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) {
-		// write the sequence to log for debugging purposes
-		Properties stdProps = standardCC.props().props();
-		Properties newProps = new Properties(stdProps);
-		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
-		newProps.setProperty("auto.offset.reset", "smallest");
-		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
-
-		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
-		printTopic(topicName, printerConfig, deserializer, elements);
-	}
-
-
-	public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
-			implements Checkpointed<Integer>, CheckpointNotifier {
-
-		private static final long serialVersionUID = 6334389850158707313L;
-
-		public static volatile boolean killedLeaderBefore;
-		public static volatile boolean hasBeenCheckpointedBeforeFailure;
-		
-		private final String leaderToShutDown;
-		private final int failCount;
-		private int numElementsTotal;
-
-		private boolean failer;
-		private boolean hasBeenCheckpointed;
-
-
-		public BrokerKillingMapper(String leaderToShutDown, int failCount) {
-			this.leaderToShutDown = leaderToShutDown;
-			this.failCount = failCount;
-		}
-
-		@Override
-		public void open(Configuration parameters) {
-			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
-		}
-
-		@Override
-		public T map(T value) throws Exception {
-			numElementsTotal++;
-			
-			if (!killedLeaderBefore) {
-				Thread.sleep(10);
-				
-				if (failer && numElementsTotal >= failCount) {
-					// shut down a Kafka broker
-					KafkaServer toShutDown = null;
-					for (KafkaServer kafkaServer : brokers) {
-						if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
-							toShutDown = kafkaServer;
-							break;
-						}
-					}
-	
-					if (toShutDown == null) {
-						throw new Exception("Cannot find broker to shut down");
-					}
-					else {
-						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-						killedLeaderBefore = true;
-						toShutDown.shutdown();
-					}
-				}
-			}
-			return value;
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			hasBeenCheckpointed = true;
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return numElementsTotal;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			this.numElementsTotal = state;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
deleted file mode 100644
index b910b54..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import kafka.utils.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaLocalSystemTime implements Time {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
-	@Override
-	public long milliseconds() {
-		return System.currentTimeMillis();
-	}
-
-	@Override
-	public long nanoseconds() {
-		return System.nanoTime();
-	}
-
-	@Override
-	public void sleep(long ms) {
-		try {
-			Thread.sleep(ms);
-		} catch (InterruptedException e) {
-			LOG.warn("Interruption", e);
-		}
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
deleted file mode 100644
index fd980d9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.testutils.SuccessException;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class KafkaProducerITCase extends KafkaTestBase {
-
-
-	/**
-	 * 
-	 * <pre>
-	 *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
-	 *            /                  |                                       \
-	 *           /                   |                                        \
-	 * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
-	 *           \                   |                                        /
-	 *            \                  |                                       /
-	 *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
-	 * </pre>
-	 * 
-	 * The mapper validates that the values come consistently from the correct Kafka partition.
-	 * 
-	 * The final sink validates that there are no duplicates and that all partitions are present.
-	 */
-	@Test
-	public void testCustomPartitioning() {
-		try {
-			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
-
-			final String topic = "customPartitioningTestTopic";
-			final int parallelism = 3;
-			
-			createTestTopic(topic, parallelism, 1);
-
-			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setNumberOfExecutionRetries(0);
-			env.getConfig().disableSysoutLogging();
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
-			// ------ producing topology ---------
-			
-			// source has DOP 1 to make sure it generates no duplicates
-			DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-
-				private boolean running = true;
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
-					long cnt = 0;
-					while (running) {
-						ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
-						cnt++;
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			})
-			.setParallelism(1);
-			
-			// sink partitions into 
-			stream.addSink(new KafkaSink<Tuple2<Long, String>>(
-					brokerConnectionStrings, topic,serSchema, new CustomPartitioner(parallelism)))
-			.setParallelism(parallelism);
-
-			// ------ consuming topology ---------
-			
-			FlinkKafkaConsumer<Tuple2<Long, String>> source = 
-					new FlinkKafkaConsumer<>(topic, deserSchema, standardProps, 
-							FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
-							FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-			
-			env.addSource(source).setParallelism(parallelism)
-
-					// mapper that validates partitioning and maps to partition
-					.map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-						
-						private int ourPartition = -1;
-						@Override
-						public Integer map(Tuple2<Long, String> value) {
-							int partition = value.f0.intValue() % parallelism;
-							if (ourPartition != -1) {
-								assertEquals("inconsistent partitioning", ourPartition, partition);
-							} else {
-								ourPartition = partition;
-							}
-							return partition;
-						}
-					}).setParallelism(parallelism)
-					
-					.addSink(new SinkFunction<Integer>() {
-						
-						private int[] valuesPerPartition = new int[parallelism];
-						
-						@Override
-						public void invoke(Integer value) throws Exception {
-							valuesPerPartition[value]++;
-							
-							boolean missing = false;
-							for (int i : valuesPerPartition) {
-								if (i < 100) {
-									missing = true;
-									break;
-								}
-							}
-							if (!missing) {
-								throw new SuccessException();
-							}
-						}
-					}).setParallelism(1);
-			
-			tryExecute(env, "custom partitioning test");
-
-			deleteTestTopic(topic);
-			
-			LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	
-	// ------------------------------------------------------------------------
-
-	public static class CustomPartitioner implements SerializableKafkaPartitioner {
-
-		private final int expectedPartitions;
-
-		public CustomPartitioner(int expectedPartitions) {
-			this.expectedPartitions = expectedPartitions;
-		}
-
-		@Override
-		public int partition(Object key, int numPartitions) {
-			@SuppressWarnings("unchecked")
-			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
-			
-			assertEquals(expectedPartitions, numPartitions);
-			
-			return (int) (tuple.f0 % numPartitions);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
deleted file mode 100644
index 35f050c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import kafka.admin.AdminUtils;
-import kafka.consumer.ConsumerConfig;
-import kafka.network.SocketServer;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-
-import org.I0Itec.zkclient.ZkClient;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kafka_backport.clients.consumer.KafkaConsumer;
-import org.apache.flink.kafka_backport.common.PartitionInfo;
-import org.apache.flink.kafka_backport.common.serialization.ByteArrayDeserializer;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.internals.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.testutils.SuccessException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The base for the Kafka tests. It brings up:
- * <ul>
- *     <li>A ZooKeeper mini cluster</li>
- *     <li>Three Kafka Brokers (mini clusters)</li>
- *     <li>A Flink mini cluster</li>
- * </ul>
- * 
- * <p>Code in this test is based on the following GitHub repository:
- * <a href="https://github.com/sakserv/hadoop-mini-clusters">
- *   https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
- * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
- */
-@SuppressWarnings("serial")
-public abstract class KafkaTestBase {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
-	
-	protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
-	protected static String zookeeperConnectionString;
-
-	protected static File tmpZkDir;
-
-	protected static File tmpKafkaParent;
-
-	protected static TestingServer zookeeper;
-	protected static List<KafkaServer> brokers;
-	protected static String brokerConnectionStrings = "";
-
-	protected static ConsumerConfig standardCC;
-	protected static Properties standardProps;
-	
-	protected static ForkableFlinkMiniCluster flink;
-
-	protected static int flinkPort;
-	
-	
-	
-	// ------------------------------------------------------------------------
-	//  Setup and teardown of the mini clusters
-	// ------------------------------------------------------------------------
-	
-	@BeforeClass
-	public static void prepare() throws IOException {
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Starting KafkaITCase ");
-		LOG.info("-------------------------------------------------------------------------");
-		
-		LOG.info("Starting KafkaITCase.prepare()");
-		
-		File tempDir = new File(System.getProperty("java.io.tmpdir"));
-		
-		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
-
-		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
-		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
-
-		List<File> tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
-		for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-			File tmpDir = new File(tmpKafkaParent, "server-" + i);
-			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
-			tmpKafkaDirs.add(tmpDir);
-		}
-
-		String kafkaHost = "localhost";
-		int zkPort = NetUtils.getAvailablePort();
-		zookeeperConnectionString = "localhost:" + zkPort;
-
-		zookeeper = null;
-		brokers = null;
-
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = new TestingServer(zkPort, tmpZkDir);
-			
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
-			
-			for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), kafkaHost, zookeeperConnectionString));
-				SocketServer socketServer = brokers.get(i).socketServer();
-				
-				String host = socketServer.host() == null ? "localhost" : socketServer.host();
-				brokerConnectionStrings += host+":"+socketServer.port()+",";
-			}
-
-			LOG.info("ZK and KafkaServer started.");
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Test setup failed: " + t.getMessage());
-		}
-
-		standardProps = new Properties();
-
-		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
-		standardProps.setProperty("bootstrap.servers", brokerConnectionStrings);
-		standardProps.setProperty("group.id", "flink-tests");
-		standardProps.setProperty("auto.commit.enable", "false");
-		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
-		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
-		
-		Properties consumerConfigProps = new Properties();
-		consumerConfigProps.putAll(standardProps);
-		consumerConfigProps.setProperty("auto.offset.reset", "smallest");
-		standardCC = new ConsumerConfig(consumerConfigProps);
-		
-		// start also a re-usable Flink mini cluster
-		
-		Configuration flinkConfig = new Configuration();
-		flinkConfig.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
-		flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
-
-//		flinkConfig.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true);
-//		flinkConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8080);
-		
-		flink = new ForkableFlinkMiniCluster(flinkConfig, false, StreamingMode.STREAMING);
-		flinkPort = flink.getJobManagerRPCPort();
-	}
-
-	@AfterClass
-	public static void shutDownServices() {
-
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    Shut down KafkaITCase ");
-		LOG.info("-------------------------------------------------------------------------");
-
-		flinkPort = -1;
-		flink.shutdown();
-		
-		for (KafkaServer broker : brokers) {
-			if (broker != null) {
-				broker.shutdown();
-			}
-		}
-		brokers.clear();
-		
-		if (zookeeper != null) {
-			try {
-				zookeeper.stop();
-			}
-			catch (Exception e) {
-				LOG.warn("ZK.stop() failed", e);
-			}
-			zookeeper = null;
-		}
-		
-		// clean up the temp spaces
-		
-		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpKafkaParent);
-			}
-			catch (Exception e) {
-				// ignore
-			}
-		}
-		if (tmpZkDir != null && tmpZkDir.exists()) {
-			try {
-				FileUtils.deleteDirectory(tmpZkDir);
-			}
-			catch (Exception e) {
-				// ignore
-			}
-		}
-
-		LOG.info("-------------------------------------------------------------------------");
-		LOG.info("    KafkaITCase finished"); 
-		LOG.info("-------------------------------------------------------------------------");
-	}
-
-	/**
-	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-	 */
-	private static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
-												String kafkaHost,
-												String zookeeperConnectionString) throws Exception {
-		Properties kafkaProperties = new Properties();
-
-		int kafkaPort = NetUtils.getAvailablePort();
-
-		// properties have to be Strings
-		kafkaProperties.put("advertised.host.name", kafkaHost);
-		kafkaProperties.put("port", Integer.toString(kafkaPort));
-		kafkaProperties.put("broker.id", Integer.toString(brokerId));
-		kafkaProperties.put("log.dir", tmpFolder.toString());
-		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
-		kafkaProperties.put("message.max.bytes", "" + (50 * 1024 * 1024));
-		kafkaProperties.put("replica.fetch.max.bytes", "" + (50 * 1024 * 1024));
-		KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
-		KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
-		server.startup();
-		return server;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Execution utilities
-	// ------------------------------------------------------------------------
-	
-	protected ZkClient createZookeeperClient() {
-		return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-	}
-	
-	protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
-		try {
-			see.execute(name);
-		}
-		catch (ProgramInvocationException | JobExecutionException root) {
-			Throwable cause = root.getCause();
-			
-			// search for nested SuccessExceptions
-			int depth = 0;
-			while (!(cause instanceof SuccessException)) {
-				if (cause == null || depth++ == 20) {
-					root.printStackTrace();
-					fail("Test failed: " + root.getMessage());
-				}
-				else {
-					cause = cause.getCause();
-				}
-			}
-		}
-	}
-
-	protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
-		
-		// create topic with one client
-		Properties topicConfig = new Properties();
-		LOG.info("Creating topic {}", topic);
-
-		ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-		
-		AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
-		creator.close();
-		
-		// validate that the topic has been created
-
-		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
-				standardProps, null, new ByteArrayDeserializer(), new ByteArrayDeserializer()))
-		{
-			final long deadline = System.currentTimeMillis() + 30000;
-			do {
-				List<PartitionInfo> partitions = consumer.partitionsFor(topic);
-				if (partitions != null && partitions.size() > 0) {
-					return;
-				}
-			}
-			while (System.currentTimeMillis() < deadline);
-			fail("Test topic could not be created");
-		}
-	}
-	
-	protected static void deleteTestTopic(String topic) {
-		LOG.info("Deleting topic {}", topic);
-
-		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
-
-		AdminUtils.deleteTopic(zk, topic);
-		
-		zk.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
deleted file mode 100644
index c412136..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.internals;
-
-import kafka.admin.AdminUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flink.streaming.connectors.KafkaTestBase;
-
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
-	
-	@Test
-	public void runOffsetManipulationinZooKeeperTest() {
-		try {
-			final String topicName = "ZookeeperOffsetHandlerTest-Topic";
-			final String groupId = "ZookeeperOffsetHandlerTest-Group";
-			
-			final long offset = (long) (Math.random() * Long.MAX_VALUE);
-
-			ZkClient zkClient = createZookeeperClient();
-			AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
-				
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset);
-	
-			long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0);
-
-			zkClient.close();
-			
-			assertEquals(offset, fetchedOffset);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
deleted file mode 100644
index 7befe14..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.KafkaSink;
-import org.apache.flink.streaming.connectors.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import java.util.Random;
-
-@SuppressWarnings("serial")
-public class DataGenerators {
-	
-	public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
-														String brokerConnection, String topic,
-														int numPartitions,
-														final int from, final int to) throws Exception {
-
-		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		env.setParallelism(numPartitions);
-		env.getConfig().disableSysoutLogging();
-		env.setNumberOfExecutionRetries(0);
-		
-		DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
-				new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-						int cnt = from;
-						int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-						while (running && cnt <= to) {
-							ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
-							cnt++;
-						}
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
-
-		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnection, topic,
-				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
-				new Tuple2Partitioner(numPartitions)
-		));
-
-		env.execute("Data generator (Int, Int) stream to topic " + topic);
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
-															String brokerConnection, String topic,
-															final int numPartitions,
-															final int numElements,
-															final boolean randomizeOrder) throws Exception {
-		env.setParallelism(numPartitions);
-		env.getConfig().disableSysoutLogging();
-		env.setNumberOfExecutionRetries(0);
-
-		DataStream<Integer> stream = env.addSource(
-				new RichParallelSourceFunction<Integer>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Integer> ctx) {
-						// create a sequence
-						int[] elements = new int[numElements];
-						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
-								i < numElements;
-								i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-							
-							elements[i] = val;
-						}
-
-						// scramble the sequence
-						if (randomizeOrder) {
-							Random rnd = new Random();
-							for (int i = 0; i < elements.length; i++) {
-								int otherPos = rnd.nextInt(elements.length);
-								
-								int tmp = elements[i];
-								elements[i] = elements[otherPos];
-								elements[otherPos] = tmp;
-							}
-						}
-
-						// emit the sequence
-						int pos = 0;
-						while (running && pos < elements.length) {
-							ctx.collect(elements[pos++]);
-						}
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
-
-		stream
-				.rebalance()
-				.addSink(new KafkaSink<>(brokerConnection, topic,
-						new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
-						new SerializableKafkaPartitioner() {
-							@Override
-							public int partition(Object key, int numPartitions) {
-								return ((Integer) key) % numPartitions;
-							}
-						}));
-
-		env.execute("Scrambles int sequence generator");
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	public static class InfiniteStringsGenerator extends Thread {
-
-		private final String kafkaConnectionString;
-		
-		private final String topic;
-		
-		private volatile Throwable error;
-		
-		private volatile boolean running = true;
-
-		
-		public InfiniteStringsGenerator(String kafkaConnectionString, String topic) {
-			this.kafkaConnectionString = kafkaConnectionString;
-			this.topic = topic;
-		}
-
-		@Override
-		public void run() {
-			// we manually feed data into the Kafka sink
-			KafkaSink<String> producer = null;
-			try {
-				producer = new KafkaSink<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
-				producer.open(new Configuration());
-				
-				final StringBuilder bld = new StringBuilder();
-				final Random rnd = new Random();
-				
-				while (running) {
-					bld.setLength(0);
-					
-					int len = rnd.nextInt(100) + 1;
-					for (int i = 0; i < len; i++) {
-						bld.append((char) (rnd.nextInt(20) + 'a') );
-					}
-					
-					String next = bld.toString();
-					producer.invoke(next);
-				}
-			}
-			catch (Throwable t) {
-				this.error = t;
-			}
-			finally {
-				if (producer != null) {
-					try {
-						producer.close();
-					}
-					catch (Throwable t) {
-						// ignore
-					}
-				}
-			}
-		}
-		
-		public void shutdown() {
-			this.running = false;
-			this.interrupt();
-		}
-		
-		public Throwable getError() {
-			return this.error;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
deleted file mode 100644
index b89bd5c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
-/**
- * Sink function that discards data.
- * @param <T> The type of the function.
- */
-public class DiscardingSink<T> implements SinkFunction<T> {
-
-	private static final long serialVersionUID = 2777597566520109843L;
-
-	@Override
-	public void invoke(T value) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
deleted file mode 100644
index 7796af9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
-		Checkpointed<Integer>, CheckpointNotifier, Runnable {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
-	
-	private static final long serialVersionUID = 6334389850158707313L;
-	
-	public static volatile boolean failedBefore;
-	public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
-	private final int failCount;
-	private int numElementsTotal;
-	private int numElementsThisTime;
-	
-	private boolean failer;
-	private boolean hasBeenCheckpointed;
-	
-	private Thread printer;
-	private volatile boolean printerRunning = true;
-
-	public FailingIdentityMapper(int failCount) {
-		this.failCount = failCount;
-	}
-
-	@Override
-	public void open(Configuration parameters) {
-		failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
-		printer = new Thread(this, "FailingIdentityMapper Status Printer");
-		printer.start();
-	}
-
-	@Override
-	public T map(T value) throws Exception {
-		numElementsTotal++;
-		numElementsThisTime++;
-		
-		if (!failedBefore) {
-			Thread.sleep(10);
-			
-			if (failer && numElementsTotal >= failCount) {
-				hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-				failedBefore = true;
-				throw new Exception("Artificial Test Failure");
-			}
-		}
-		return value;
-	}
-
-	@Override
-	public void close() throws Exception {
-		printerRunning = false;
-		if (printer != null) {
-			printer.interrupt();
-			printer = null;
-		}
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) {
-		this.hasBeenCheckpointed = true;
-	}
-
-	@Override
-	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-		return numElementsTotal;
-	}
-
-	@Override
-	public void restoreState(Integer state) {
-		numElementsTotal = state;
-	}
-
-	@Override
-	public void run() {
-		while (printerRunning) {
-			try {
-				Thread.sleep(5000);
-			}
-			catch (InterruptedException e) {
-				// ignore
-			}
-			LOG.info("============================> Failing mapper  {}: count={}, totalCount={}",
-					getRuntimeContext().getIndexOfThisSubtask(),
-					numElementsThisTime, numElementsTotal);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index a7fa2ff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class JobManagerCommunicationUtils {
-	
-	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-	
-	
-	public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
-		
-		// find the jobID
-		Future<Object> listResponse = jobManager.ask(
-				JobManagerMessages.getRequestRunningJobsStatus(),
-				askTimeout);
-
-		List<JobStatusMessage> jobs;
-		try {
-			Object result = Await.result(listResponse, askTimeout);
-			jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-		}
-		catch (Exception e) {
-			throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
-		}
-		
-		if (jobs.isEmpty()) {
-			throw new Exception("Could not cancel job - no running jobs");
-		}
-		if (jobs.size() != 1) {
-			throw new Exception("Could not cancel job - more than one running job.");
-		}
-		
-		JobStatusMessage status = jobs.get(0);
-		if (status.getJobState().isTerminalState()) {
-			throw new Exception("Could not cancel job - job is not running any more");
-		}
-		
-		JobID jobId = status.getJobId();
-		
-		Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
-		try {
-			Await.result(response, askTimeout);
-		}
-		catch (Exception e) {
-			throw new Exception("Sending the 'cancel' message failed.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
deleted file mode 100644
index 1f71271..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public class MockRuntimeContext implements RuntimeContext {
-
-	private final int numberOfParallelSubtasks;
-	private final int indexOfThisSubtask;
-
-	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
-		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
-		this.indexOfThisSubtask = indexOfThisSubtask;
-	}
-
-
-	@Override
-	public String getTaskName() {
-		return null;
-	}
-
-	@Override
-	public int getNumberOfParallelSubtasks() {
-		return numberOfParallelSubtasks;
-	}
-
-	@Override
-	public int getIndexOfThisSubtask() {
-		return indexOfThisSubtask;
-	}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		return null;
-	}
-
-	@Override
-	public ClassLoader getUserCodeClassLoader() {
-		return null;
-	}
-
-	@Override
-	public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {}
-
-	@Override
-	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
-		return null;
-	}
-
-	@Override
-	public Map<String, Accumulator<?, ?>> getAllAccumulators() {
-		return null;
-	}
-
-	@Override
-	public IntCounter getIntCounter(String name) {
-		return null;
-	}
-
-	@Override
-	public LongCounter getLongCounter(String name) {
-		return null;
-	}
-
-	@Override
-	public DoubleCounter getDoubleCounter(String name) {
-		return null;
-	}
-
-	@Override
-	public Histogram getHistogram(String name) {
-		return null;
-	}
-
-	@Override
-	public <RT> List<RT> getBroadcastVariable(String name) {
-		return null;
-	}
-
-	@Override
-	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
-		return null;
-	}
-
-	@Override
-	public DistributedCache getDistributedCache() {
-		return null;
-	}
-
-	@Override
-	public <S, C extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
-		return null;
-	}
-
-	@Override
-	public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned) throws IOException {
-		return null;
-	}
-}