You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:55 UTC

[05/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
deleted file mode 100644
index aa7ea49..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ /dev/null
@@ -1,2006 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.server.KafkaServer;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
-import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.testutils.junit.RetryOnException;
-import org.apache.flink.testutils.junit.RetryRule;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-	
-	@Rule
-	public RetryRule retryRule = new RetryRule();
-
-
-	// ------------------------------------------------------------------------
-	//  Common Test Preparation
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Makes sure that no job is on the JobManager any more from any previous tests that use
-	 * the same mini cluster. Otherwise, missing slots may happen.
-	 */
-	@Before
-	public void ensureNoJobIsLingering() throws Exception {
-		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
-	}
-	
-	
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	//
-	//  The tests here are all not activated (by an @Test tag), but need
-	//  to be invoked from the extending classes. That way, the classes can
-	//  select which tests to run.
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist
-	 * and a wrong broker was specified
-	 *
-	 * @throws Exception
-	 */
-	public void runFailOnNoBrokerTest() throws Exception {
-		try {
-			Properties properties = new Properties();
-
-			StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			see.getConfig().disableSysoutLogging();
-			see.setRestartStrategy(RestartStrategies.noRestart());
-			see.setParallelism(1);
-
-			// use wrong ports for the consumers
-			properties.setProperty("bootstrap.servers", "localhost:80");
-			properties.setProperty("zookeeper.connect", "localhost:80");
-			properties.setProperty("group.id", "test");
-			properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast
-			properties.setProperty("socket.timeout.ms", "3000");
-			properties.setProperty("session.timeout.ms", "2000");
-			properties.setProperty("fetch.max.wait.ms", "2000");
-			properties.setProperty("heartbeat.interval.ms", "1000");
-			properties.putAll(secureProps);
-			FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("doesntexist", new SimpleStringSchema(), properties);
-			DataStream<String> stream = see.addSource(source);
-			stream.print();
-			see.execute("No broker test");
-		} catch(ProgramInvocationException pie) {
-			if(kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
-				assertTrue(pie.getCause() instanceof JobExecutionException);
-
-				JobExecutionException jee = (JobExecutionException) pie.getCause();
-
-				assertTrue(jee.getCause() instanceof TimeoutException);
-
-				TimeoutException te = (TimeoutException) jee.getCause();
-
-				assertEquals("Timeout expired while fetching topic metadata", te.getMessage());
-			} else {
-				assertTrue(pie.getCause() instanceof JobExecutionException);
-
-				JobExecutionException jee = (JobExecutionException) pie.getCause();
-
-				assertTrue(jee.getCause() instanceof RuntimeException);
-
-				RuntimeException re = (RuntimeException) jee.getCause();
-
-				assertTrue(re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
-			}
-		}
-	}
-
-	/**
-	 * Ensures that the committed offsets to Kafka are the offsets of "the next record to process"
-	 */
-	public void runCommitOffsetsToKafka() throws Exception {
-		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
-		final int parallelism = 3;
-		final int recordsInEachPartition = 50;
-
-		final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env.setParallelism(parallelism);
-		env.enableCheckpointing(200);
-
-		DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
-		stream.addSink(new DiscardingSink<String>());
-
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-		final Thread runner = new Thread("runner") {
-			@Override
-			public void run() {
-				try {
-					env.execute();
-				}
-				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
-						errorRef.set(t);
-					}
-				}
-			}
-		};
-		runner.start();
-
-		final Long l50 = 50L; // the final committed offset in Kafka should be 50
-		final long deadline = 30000 + System.currentTimeMillis();
-
-		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
-
-		do {
-			Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
-			Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
-			Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-
-			if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
-				break;
-			}
-
-			Thread.sleep(100);
-		}
-		while (System.currentTimeMillis() < deadline);
-
-		// cancel the job
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-
-		final Throwable t = errorRef.get();
-		if (t != null) {
-			throw new RuntimeException("Job failed with an exception", t);
-		}
-
-		// final check to see if offsets are correctly in Kafka
-		Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
-		Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
-		Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-		Assert.assertEquals(Long.valueOf(50L), o1);
-		Assert.assertEquals(Long.valueOf(50L), o2);
-		Assert.assertEquals(Long.valueOf(50L), o3);
-
-		kafkaOffsetHandler.close();
-		deleteTestTopic(topicName);
-	}
-
-	/**
-	 * This test first writes a total of 300 records to a test topic, reads the first 150 so that some offsets are
-	 * committed to Kafka, and then startup the consumer again to read the remaining records starting from the committed offsets.
-	 * The test ensures that whatever offsets were committed to Kafka, the consumer correctly picks them up
-	 * and starts at the correct position.
-	 */
-	public void runStartFromKafkaCommitOffsets() throws Exception {
-		final int parallelism = 3;
-		final int recordsInEachPartition = 300;
-
-		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
-
-		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
-
-		Long o1;
-		Long o2;
-		Long o3;
-		int attempt = 0;
-		// make sure that o1, o2, o3 are not all null before proceeding
-		do {
-			attempt++;
-			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
-
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.getConfig().disableSysoutLogging();
-			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-			env.setParallelism(parallelism);
-			env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
-
-			env
-				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
-				.map(new ThrottledMapper<String>(50))
-				.map(new MapFunction<String, Object>() {
-					int count = 0;
-					@Override
-					public Object map(String value) throws Exception {
-						count++;
-						if (count == 150) {
-							throw new SuccessException();
-						}
-						return null;
-					}
-				})
-				.addSink(new DiscardingSink<>());
-
-			tryExecute(env, "Read some records to commit offsets to Kafka");
-
-			o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
-			o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
-			o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-		} while (o1 == null && o2 == null && o3 == null && attempt < 3);
-
-		if (o1 == null && o2 == null && o3 == null) {
-			throw new RuntimeException("No offsets have been committed after 3 attempts");
-		}
-
-		LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
-
-		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env2.getConfig().disableSysoutLogging();
-		env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env2.setParallelism(parallelism);
-
-		// whatever offsets were committed for each partition, the consumer should pick
-		// them up and start from the correct position so that the remaining records are all read
-		HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
-		partitionsToValuesCountAndStartOffset.put(0, new Tuple2<>(
-			(o1 != null) ? (int) (recordsInEachPartition - o1) : recordsInEachPartition,
-			(o1 != null) ? o1.intValue() : 0
-		));
-		partitionsToValuesCountAndStartOffset.put(1, new Tuple2<>(
-			(o2 != null) ? (int) (recordsInEachPartition - o2) : recordsInEachPartition,
-			(o2 != null) ? o2.intValue() : 0
-		));
-		partitionsToValuesCountAndStartOffset.put(2, new Tuple2<>(
-			(o3 != null) ? (int) (recordsInEachPartition - o3) : recordsInEachPartition,
-			(o3 != null) ? o3.intValue() : 0
-		));
-
-		readSequence(env2, standardProps, topicName, partitionsToValuesCountAndStartOffset);
-
-		kafkaOffsetHandler.close();
-		deleteTestTopic(topicName);
-	}
-
-	/**
-	 * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset
-	 * is committed to Kafka, even if some partitions are not read.
-	 *
-	 * Test:
-	 * - Create 3 partitions
-	 * - write 50 messages into each.
-	 * - Start three consumers with auto.offset.reset='latest' and wait until they committed into Kafka.
-	 * - Check if the offsets in Kafka are set to 50 for the three partitions
-	 *
-	 * See FLINK-3440 as well
-	 */
-	public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
-		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
-		final int parallelism = 3;
-		final int recordsInEachPartition = 50;
-
-		final String topicName = writeSequence("testAutoOffsetRetrievalAndCommitToKafkaTopic", recordsInEachPartition, parallelism, 1);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env.setParallelism(parallelism);
-		env.enableCheckpointing(200);
-
-		Properties readProps = new Properties();
-		readProps.putAll(standardProps);
-		readProps.setProperty("auto.offset.reset", "latest"); // set to reset to latest, so that partitions are initially not read
-
-		DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
-		stream.addSink(new DiscardingSink<String>());
-
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-		final Thread runner = new Thread("runner") {
-			@Override
-			public void run() {
-				try {
-					env.execute();
-				}
-				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
-						errorRef.set(t);
-					}
-				}
-			}
-		};
-		runner.start();
-
-		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
-
-		final Long l50 = 50L; // the final committed offset in Kafka should be 50
-		final long deadline = 30000 + System.currentTimeMillis();
-		do {
-			Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
-			Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
-			Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-
-			if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
-				break;
-			}
-
-			Thread.sleep(100);
-		}
-		while (System.currentTimeMillis() < deadline);
-
-		// cancel the job
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-
-		final Throwable t = errorRef.get();
-		if (t != null) {
-			throw new RuntimeException("Job failed with an exception", t);
-		}
-
-		// final check to see if offsets are correctly in Kafka
-		Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
-		Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
-		Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-		Assert.assertEquals(Long.valueOf(50L), o1);
-		Assert.assertEquals(Long.valueOf(50L), o2);
-		Assert.assertEquals(Long.valueOf(50L), o3);
-
-		kafkaOffsetHandler.close();
-		deleteTestTopic(topicName);
-	}
-	
-	/**
-	 * Ensure Kafka is working on both producer and consumer side.
-	 * This executes a job that contains two Flink pipelines.
-	 *
-	 * <pre>
-	 * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
-	 * </pre>
-	 * 
-	 * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
-	 * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
-	 * cause the test to fail.
-	 *
-	 * This test also ensures that FLINK-3156 doesn't happen again:
-	 *
-	 * The following situation caused a NPE in the FlinkKafkaConsumer
-	 *
-	 * topic-1 <-- elements are only produced into topic1.
-	 * topic-2
-	 *
-	 * Therefore, this test is consuming as well from an empty topic.
-	 *
-	 */
-	@RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
-	public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
-		final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
-		final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString();
-
-		final int parallelism = 3;
-		final int elementsPerPartition = 100;
-		final int totalElements = parallelism * elementsPerPartition;
-
-		createTestTopic(topic, parallelism, 2);
-		createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will remain empty all the time
-
-		final StreamExecutionEnvironment env =
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(parallelism);
-		env.enableCheckpointing(500);
-		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
-		env.getConfig().disableSysoutLogging();
-
-		TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
-
-		TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
-				new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
-		TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
-				new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
-
-		// ----------- add producer dataflow ----------
-
-		DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
-
-			private boolean running = true;
-
-			@Override
-			public void run(SourceContext<Tuple2<Long, String>> ctx) throws InterruptedException {
-				int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
-				int limit = cnt + elementsPerPartition;
-
-
-				while (running && cnt < limit) {
-					ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt));
-					cnt++;
-					// we delay data generation a bit so that we are sure that some checkpoints are
-					// triggered (for FLINK-3156)
-					Thread.sleep(50);
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		});
-		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-		producerProperties.setProperty("retries", "3");
-		producerProperties.putAll(secureProps);
-		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
-
-		// ----------- add consumer dataflow ----------
-
-		List<String> topics = new ArrayList<>();
-		topics.add(topic);
-		topics.add(additionalEmptyTopic);
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topics, sourceSchema, props);
-
-		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
-
-		consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
-
-			private int elCnt = 0;
-			private BitSet validator = new BitSet(totalElements);
-
-			@Override
-			public void invoke(Tuple2<Long, String> value) throws Exception {
-				String[] sp = value.f1.split("-");
-				int v = Integer.parseInt(sp[1]);
-
-				assertEquals(value.f0 - 1000, (long) v);
-
-				assertFalse("Received tuple twice", validator.get(v));
-				validator.set(v);
-				elCnt++;
-
-				if (elCnt == totalElements) {
-					// check if everything in the bitset is set to true
-					int nc;
-					if ((nc = validator.nextClearBit(0)) != totalElements) {
-						fail("The bitset was not set to 1 on all elements. Next clear:"
-								+ nc + " Set: " + validator);
-					}
-					throw new SuccessException();
-				}
-			}
-
-			@Override
-			public void close() throws Exception {
-				super.close();
-			}
-		}).setParallelism(1);
-
-		try {
-			tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
-		}
-		catch (ProgramInvocationException | JobExecutionException e) {
-			// look for NotLeaderForPartitionException
-			Throwable cause = e.getCause();
-
-			// search for nested SuccessExceptions
-			int depth = 0;
-			while (cause != null && depth++ < 20) {
-				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
-					throw (Exception) cause;
-				}
-				cause = cause.getCause();
-			}
-			throw e;
-		}
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and
-	 * Flink sources.
-	 */
-	public void runOneToOneExactlyOnceTest() throws Exception {
-
-		final String topic = "oneToOneTopic";
-		final int parallelism = 5;
-		final int numElementsPerPartition = 1000;
-		final int totalElements = parallelism * numElementsPerPartition;
-		final int failAfterElements = numElementsPerPartition / 3;
-
-		createTestTopic(topic, parallelism, 1);
-
-		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-				kafkaServer,
-				topic, parallelism, numElementsPerPartition, true);
-
-		// run the topology that fails and recovers
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.enableCheckpointing(500);
-		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
-		env.getConfig().disableSysoutLogging();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
-
-		env
-				.addSource(kafkaSource)
-				.map(new PartitionValidatingMapper(parallelism, 1))
-				.map(new FailingIdentityMapper<Integer>(failAfterElements))
-				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-		FailingIdentityMapper.failedBefore = false;
-		tryExecute(env, "One-to-one exactly once test");
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
-	 * one Flink source will read multiple Kafka partitions.
-	 */
-	public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
-		final String topic = "oneToManyTopic";
-		final int numPartitions = 5;
-		final int numElementsPerPartition = 1000;
-		final int totalElements = numPartitions * numElementsPerPartition;
-		final int failAfterElements = numElementsPerPartition / 3;
-
-		final int parallelism = 2;
-
-		createTestTopic(topic, numPartitions, 1);
-
-		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-				kafkaServer,
-				topic, numPartitions, numElementsPerPartition, false);
-
-		// run the topology that fails and recovers
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.enableCheckpointing(500);
-		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
-		env.getConfig().disableSysoutLogging();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
-
-		env
-				.addSource(kafkaSource)
-				.map(new PartitionValidatingMapper(numPartitions, 3))
-				.map(new FailingIdentityMapper<Integer>(failAfterElements))
-				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-		FailingIdentityMapper.failedBefore = false;
-		tryExecute(env, "One-source-multi-partitions exactly once test");
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
-	 * that some Flink sources will read no partitions.
-	 */
-	public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
-		final String topic = "manyToOneTopic";
-		final int numPartitions = 5;
-		final int numElementsPerPartition = 1000;
-		final int totalElements = numPartitions * numElementsPerPartition;
-		final int failAfterElements = numElementsPerPartition / 3;
-
-		final int parallelism = 8;
-
-		createTestTopic(topic, numPartitions, 1);
-
-		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-				kafkaServer,
-				topic, numPartitions, numElementsPerPartition, true);
-
-		// run the topology that fails and recovers
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.enableCheckpointing(500);
-		env.setParallelism(parallelism);
-		// set the number of restarts to one. The failing mapper will fail once, then it's only success exceptions.
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
-		env.getConfig().disableSysoutLogging();
-		env.setBufferTimeout(0);
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
-
-		env
-			.addSource(kafkaSource)
-			.map(new PartitionValidatingMapper(numPartitions, 1))
-			.map(new FailingIdentityMapper<Integer>(failAfterElements))
-			.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-		FailingIdentityMapper.failedBefore = false;
-		tryExecute(env, "multi-source-one-partitions exactly once test");
-
-
-		deleteTestTopic(topic);
-	}
-	
-	
-	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
-	 */
-	public void runCancelingOnFullInputTest() throws Exception {
-		final String topic = "cancelingOnFullTopic";
-
-		final int parallelism = 3;
-		createTestTopic(topic, parallelism, 1);
-
-		// launch a producer thread
-		DataGenerators.InfiniteStringsGenerator generator =
-				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
-		generator.start();
-
-		// launch a consumer asynchronously
-
-		final AtomicReference<Throwable> jobError = new AtomicReference<>();
-
-		final Runnable jobRunner = new Runnable() {
-			@Override
-			public void run() {
-				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute("Runner for CancelingOnFullInputTest");
-				}
-				catch (Throwable t) {
-					jobError.set(t);
-				}
-			}
-		};
-
-		Thread runnerThread = new Thread(jobRunner, "program runner thread");
-		runnerThread.start();
-
-		// wait a bit before canceling
-		Thread.sleep(2000);
-
-		Throwable failueCause = jobError.get();
-		if(failueCause != null) {
-			failueCause.printStackTrace();
-			Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
-		}
-
-		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest");
-
-		// wait for the program to be done and validate that we failed with the right exception
-		runnerThread.join();
-
-		failueCause = jobError.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
-
-		if (generator.isAlive()) {
-			generator.shutdown();
-			generator.join();
-		}
-		else {
-			Throwable t = generator.getError();
-			if (t != null) {
-				t.printStackTrace();
-				fail("Generator failed: " + t.getMessage());
-			} else {
-				fail("Generator failed with no exception");
-			}
-		}
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests that the source can be properly canceled when reading empty partitions. 
-	 */
-	public void runCancelingOnEmptyInputTest() throws Exception {
-		final String topic = "cancelingOnEmptyInputTopic";
-
-		final int parallelism = 3;
-		createTestTopic(topic, parallelism, 1);
-
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-
-		final Runnable jobRunner = new Runnable() {
-			@Override
-			public void run() {
-				try {
-					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-					env.setParallelism(parallelism);
-					env.enableCheckpointing(100);
-					env.getConfig().disableSysoutLogging();
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-					FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
-
-					env.addSource(source).addSink(new DiscardingSink<String>());
-
-					env.execute("CancelingOnEmptyInputTest");
-				}
-				catch (Throwable t) {
-					LOG.error("Job Runner failed with exception", t);
-					error.set(t);
-				}
-			}
-		};
-
-		Thread runnerThread = new Thread(jobRunner, "program runner thread");
-		runnerThread.start();
-
-		// wait a bit before canceling
-		Thread.sleep(2000);
-
-		Throwable failueCause = error.get();
-		if (failueCause != null) {
-			failueCause.printStackTrace();
-			Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
-		}
-		// cancel
-		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-
-		// wait for the program to be done and validate that we failed with the right exception
-		runnerThread.join();
-
-		failueCause = error.get();
-		assertNotNull("program did not fail properly due to canceling", failueCause);
-		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
-	 */
-	public void runFailOnDeployTest() throws Exception {
-		final String topic = "failOnDeployTopic";
-
-		createTestTopic(topic, 2, 1);
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(12); // needs to be more that the mini cluster has slots
-		env.getConfig().disableSysoutLogging();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
-
-		env
-				.addSource(kafkaSource)
-				.addSink(new DiscardingSink<Integer>());
-
-		try {
-			env.execute("test fail on deploy");
-			fail("this test should fail with an exception");
-		}
-		catch (ProgramInvocationException e) {
-
-			// validate that we failed due to a NoResourceAvailableException
-			Throwable cause = e.getCause();
-			int depth = 0;
-			boolean foundResourceException = false;
-
-			while (cause != null && depth++ < 20) {
-				if (cause instanceof NoResourceAvailableException) {
-					foundResourceException = true;
-					break;
-				}
-				cause = cause.getCause();
-			}
-
-			assertTrue("Wrong exception", foundResourceException);
-		}
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Test producing and consuming into multiple topics
-	 * @throws java.lang.Exception
-	 */
-	public void runProduceConsumeMultipleTopics() throws java.lang.Exception {
-		final int NUM_TOPICS = 5;
-		final int NUM_ELEMENTS = 20;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.getConfig().disableSysoutLogging();
-		
-		// create topics with content
-		final List<String> topics = new ArrayList<>();
-		for (int i = 0; i < NUM_TOPICS; i++) {
-			final String topic = "topic-" + i;
-			topics.add(topic);
-			// create topic
-			createTestTopic(topic, i + 1 /*partitions*/, 1);
-		}
-		// run first job, producing into all topics
-		DataStream<Tuple3<Integer, Integer, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>() {
-
-			@Override
-			public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx) throws Exception {
-				int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-				for (int topicId = 0; topicId < NUM_TOPICS; topicId++) {
-					for (int i = 0; i < NUM_ELEMENTS; i++) {
-						ctx.collect(new Tuple3<>(partition, i, "topic-" + topicId));
-					}
-				}
-			}
-
-			@Override
-			public void cancel() {
-			}
-		});
-
-		Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		kafkaServer.produceIntoKafka(stream, "dummy", schema, props, null);
-
-		env.execute("Write to topics");
-
-		// run second job consuming from multiple topics
-		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.getConfig().disableSysoutLogging();
-
-		stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
-
-		stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
-			Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
-			@Override
-			public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception {
-				Integer count = countPerTopic.get(value.f2);
-				if (count == null) {
-					count = 1;
-				} else {
-					count++;
-				}
-				countPerTopic.put(value.f2, count);
-
-				// check map:
-				for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) {
-					if (el.getValue() < NUM_ELEMENTS) {
-						break; // not enough yet
-					}
-					if (el.getValue() > NUM_ELEMENTS) {
-						throw new RuntimeException("There is a failure in the test. I've read " +
-								el.getValue() + " from topic " + el.getKey());
-					}
-				}
-				// we've seen messages from all topics
-				throw new SuccessException();
-			}
-		}).setParallelism(1);
-
-		tryExecute(env, "Count elements from the topics");
-
-
-		// delete all topics again
-		for (int i = 0; i < NUM_TOPICS; i++) {
-			final String topic = "topic-" + i;
-			deleteTestTopic(topic);
-		}
-	}
-
-	/**
-	 * Serialization scheme forwarding byte[] records.
-	 */
-	private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> {
-
-		@Override
-		public byte[] serializeKey(byte[] element) {
-			return null;
-		}
-
-		@Override
-		public byte[] serializeValue(byte[] element) {
-			return element;
-		}
-
-		@Override
-		public String getTargetTopic(byte[] element) {
-			return null;
-		}
-	}
-
-	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
-			KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
-		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-		
-		public Tuple2WithTopicSchema(ExecutionConfig ec) {
-			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
-		}
-
-		@Override
-		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
-			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
-			return new Tuple3<>(t2.f0, t2.f1, topic);
-		}
-
-		@Override
-		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
-			return false;
-		}
-
-		@Override
-		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
-			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
-		}
-
-		@Override
-		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
-			return null;
-		}
-
-		@Override
-		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
-			ByteArrayOutputStream by = new ByteArrayOutputStream();
-			DataOutputView out = new DataOutputViewStreamWrapper(by);
-			try {
-				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
-			} catch (IOException e) {
-				throw new RuntimeException("Error" ,e);
-			}
-			return by.toByteArray();
-		}
-
-		@Override
-		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
-			return element.f2;
-		}
-	}
-
-	/**
-	 * Test Flink's Kafka integration also with very big records (30MB)
-	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
-	 *
-	 */
-	public void runBigRecordTestTopology() throws Exception {
-
-		final String topic = "bigRecordTestTopic";
-		final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
-
-		createTestTopic(topic, parallelism, 1);
-
-		final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
-
-		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
-				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setRestartStrategy(RestartStrategies.noRestart());
-		env.getConfig().disableSysoutLogging();
-		env.enableCheckpointing(100);
-		env.setParallelism(parallelism);
-
-		// add consuming topology:
-		Properties consumerProps = new Properties();
-		consumerProps.putAll(standardProps);
-		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 14));
-		consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 14)); // for the new fetcher
-		consumerProps.setProperty("queued.max.message.chunks", "1");
-		consumerProps.putAll(secureProps);
-
-		FlinkKafkaConsumerBase<Tuple2<Long, byte[]>> source = kafkaServer.getConsumer(topic, serSchema, consumerProps);
-		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
-
-		consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
-
-			private int elCnt = 0;
-
-			@Override
-			public void invoke(Tuple2<Long, byte[]> value) throws Exception {
-				elCnt++;
-				if (value.f0 == -1) {
-					// we should have seen 11 elements now.
-					if (elCnt == 11) {
-						throw new SuccessException();
-					} else {
-						throw new RuntimeException("There have been "+elCnt+" elements");
-					}
-				}
-				if (elCnt > 10) {
-					throw new RuntimeException("More than 10 elements seen: "+elCnt);
-				}
-			}
-		});
-
-		// add producing topology
-		Properties producerProps = new Properties();
-		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 15));
-		producerProps.setProperty("retries", "3");
-		producerProps.putAll(secureProps);
-		producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
-
-		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
-
-			private boolean running;
-
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				super.open(parameters);
-				running = true;
-			}
-
-			@Override
-			public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
-				Random rnd = new Random();
-				long cnt = 0;
-				int sevenMb = 1024 * 1024 * 7;
-
-				while (running) {
-					byte[] wl = new byte[sevenMb + rnd.nextInt(sevenMb)];
-					ctx.collect(new Tuple2<>(cnt++, wl));
-
-					Thread.sleep(100);
-
-					if (cnt == 10) {
-						// signal end
-						ctx.collect(new Tuple2<>(-1L, new byte[]{1}));
-						break;
-					}
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		});
-
-		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null);
-
-		tryExecute(env, "big topology test");
-		deleteTestTopic(topic);
-	}
-
-	
-	public void runBrokerFailureTest() throws Exception {
-		final String topic = "brokerFailureTestTopic";
-
-		final int parallelism = 2;
-		final int numElementsPerPartition = 1000;
-		final int totalElements = parallelism * numElementsPerPartition;
-		final int failAfterElements = numElementsPerPartition / 3;
-
-
-		createTestTopic(topic, parallelism, 2);
-
-		DataGenerators.generateRandomizedIntegerSequence(
-				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-				kafkaServer,
-				topic, parallelism, numElementsPerPartition, true);
-
-		// find leader to shut down
-		int leaderId = kafkaServer.getLeaderToShutDown(topic);
-
-		LOG.info("Leader to shutdown {}", leaderId);
-
-
-		// run the topology (the consumers must handle the failures)
-
-		DeserializationSchema<Integer> schema =
-				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(parallelism);
-		env.enableCheckpointing(500);
-		env.setRestartStrategy(RestartStrategies.noRestart());
-		env.getConfig().disableSysoutLogging();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, props);
-
-		env
-				.addSource(kafkaSource)
-				.map(new PartitionValidatingMapper(parallelism, 1))
-				.map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements))
-				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
-		BrokerKillingMapper.killedLeaderBefore = false;
-		tryExecute(env, "Broker failure once test");
-
-		// start a new broker:
-		kafkaServer.restartBroker(leaderId);
-	}
-
-	public void runKeyValueTest() throws Exception {
-		final String topic = "keyvaluetest";
-		createTestTopic(topic, 1, 1);
-		final int ELEMENT_COUNT = 5000;
-
-		// ----------- Write some data into Kafka -------------------
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(1);
-		env.setRestartStrategy(RestartStrategies.noRestart());
-		env.getConfig().disableSysoutLogging();
-
-		DataStream<Tuple2<Long, PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() {
-			@Override
-			public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
-				Random rnd = new Random(1337);
-				for (long i = 0; i < ELEMENT_COUNT; i++) {
-					PojoValue pojo = new PojoValue();
-					pojo.when = new Date(rnd.nextLong());
-					pojo.lon = rnd.nextLong();
-					pojo.lat = i;
-					// make every second key null to ensure proper "null" serialization
-					Long key = (i % 2 == 0) ? null : i;
-					ctx.collect(new Tuple2<>(key, pojo));
-				}
-			}
-			@Override
-			public void cancel() {
-			}
-		});
-
-		KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
-		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-		producerProperties.setProperty("retries", "3");
-		kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
-		env.execute("Write KV to Kafka");
-
-		// ----------- Read the data again -------------------
-
-		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(1);
-		env.setRestartStrategy(RestartStrategies.noRestart());
-		env.getConfig().disableSysoutLogging();
-
-
-		KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props));
-		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
-			long counter = 0;
-			@Override
-			public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
-				// the elements should be in order.
-				Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter );
-				if (value.f1.lat % 2 == 0) {
-					assertNull("key was not null", value.f0);
-				} else {
-					Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
-				}
-				counter++;
-				if (counter == ELEMENT_COUNT) {
-					// we got the right number of elements
-					throw new SuccessException();
-				}
-			}
-		});
-
-		tryExecute(env, "Read KV from Kafka");
-
-		deleteTestTopic(topic);
-	}
-
-	public static class PojoValue {
-		public Date when;
-		public long lon;
-		public long lat;
-		public PojoValue() {}
-	}
-
-
-	/**
-	 * Test delete behavior and metrics for producer
-	 * @throws Exception
-	 */
-	public void runAllDeletesTest() throws Exception {
-		final String topic = "alldeletestest";
-		createTestTopic(topic, 1, 1);
-		final int ELEMENT_COUNT = 300;
-
-		// ----------- Write some data into Kafka -------------------
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(1);
-		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env.getConfig().disableSysoutLogging();
-
-		DataStream<Tuple2<byte[], PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<byte[], PojoValue>>() {
-			@Override
-			public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception {
-				Random rnd = new Random(1337);
-				for (long i = 0; i < ELEMENT_COUNT; i++) {
-					final byte[] key = new byte[200];
-					rnd.nextBytes(key);
-					ctx.collect(new Tuple2<>(key, (PojoValue) null));
-				}
-			}
-			@Override
-			public void cancel() {
-			}
-		});
-
-		TypeInformationKeyValueSerializationSchema<byte[], PojoValue> schema = new TypeInformationKeyValueSerializationSchema<>(byte[].class, PojoValue.class, env.getConfig());
-
-		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-		producerProperties.setProperty("retries", "3");
-		producerProperties.putAll(secureProps);
-		kafkaServer.produceIntoKafka(kvStream, topic, schema, producerProperties, null);
-
-		env.execute("Write deletes to Kafka");
-
-		// ----------- Read the data again -------------------
-
-		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setParallelism(1);
-		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env.getConfig().disableSysoutLogging();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		DataStream<Tuple2<byte[], PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, schema, props));
-
-		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<byte[], PojoValue>, Object>() {
-			long counter = 0;
-			@Override
-			public void flatMap(Tuple2<byte[], PojoValue> value, Collector<Object> out) throws Exception {
-				// ensure that deleted messages are passed as nulls
-				assertNull(value.f1);
-				counter++;
-				if (counter == ELEMENT_COUNT) {
-					// we got the right number of elements
-					throw new SuccessException();
-				}
-			}
-		});
-
-		tryExecute(env, "Read deletes from Kafka");
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Test that ensures that DeserializationSchema.isEndOfStream() is properly evaluated.
-	 *
-	 * @throws Exception
-	 */
-	public void runEndOfStreamTest() throws Exception {
-
-		final int ELEMENT_COUNT = 300;
-		final String topic = writeSequence("testEndOfStream", ELEMENT_COUNT, 1, 1);
-
-		// read using custom schema
-		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env1.setParallelism(1);
-		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env1.getConfig().disableSysoutLogging();
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-
-		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props));
-		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
-			@Override
-			public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
-				// noop ;)
-			}
-		});
-
-		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
-
-		deleteTestTopic(topic);
-	}
-
-	/**
-	 * Test metrics reporting for consumer
-	 *
-	 * @throws Exception
-	 */
-	public void runMetricsTest() throws Throwable {
-
-		// create a stream with 5 topics
-		final String topic = "metricsStream";
-		createTestTopic(topic, 5, 1);
-
-		final Tuple1<Throwable> error = new Tuple1<>(null);
-		Runnable job = new Runnable() {
-			@Override
-			public void run() {
-				try {
-					// start job writing & reading data.
-					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-					env1.setParallelism(1);
-					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-					env1.getConfig().disableSysoutLogging();
-					env1.disableOperatorChaining(); // let the source read everything into the network buffers
-
-					Properties props = new Properties();
-					props.putAll(standardProps);
-					props.putAll(secureProps);
-
-					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
-					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
-					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
-						@Override
-						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op
-						}
-					});
-
-					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
-						boolean running = true;
-
-						@Override
-						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-							int i = 0;
-							while (running) {
-								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
-								Thread.sleep(1);
-							}
-						}
-
-						@Override
-						public void cancel() {
-							running = false;
-						}
-					});
-
-					kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
-
-					env1.execute("Metrics test job");
-				} catch(Throwable t) {
-					LOG.warn("Got exception during execution", t);
-					if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
-						error.f0 = t;
-					}
-				}
-			}
-		};
-		Thread jobThread = new Thread(job);
-		jobThread.start();
-
-		try {
-			// connect to JMX
-			MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
-			// wait until we've found all 5 offset metrics
-			Set<ObjectName> offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
-			while (offsetMetrics.size() < 5) { // test will time out if metrics are not properly working
-				if (error.f0 != null) {
-					// fail test early
-					throw error.f0;
-				}
-				offsetMetrics = mBeanServer.queryNames(new ObjectName("*current-offsets*:*"), null);
-				Thread.sleep(50);
-			}
-			Assert.assertEquals(5, offsetMetrics.size());
-			// we can't rely on the consumer to have touched all the partitions already
-			// that's why we'll wait until all five partitions have a positive offset.
-			// The test will fail if we never meet the condition
-			while (true) {
-				int numPosOffsets = 0;
-				// check that offsets are correctly reported
-				for (ObjectName object : offsetMetrics) {
-					Object offset = mBeanServer.getAttribute(object, "Value");
-					if((long) offset >= 0) {
-						numPosOffsets++;
-					}
-				}
-				if (numPosOffsets == 5) {
-					break;
-				}
-				// wait for the consumer to consume on all partitions
-				Thread.sleep(50);
-			}
-
-			// check if producer metrics are also available.
-			Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null);
-			Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30);
-
-
-			LOG.info("Found all JMX metrics. Cancelling job.");
-		} finally {
-			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-		}
-
-		while (jobThread.isAlive()) {
-			Thread.sleep(50);
-		}
-		if (error.f0 != null) {
-			throw error.f0;
-		}
-
-		deleteTestTopic(topic);
-	}
-
-
-	public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
-		
-		final int finalCount;
-		int count = 0;
-		
-		TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-		TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
-
-		public FixedNumberDeserializationSchema(int finalCount) {
-			this.finalCount = finalCount;
-		}
-
-		@Override
-		public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException {
-			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
-			return ser.deserialize(in);
-		}
-
-		@Override
-		public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) {
-			return ++count >= finalCount;
-		}
-
-		@Override
-		public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
-			return ti;
-		}
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  Reading writing test data sets
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Runs a job using the provided environment to read a sequence of records from a single Kafka topic.
-	 * The method allows to individually specify the expected starting offset and total read value count of each partition.
-	 * The job will be considered successful only if all partition read results match the start offset and value count criteria.
-	 */
-	protected void readSequence(StreamExecutionEnvironment env, Properties cc,
-								final String topicName,
-								final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
-		final int sourceParallelism = partitionsToValuesCountAndStartOffset.keySet().size();
-
-		int finalCountTmp = 0;
-		for (Map.Entry<Integer, Tuple2<Integer, Integer>> valuesCountAndStartOffset : partitionsToValuesCountAndStartOffset.entrySet()) {
-			finalCountTmp += valuesCountAndStartOffset.getValue().f0;
-		}
-		final int finalCount = finalCountTmp;
-
-		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
-			new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
-
-		// create the consumer
-		cc.putAll(secureProps);
-		FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deser, cc);
-
-		DataStream<Tuple2<Integer, Integer>> source = env
-			.addSource(consumer).setParallelism(sourceParallelism)
-			.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
-
-		// verify data
-		source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
-
-			private HashMap<Integer, BitSet> partitionsToValueCheck;
-			private int count = 0;
-
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				partitionsToValueCheck = new HashMap<>();
-				for (Integer partition : partitionsToValuesCountAndStartOffset.keySet()) {
-					partitionsToValueCheck.put(partition, new BitSet());
-				}
-			}
-
-			@Override
-			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
-				int partition = value.f0;
-				int val = value.f1;
-
-				BitSet bitSet = partitionsToValueCheck.get(partition);
-				if (bitSet == null) {
-					throw new RuntimeException("Got a record from an unknown partition");
-				} else {
-					bitSet.set(val - partitionsToValuesCountAndStartOffset.get(partition).f1);
-				}
-
-				count++;
-
-				LOG.info("Received message {}, total {} messages", value, count);
-
-				// verify if we've seen everything
-				if (count == finalCount) {
-					for (Map.Entry<Integer, BitSet> partitionsToValueCheck : this.partitionsToValueCheck.entrySet()) {
-						BitSet check = partitionsToValueCheck.getValue();
-						int expectedValueCount = partitionsToValuesCountAndStartOffset.get(partitionsToValueCheck.getKey()).f0;
-
-						if (check.cardinality() != expectedValueCount) {
-							throw new RuntimeException("Expected cardinality to be " + expectedValueCount +
-								", but was " + check.cardinality());
-						} else if (check.nextClearBit(0) != expectedValueCount) {
-							throw new RuntimeException("Expected next clear bit to be " + expectedValueCount +
-								", but was " + check.cardinality());
-						}
-					}
-
-					// test has passed
-					throw new SuccessException();
-				}
-			}
-
-		}).setParallelism(1);
-
-		tryExecute(env, "Read data from Kafka");
-
-		LOG.info("Successfully read sequence for verification");
-	}
-
-	/**
-	 * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, Properties, String, Map)} to
-	 * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic.
-	 */
-	protected void readSequence(StreamExecutionEnvironment env, Properties cc,
-								final int sourceParallelism,
-								final String topicName,
-								final int valuesCount, final int startFrom) throws Exception {
-		HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
-		for (int i = 0; i < sourceParallelism; i++) {
-			partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
-		}
-		readSequence(env, cc, topicName, partitionsToValuesCountAndStartOffset);
-	}
-
-	protected String writeSequence(
-			String baseTopicName,
-			final int numElements,
-			final int parallelism,
-			final int replicationFactor) throws Exception
-	{
-		LOG.info("\n===================================\n" +
-				"== Writing sequence of " + numElements + " into " + baseTopicName + " with p=" + parallelism + "\n" +
-				"===================================");
-
-		final TypeInformation<Tuple2<Integer, Integer>> resultType = 
-				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
-
-		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
-				new KeyedSerializationSchemaWrapper<>(
-						new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
-
-		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
-				new KeyedDeserializationSchemaWrapper<>(
-						new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
-		
-		final int maxNumAttempts = 10;
-
-		for (int attempt = 1; attempt <= maxNumAttempts; attempt++) {
-			
-			final String topicName = baseTopicName + '-' + attempt;
-			
-			LOG.info("Writing attempt #1");
-			
-			// -------- Write the Sequence --------
-
-			createTestTopic(topicName, parallelism, replicationFactor);
-
-			StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-			writeEnv.getConfig().disableSysoutLogging();
-			
-			DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-	
-				private boolean running = true;
-	
-				@Override
-				public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-					int cnt = 0;
-					int partition = getRuntimeContext().getIndexOfThisSubtask();
-	
-					while (running && cnt < numElements) {
-						ctx.collect(new Tuple2<>(partition, cnt));
-						cnt++;
-					}
-				}
-	
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			}).setParallelism(parallelism);
-	
-			// the producer must not produce duplicates
-			Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-			producerProperties.setProperty("retries", "0");
-			producerProperties.putAll(secureProps);
-			
-			kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism))
-					.setParallelism(parallelism);
-
-			try {
-				writeEnv.execute("Write sequence");
-			}
-			catch (Exception e) {
-				LOG.error("Write attempt failed, trying again", e);
-				deleteTestTopic(topicName);
-				JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
-				continue;
-			}
-			
-			LOG.info("Finished writing sequence");
-
-			// -------- Validate the Sequence --------
-			
-			// we need to validate the sequence, because kafka's producers are not exactly once
-			LOG.info("Validating sequence");
-
-			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
-			
-			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-			readEnv.getConfig().disableSysoutLogging();
-			readEnv.setParallelism(parallelism);
-			
-			Properties readProps = (Properties) standardProps.clone();
-			readProps.setProperty("group.id", "flink-tests-validator");
-			readProps.putAll(secureProps);
-			FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
-
-			readEnv
-					.addSource(consumer)
-					.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
-						
-						private final int totalCount = parallelism * numElements;
-						private int count = 0;
-						
-						@Override
-						public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
-							if (++count == totalCount) {
-								throw new SuccessException();
-							} else {
-								return value;
-							}
-						}
-					}).setParallelism(1)
-					.addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
-			
-			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-			
-			Thread runner = new Thread() {
-				@Override
-				public void run() {
-					try {
-						tryExecute(readEnv, "sequence validation");
-					} catch (Throwable t) {
-						errorRef.set(t);
-					}
-				}
-			};
-			runner.start();
-			
-			final long deadline = System.currentTimeMillis() + 10000;
-			long delay;
-			while (runner.isAlive() && (delay = deadline - System.currentTimeMillis()) > 0) {
-				runner.join(delay);
-			}
-			
-			boolean success;
-			
-			if (runner.isAlive()) {
-				// did not finish in time, maybe the producer dropped one or more records and
-				// the validation did not reach the exit point
-				success = false;
-				JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
-			}
-			else {
-				Throwable error = errorRef.get();
-				if (error != null) {
-					success = false;
-					LOG.info("Attempt " + attempt + " failed with exception", error);
-				}
-				else {
-					success = true;
-				}
-			}
-
-			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
-			
-			if (success) {
-				// everything is good!
-				return topicName;
-			}
-			else {
-				deleteTestTopic(topicName);
-				// fall through the loop
-			}
-		}
-		
-		throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
-	}
-
-	// ------------------------------------------------------------------------
-	//  Debugging utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Read topic to list, only using Kafka code.
-	 */
-	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
-		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
-		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
-		// will see each message only once.
-		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
-		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
-		if (streams.size() != 1) {
-			throw new RuntimeException("Expected only one message stream but got "+streams.size());
-		}
-		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
-		if (kafkaStreams == null) {
-			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
-		}
-		if (kafkaStreams.size() != 1) {
-			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
-		}
-		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
-		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
-
-		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
-		int read = 0;
-		while(iteratorToRead.hasNext()) {
-			read++;
-			result.add(iteratorToRead.next());
-			if (read == stopAfter) {
-				LOG.info("Read "+read+" elements");
-				return result;
-			}
-		}
-		return result;
-	}
-
-	private static void printTopic(String topicName, ConsumerConfig config,
-								DeserializationSchema<?> deserializationSchema,
-								int stopAfter) throws IOException {
-
-		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
-		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
-
-		for (MessageAndMetadata<byte[], byte[]> message: contents) {
-			Object out = deserializationSchema.deserialize(message.message());
-			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
-		}
-	}
-
-	private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) 
-			throws IOException
-	{
-		// write the sequence to log for debugging purposes
-		Properties newProps = new Properties(standardProps);
-		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
-		newProps.setProperty("auto.offset.reset", "smallest");
-		newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
-		newProps.putAll(secureProps);
-
-		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
-		printTopic(topicName, printerConfig, deserializer, elements);
-	}
-
-
-	public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
-			implements Checkpointed<Integer>, CheckpointListener {
-
-		private static final long serialVersionUID = 6334389850158707313L;
-
-		public static volatile boolean killedLeaderBefore;
-		public static volatile boolean hasBeenCheckpointedBeforeFailure;
-		
-		private final int shutdownBrokerId;
-		private final int failCount;
-		private int numElementsTotal;
-
-		private boolean failer;
-		private boolean hasBeenCheckpointed;
-
-
-		public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
-			this.shutdownBrokerId = shutdownBrokerId;
-			this.failCount = failCount;
-		}
-
-		@Override
-		public void open(Configuration parameters) {
-			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
-		}
-
-		@Override
-		public T map(T value) throws Exception {
-			numElementsTotal++;
-			
-			if (!killedLeaderBefore) {
-				Thread.sleep(10);
-				
-				if (failer && numElementsTotal >= failCount) {
-					// shut down a Kafka broker
-					KafkaServer toShutDown = null;
-					for (KafkaServer server : kafkaServer.getBrokers()) {
-
-						if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
-							toShutDown = server;
-							break;
-						}
-					}
-	
-					if (toShutDown == null) {
-						StringBuilder listOfBrokers = new StringBuilder();
-						for (KafkaServer server : kafkaServer.getBrokers()) {
-							listOfBrokers.append(kafkaServer.getBrokerId(server));
-							listOfBrokers.append(" ; ");
-						}
-						
-						throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
-								+ " ; available brokers: " + listOfBrokers.toString());
-					}
-					else {
-						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-						killedLeaderBefore = true;
-						toShutDown.shutdown();
-					}
-				}
-			}
-			return value;
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			hasBeenCheckpointed = true;
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return numElementsTotal;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			this.numElementsTotal = state;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
deleted file mode 100644
index c925c8f..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.apache.flink.test.util.SuccessException;
-
-
-import java.io.Serializable;
-import java.util.Properties;
-
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public abstract class KafkaProducerTestBase extends KafkaTestBase {
-
-
-	/**
-	 * 
-	 * <pre>
-	 *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
-	 *            /                  |                                       \
-	 *           /                   |                                        \
-	 * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
-	 *           \                   |                                        /
-	 *            \                  |                                       /
-	 *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
-	 * </pre>
-	 * 
-	 * The mapper validates that the values come consistently from the correct Kafka partition.
-	 * 
-	 * The final sink validates that there are no duplicates and that all partitions are present.
-	 */
-	public void runCustomPartitioningTest() {
-		try {
-			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
-
-			final String topic = "customPartitioningTestTopic";
-			final int parallelism = 3;
-			
-			createTestTopic(topic, parallelism, 1);
-
-			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
-
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setRestartStrategy(RestartStrategies.noRestart());
-			env.getConfig().disableSysoutLogging();
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
-			TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
-
-			// ------ producing topology ---------
-			
-			// source has DOP 1 to make sure it generates no duplicates
-			DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-
-				private boolean running = true;
-
-				@Override
-				public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
-					long cnt = 0;
-					while (running) {
-						ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
-						cnt++;
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			})
-			.setParallelism(1);
-
-			Properties props = new Properties();
-			props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
-			props.putAll(secureProps);
-			
-			// sink partitions into 
-			kafkaServer.produceIntoKafka(stream, topic,
-					new KeyedSerializationSchemaWrapper<>(serSchema),
-					props,
-					new CustomPartitioner(parallelism)).setParallelism(parallelism);
-
-			// ------ consuming topology ---------
-
-			Properties consumerProps = new Properties();
-			consumerProps.putAll(standardProps);
-			consumerProps.putAll(secureProps);
-			FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
-			
-			env.addSource(source).setParallelism(parallelism)
-
-					// mapper that validates partitioning and maps to partition
-					.map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-						
-						private int ourPartition = -1;
-						@Override
-						public Integer map(Tuple2<Long, String> value) {
-							int partition = value.f0.intValue() % parallelism;
-							if (ourPartition != -1) {
-								assertEquals("inconsistent partitioning", ourPartition, partition);
-							} else {
-								ourPartition = partition;
-							}
-							return partition;
-						}
-					}).setParallelism(parallelism)
-					
-					.addSink(new SinkFunction<Integer>() {
-						
-						private int[] valuesPerPartition = new int[parallelism];
-						
-						@Override
-						public void invoke(Integer value) throws Exception {
-							valuesPerPartition[value]++;
-							
-							boolean missing = false;
-							for (int i : valuesPerPartition) {
-								if (i < 100) {
-									missing = true;
-									break;
-								}
-							}
-							if (!missing) {
-								throw new SuccessException();
-							}
-						}
-					}).setParallelism(1);
-			
-			tryExecute(env, "custom partitioning test");
-
-			deleteTestTopic(topic);
-			
-			LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
-
-		private final int expectedPartitions;
-
-		public CustomPartitioner(int expectedPartitions) {
-			this.expectedPartitions = expectedPartitions;
-		}
-
-
-		@Override
-		public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-			assertEquals(expectedPartitions, numPartitions);
-
-			return (int) (next.f0 % numPartitions);
-		}
-	}
-}