You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/08 11:45:46 UTC

[2/2] flink git commit: [FLINK-2372] Add new FlinkKafkaProducer bases on the new producer API

[FLINK-2372] Add new FlinkKafkaProducer bases on the new producer API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/563ee9ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/563ee9ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/563ee9ad

Branch: refs/heads/release-0.10.0-milestone-1
Commit: 563ee9ad8dcf03142721b212acde91a68dc4a4d5
Parents: 41a1fdd
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Aug 28 14:33:49 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 7 15:44:18 2015 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer.java    |  225 ++++
 .../streaming/connectors/kafka/KafkaSink.java   |  187 ---
 .../kafka/SerializableKafkaPartitioner.java     |   25 -
 .../connectors/kafka/api/KafkaSink.java         |    3 +-
 .../kafka/partitioner/FixedPartitioner.java     |   80 ++
 .../kafka/partitioner/KafkaPartitioner.java     |   42 +
 .../KafkaConsumerPartitionAssignmentTest.java   |    2 -
 .../connectors/kafka/KafkaConsumerTest.java     |    3 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 1096 ++++++++----------
 .../streaming/connectors/kafka/KafkaITCase.java |   23 +-
 .../connectors/kafka/KafkaProducerITCase.java   |    8 +-
 .../connectors/kafka/KafkaTestBase.java         |   32 +-
 .../connectors/kafka/TestFixedPartitioner.java  |  104 ++
 .../internals/ZookeeperOffsetHandlerTest.java   |    1 -
 .../kafka/testutils/DataGenerators.java         |   19 +-
 .../kafka/testutils/Tuple2Partitioner.java      |    6 +-
 .../util/serialization/SerializationSchema.java |    2 +-
 17 files changed, 1023 insertions(+), 835 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
new file mode 100644
index 0000000..3d666ee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.List;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Array with the partition ids of the given topicId
+	 * The size of this array is the number of partitions
+	 */
+	private final int[] partitions;
+
+	/**
+	 * User defined properties for the Producer
+	 */
+	private final Properties producerConfig;
+
+	/**
+	 * The name of the topic this producer is writing data to
+	 */
+	private String topicId;
+
+	/**
+	 * (Serializable) SerializationSchema for turning objects used with Flink into
+	 * byte[] for Kafka.
+	 */
+	private SerializationSchema<IN, byte[]> schema;
+
+	/**
+	 * User-provided partitioner for assigning an object to a Kafka partition.
+	 */
+	private KafkaPartitioner partitioner;
+
+	// -------------------------------- Runtime fields ------------------------------------------
+
+	/**
+	 * KafkaProducer instance.
+	 */
+	private transient KafkaProducer<byte[], byte[]> producer;
+
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema.
+	 */
+	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema.
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, null);
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
+		Preconditions.checkNotNull(topicId, "TopicID not set");
+		Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
+		Preconditions.checkNotNull(producerConfig, "producerConfig not set");
+		ClosureCleaner.ensureSerializable(customPartitioner);
+		ClosureCleaner.ensureSerializable(serializationSchema);
+
+		this.topicId = topicId;
+		this.schema = serializationSchema;
+		this.producerConfig = producerConfig;
+
+		// set the producer configuration properties.
+
+		if(!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+		}
+
+		if(!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+		}
+
+
+		// create a local KafkaProducer to get the list of partitions.
+		// this will also ensure locally that all required ProducerConfig values are set.
+		{
+			KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig);
+			List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
+
+			this.partitions = new int[partitionsList.size()];
+			for (int i = 0; i < partitions.length; i++) {
+				partitions[i] = partitionsList.get(i).partition();
+			}
+			getPartitionsProd.close();
+		}
+
+		if(customPartitioner == null) {
+			this.partitioner = new FixedPartitioner();
+		} else {
+			this.partitioner = customPartitioner;
+		}
+	}
+
+
+	/**
+	 * Initializes the connection to Kafka.
+	 */
+	@Override
+	public void open(Configuration configuration) {
+		producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
+
+		partitioner.open(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), partitions);
+
+		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), topicId);
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Kafka.
+	 *
+	 * @param next
+	 * 		The incoming data
+	 */
+	@Override
+	public void invoke(IN next) {
+		byte[] serialized = schema.serialize(next);
+
+		producer.send(new ProducerRecord<byte[], byte[]>(topicId,
+			partitioner.partition(next, partitions.length),
+			null,
+			serialized),
+			new ErrorLoggingCallback(topicId, null, serialized, false));
+	}
+
+
+	@Override
+	public void close() {
+		if (producer != null) {
+			producer.close();
+		}
+	}
+
+
+	// ----------------------------------- Utilities --------------------------
+
+	public static Properties getPropertiesFromBrokerList(String brokerList) {
+		String[] elements = brokerList.split(",");
+		for(String broker: elements) {
+			NetUtils.getCorrectHostnamePort(broker);
+		}
+		Properties props = new Properties();
+		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+		return props;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
deleted file mode 100644
index d115913..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.google.common.base.Preconditions;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.DefaultEncoder;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.PartitionerWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-
-/**
- * Sink that emits its inputs to a Kafka topic.
- *
- * @param <IN>
- * 		Type of the sink input
- */
-public class KafkaSink<IN> extends RichSinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
-
-	private Producer<IN, byte[]> producer;
-	private Properties userDefinedProperties;
-	private String topicId;
-	private String brokerList;
-	private SerializationSchema<IN, byte[]> schema;
-	private SerializableKafkaPartitioner partitioner;
-	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 *			Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
-		this(brokerList, topicId, new Properties(), serializationSchema);
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic with custom Producer configuration.
-	 * If you use this constructor, the broker should be set with the "metadata.broker.list"
-	 * configuration.
-	 *
-	 * @param brokerList
-	 * 		Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param producerConfig
-	 * 		Configurations of the Kafka producer
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema) {
-		String[] elements = brokerList.split(",");
-		for(String broker: elements) {
-			NetUtils.getCorrectHostnamePort(broker);
-		}
-		Preconditions.checkNotNull(topicId, "TopicID not set");
-
-		this.brokerList = brokerList;
-		this.topicId = topicId;
-		this.schema = serializationSchema;
-		this.partitionerClass = null;
-		this.userDefinedProperties = producerConfig;
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 * @param partitioner
-	 * 		User defined partitioner.
-	 */
-	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		ClosureCleaner.ensureSerializable(partitioner);
-		this.partitioner = partitioner;
-	}
-
-	public KafkaSink(String brokerList,
-					String topicId,
-					SerializationSchema<IN, byte[]> serializationSchema,
-					Class<? extends SerializableKafkaPartitioner> partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		this.partitionerClass = partitioner;
-	}
-
-	/**
-	 * Initializes the connection to Kafka.
-	 */
-	@Override
-	public void open(Configuration configuration) {
-
-		Properties properties = new Properties();
-
-		properties.put("metadata.broker.list", brokerList);
-		properties.put("request.required.acks", "-1");
-		properties.put("message.send.max.retries", "10");
-
-		properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		// this will not be used as the key will not be serialized
-		properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
-			properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
-		}
-
-		if (partitioner != null) {
-			properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
-			// java serialization will do the rest.
-			properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
-		}
-		if (partitionerClass != null) {
-			properties.put("partitioner.class", partitionerClass);
-		}
-
-		ProducerConfig config = new ProducerConfig(properties);
-
-		try {
-			producer = new Producer<IN, byte[]>(config);
-		} catch (NullPointerException e) {
-			throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
-		}
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Kafka.
-	 *
-	 * @param next
-	 * 		The incoming data
-	 */
-	@Override
-	public void invoke(IN next) {
-		byte[] serialized = schema.serialize(next);
-
-		// Sending message without serializable key.
-		producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
-	}
-
-	@Override
-	public void close() {
-		if (producer != null) {
-			producer.close();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
deleted file mode 100644
index 7b9f991..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.producer.Partitioner;
-
-import java.io.Serializable;
-
-public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index c8400a5..f856926 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -17,6 +17,7 @@
 package org.apache.flink.streaming.connectors.kafka.api;
 
 
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 /**
@@ -26,7 +27,7 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
  * This class will be removed in future releases of Flink.
  */
 @Deprecated
-public class KafkaSink<IN> extends org.apache.flink.streaming.connectors.kafka.KafkaSink<IN> {
+public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
 	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
 		super(brokerList, topicId, serializationSchema);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
new file mode 100644
index 0000000..346a7d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * 	# More Flink partitions than kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	---------------->	1
+ * 			2   --------------/
+ * 			3   -------------/
+ * 			4	------------/
+ * </pre>
+ * 	--> Some (or all) kafka partitions contain the output of more than one flink partition
+ *
+ *# Fewer Flink partitions than Kafka
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	---------------->	1
+ * 			2	---------------->	2
+ * 									3
+ * 									4
+ * 									5
+ * </pre>
+ *
+ *  --> Not all Kafka partitions contain data
+ *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
+ *  cause a lot of network connections between all the Flink instances and all the Kafka brokers
+ *
+ *
+ */
+public class FixedPartitioner extends KafkaPartitioner implements Serializable {
+	private static final long serialVersionUID = 1627268846962918126L;
+
+	int targetPartition = -1;
+
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		int p = 0;
+		for(int i = 0; i < parallelInstances; i++) {
+			if(i == parallelInstanceId) {
+				targetPartition = partitions[p];
+				return;
+			}
+			if(++p == partitions.length) {
+				p = 0;
+			}
+		}
+	}
+
+	@Override
+	public int partition(Object element, int numPartitions) {
+		if(targetPartition == -1) {
+			throw new RuntimeException("The partitioner has not been initialized properly");
+		}
+		return targetPartition;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..55519f0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+
+import kafka.producer.Partitioner;
+
+import java.io.Serializable;
+
+/**
+ * Extended Kafka Partitioner.
+ * It contains a open() method which is called on each parallel instance.
+ * Partitioners have to be serializable!
+ */
+public abstract class KafkaPartitioner implements Partitioner, Serializable {
+
+	private static final long serialVersionUID = -1974260817778593473L;
+
+	/**
+	 * Initializer for the Partitioner.
+	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * @param parallelInstances the total number of parallel instances
+	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
+	 */
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		// overwrite this method if needed.
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 3d392aa..9281e04 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
index e35fcfb..cf745b7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
 
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -124,7 +124,6 @@ public class KafkaConsumerTest {
 	}
 	
 	@Test
-	@Ignore("Kafka consumer internally makes an infinite loop")
 	public void testCreateSourceWithoutCluster() {
 		try {
 			Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 939f564..8676d5d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.consumer.Consumer;
@@ -67,6 +66,7 @@ import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.util.Collector;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Assert;
 
 import scala.collection.Seq;
@@ -113,67 +113,61 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	/**
 	 * Test that validates that checkpointing and checkpoint notification works properly
 	 */
-	public void runCheckpointingTest() {
-		try {
-			createTestTopic("testCheckpointing", 1, 1);
+	public void runCheckpointingTest() throws Exception {
+		createTestTopic("testCheckpointing", 1, 1);
 
-			FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
-			Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-			pendingCheckpointsField.setAccessible(true);
-			LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
+		FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
+		Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
+		pendingCheckpointsField.setAccessible(true);
+		LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
 
-			Assert.assertEquals(0, pendingCheckpoints.size());
-			source.setRuntimeContext(new MockRuntimeContext(1, 0));
+		Assert.assertEquals(0, pendingCheckpoints.size());
+		source.setRuntimeContext(new MockRuntimeContext(1, 0));
 
-			final long[] initialOffsets = new long[] { 1337 };
+		final long[] initialOffsets = new long[] { 1337 };
 
-			// first restore
-			source.restoreState(initialOffsets);
+		// first restore
+		source.restoreState(initialOffsets);
 
-			// then open
-			source.open(new Configuration());
-			long[] state1 = source.snapshotState(1, 15);
+		// then open
+		source.open(new Configuration());
+		long[] state1 = source.snapshotState(1, 15);
 
-			assertArrayEquals(initialOffsets, state1);
+		assertArrayEquals(initialOffsets, state1);
 
-			long[] state2 = source.snapshotState(2, 30);
-			Assert.assertArrayEquals(initialOffsets, state2);
-			Assert.assertEquals(2, pendingCheckpoints.size());
+		long[] state2 = source.snapshotState(2, 30);
+		Assert.assertArrayEquals(initialOffsets, state2);
+		Assert.assertEquals(2, pendingCheckpoints.size());
 
-			source.notifyCheckpointComplete(1);
-			Assert.assertEquals(1, pendingCheckpoints.size());
+		source.notifyCheckpointComplete(1);
+		Assert.assertEquals(1, pendingCheckpoints.size());
 
-			source.notifyCheckpointComplete(2);
-			Assert.assertEquals(0, pendingCheckpoints.size());
+		source.notifyCheckpointComplete(2);
+		Assert.assertEquals(0, pendingCheckpoints.size());
 
-			source.notifyCheckpointComplete(666); // invalid checkpoint
-			Assert.assertEquals(0, pendingCheckpoints.size());
+		source.notifyCheckpointComplete(666); // invalid checkpoint
+		Assert.assertEquals(0, pendingCheckpoints.size());
 
-			// create 500 snapshots
-			for (int i = 100; i < 600; i++) {
-				source.snapshotState(i, 15 * i);
-			}
-			Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+		// create 500 snapshots
+		for (int i = 100; i < 600; i++) {
+			source.snapshotState(i, 15 * i);
+		}
+		Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
 
-			// commit only the second last
-			source.notifyCheckpointComplete(598);
-			Assert.assertEquals(1, pendingCheckpoints.size());
+		// commit only the second last
+		source.notifyCheckpointComplete(598);
+		Assert.assertEquals(1, pendingCheckpoints.size());
 
-			// access invalid checkpoint
-			source.notifyCheckpointComplete(590);
+		// access invalid checkpoint
+		source.notifyCheckpointComplete(590);
 
-			// and the last
-			source.notifyCheckpointComplete(599);
-			Assert.assertEquals(0, pendingCheckpoints.size());
+		// and the last
+		source.notifyCheckpointComplete(599);
+		Assert.assertEquals(0, pendingCheckpoints.size());
 
-			source.close();
+		source.close();
 
-			deleteTestTopic("testCheckpointing");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		deleteTestTopic("testCheckpointing");
 	}
 
 	/**
@@ -181,70 +175,64 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 *
 	 * This test is only applicable if Teh Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
 	 */
-	public void runOffsetInZookeeperValidationTest() {
-		try {
-			LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
+	public void runOffsetInZookeeperValidationTest() throws Exception {
+		LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
 
-			final String topicName = "testOffsetHacking";
-			final int parallelism = 3;
-			
-			createTestTopic(topicName, parallelism, 1);
+		final String topicName = "testOffsetHacking";
+		final int parallelism = 3;
 
-			StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env1.getConfig().disableSysoutLogging();
-			env1.enableCheckpointing(50);
-			env1.setNumberOfExecutionRetries(0);
-			env1.setParallelism(parallelism);
+		createTestTopic(topicName, parallelism, 1);
 
-			StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env2.getConfig().disableSysoutLogging();
-			env2.enableCheckpointing(50);
-			env2.setNumberOfExecutionRetries(0);
-			env2.setParallelism(parallelism);
+		StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env1.getConfig().disableSysoutLogging();
+		env1.enableCheckpointing(50);
+		env1.setNumberOfExecutionRetries(0);
+		env1.setParallelism(parallelism);
 
-			StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env3.getConfig().disableSysoutLogging();
-			env3.enableCheckpointing(50);
-			env3.setNumberOfExecutionRetries(0);
-			env3.setParallelism(parallelism);
+		StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env2.getConfig().disableSysoutLogging();
+		env2.enableCheckpointing(50);
+		env2.setNumberOfExecutionRetries(0);
+		env2.setParallelism(parallelism);
 
-			// write a sequence from 0 to 99 to each of the 3 partitions.
-			writeSequence(env1, topicName, 100, parallelism);
+		StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env3.getConfig().disableSysoutLogging();
+		env3.enableCheckpointing(50);
+		env3.setNumberOfExecutionRetries(0);
+		env3.setParallelism(parallelism);
 
-			readSequence(env2, standardProps, parallelism, topicName, 100, 0);
+		// write a sequence from 0 to 99 to each of the 3 partitions.
+		writeSequence(env1, topicName, 100, parallelism);
 
-			ZkClient zkClient = createZookeeperClient();
-			
-			long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
-			long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
-			long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
+		readSequence(env2, standardProps, parallelism, topicName, 100, 0);
 
-			LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
+		ZkClient zkClient = createZookeeperClient();
 
-			assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-			assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
-			assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
+		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
+		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
 
-			LOG.info("Manipulating offsets");
+		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
-			// set the offset to 50 for the three partitions
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
+		assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+		assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+		assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
 
-			zkClient.close();
-			
-			// create new env
-			readSequence(env3, standardProps, parallelism, topicName, 50, 50);
+		LOG.info("Manipulating offsets");
 
-			deleteTestTopic(topicName);
-			
-			LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		// set the offset to 50 for the three partitions
+		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
+		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
+		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
+
+		zkClient.close();
+
+		// create new env
+		readSequence(env3, standardProps, parallelism, topicName, 50, 50);
+
+		deleteTestTopic(topicName);
+
+		LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
 	}
 
 	/**
@@ -255,655 +243,600 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
 	 * </pre>
 	 */
-	public void runSimpleConcurrentProducerConsumerTopology() {
-		try {
-			LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
+	public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
+		LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
 
-			final String topic = "concurrentProducerConsumerTopic";
-			final int parallelism = 3;
-			final int elementsPerPartition = 100;
-			final int totalElements = parallelism * elementsPerPartition;
+		final String topic = "concurrentProducerConsumerTopic";
+		final int parallelism = 3;
+		final int elementsPerPartition = 100;
+		final int totalElements = parallelism * elementsPerPartition;
 
-			createTestTopic(topic, parallelism, 2);
+		createTestTopic(topic, parallelism, 2);
 
-			final StreamExecutionEnvironment env =
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(0);
-			env.getConfig().disableSysoutLogging();
+		final StreamExecutionEnvironment env =
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(parallelism);
+		env.setNumberOfExecutionRetries(0);
+		env.getConfig().disableSysoutLogging();
 
-			TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
+		TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
 
-			TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
-					new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+		TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
+				new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
 
-			TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
-					new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+		TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
+				new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
 
-			// ----------- add producer dataflow ----------
+		// ----------- add producer dataflow ----------
 
-			DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
 
-				private boolean running = true;
+			private boolean running = true;
 
-				@Override
-				public void run(SourceContext<Tuple2<Long, String>> ctx) {
-					int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
-					int limit = cnt + elementsPerPartition;
+			@Override
+			public void run(SourceContext<Tuple2<Long, String>> ctx) {
+				int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
+				int limit = cnt + elementsPerPartition;
 
 
-					while (running && cnt < limit) {
-						ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
-						cnt++;
-					}
-				}
-
-				@Override
-				public void cancel() {
-					running = false;
+				while (running && cnt < limit) {
+					ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
+					cnt++;
 				}
-			});
-			stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
-
-			// ----------- add consumer dataflow ----------
-
-			FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
+			}
 
-			DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+		stream.addSink(new FlinkKafkaProducer<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
 
-			consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
+		// ----------- add consumer dataflow ----------
 
-				private int elCnt = 0;
-				private BitSet validator = new BitSet(totalElements);
+		FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
 
-				@Override
-				public void invoke(Tuple2<Long, String> value) throws Exception {
-					String[] sp = value.f1.split("-");
-					int v = Integer.parseInt(sp[1]);
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
 
-					assertEquals(value.f0 - 1000, (long) v);
+		consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
 
-					assertFalse("Received tuple twice", validator.get(v));
-					validator.set(v);
-					elCnt++;
+			private int elCnt = 0;
+			private BitSet validator = new BitSet(totalElements);
 
-					if (elCnt == totalElements) {
-						// check if everything in the bitset is set to true
-						int nc;
-						if ((nc = validator.nextClearBit(0)) != totalElements) {
-							fail("The bitset was not set to 1 on all elements. Next clear:"
-									+ nc + " Set: " + validator);
-						}
-						throw new SuccessException();
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				assertFalse("Received tuple twice", validator.get(v));
+				validator.set(v);
+				elCnt++;
+
+				if (elCnt == totalElements) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != totalElements) {
+						fail("The bitset was not set to 1 on all elements. Next clear:"
+								+ nc + " Set: " + validator);
 					}
+					throw new SuccessException();
 				}
+			}
 
-				@Override
-				public void close() throws Exception {
-					super.close();
-				}
-			}).setParallelism(1);
+			@Override
+			public void close() throws Exception {
+				super.close();
+			}
+		}).setParallelism(1);
 
-			tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
+		tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
 
-			LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
+		LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
 
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		deleteTestTopic(topic);
 	}
 
 	/**
 	 * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and
 	 * Flink sources.
 	 */
-	public void runOneToOneExactlyOnceTest() {
-		try {
-			LOG.info("Starting runOneToOneExactlyOnceTest()");
+	public void runOneToOneExactlyOnceTest() throws Exception {
+		LOG.info("Starting runOneToOneExactlyOnceTest()");
 
-			final String topic = "oneToOneTopic";
-			final int parallelism = 5;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = parallelism * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-			
-			createTestTopic(topic, parallelism, 1);
-			
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings, 
-					topic, parallelism, numElementsPerPartition, true);
-			
-			// run the topology that fails and recovers
+		final String topic = "oneToOneTopic";
+		final int parallelism = 5;
+		final int numElementsPerPartition = 1000;
+		final int totalElements = parallelism * numElementsPerPartition;
+		final int failAfterElements = numElementsPerPartition / 3;
 
-			DeserializationSchema<Integer> schema = 
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-			
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.enableCheckpointing(500);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-			
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-			
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(parallelism, 1))
-					.map(new FailingIdentityMapper<Integer>(failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+		createTestTopic(topic, parallelism, 1);
+
+		DataGenerators.generateRandomizedIntegerSequence(
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				brokerConnectionStrings,
+				topic, parallelism, numElementsPerPartition, true);
+
+		// run the topology that fails and recovers
 
-			FailingIdentityMapper.failedBefore = false;
-			tryExecute(env, "One-to-one exactly once test");
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.enableCheckpointing(500);
+		env.setParallelism(parallelism);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+
+		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+		env
+				.addSource(kafkaSource)
+				.map(new PartitionValidatingMapper(parallelism, 1))
+				.map(new FailingIdentityMapper<Integer>(failAfterElements))
+				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+		FailingIdentityMapper.failedBefore = false;
+		tryExecute(env, "One-to-one exactly once test");
+
+		// this cannot be reliably checked, as checkpoints come in time intervals, and
+		// failures after a number of elements
 //			assertTrue("Job did not do a checkpoint before the failure",
 //					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-			
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+
+		deleteTestTopic(topic);
 	}
 
 	/**
 	 * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
 	 * one Flink source will read multiple Kafka partitions.
 	 */
-	public void runOneSourceMultiplePartitionsExactlyOnceTest() {
-		try {
-			LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
+	public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
+		LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
 
-			final String topic = "oneToManyTopic";
-			final int numPartitions = 5;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = numPartitions * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-			
-			final int parallelism = 2;
+		final String topic = "oneToManyTopic";
+		final int numPartitions = 5;
+		final int numElementsPerPartition = 1000;
+		final int totalElements = numPartitions * numElementsPerPartition;
+		final int failAfterElements = numElementsPerPartition / 3;
 
-			createTestTopic(topic, numPartitions, 1);
+		final int parallelism = 2;
 
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings,
-					topic, numPartitions, numElementsPerPartition, true);
+		createTestTopic(topic, numPartitions, 1);
 
-			// run the topology that fails and recovers
+		DataGenerators.generateRandomizedIntegerSequence(
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				brokerConnectionStrings,
+				topic, numPartitions, numElementsPerPartition, true);
 
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		// run the topology that fails and recovers
 
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.enableCheckpointing(500);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.enableCheckpointing(500);
+		env.setParallelism(parallelism);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
 
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(numPartitions, 3))
-					.map(new FailingIdentityMapper<Integer>(failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
 
-			FailingIdentityMapper.failedBefore = false;
-			tryExecute(env, "One-source-multi-partitions exactly once test");
+		env
+				.addSource(kafkaSource)
+				.map(new PartitionValidatingMapper(numPartitions, 3))
+				.map(new FailingIdentityMapper<Integer>(failAfterElements))
+				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
 
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
+		FailingIdentityMapper.failedBefore = false;
+		tryExecute(env, "One-source-multi-partitions exactly once test");
+
+		// this cannot be reliably checked, as checkpoints come in time intervals, and
+		// failures after a number of elements
 //			assertTrue("Job did not do a checkpoint before the failure",
 //					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-			
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+
+		deleteTestTopic(topic);
 	}
 
 	/**
 	 * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
 	 * that some Flink sources will read no partitions.
 	 */
-	public void runMultipleSourcesOnePartitionExactlyOnceTest() {
-		try {
-			LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
+	public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
+		LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
 
-			final String topic = "manyToOneTopic";
-			final int numPartitions = 5;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = numPartitions * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
+		final String topic = "manyToOneTopic";
+		final int numPartitions = 5;
+		final int numElementsPerPartition = 1000;
+		final int totalElements = numPartitions * numElementsPerPartition;
+		final int failAfterElements = numElementsPerPartition / 3;
 
-			final int parallelism = 8;
+		final int parallelism = 8;
 
-			createTestTopic(topic, numPartitions, 1);
+		createTestTopic(topic, numPartitions, 1);
 
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings,
-					topic, numPartitions, numElementsPerPartition, true);
+		DataGenerators.generateRandomizedIntegerSequence(
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				brokerConnectionStrings,
+				topic, numPartitions, numElementsPerPartition, true);
 
-			// run the topology that fails and recovers
-			
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		// run the topology that fails and recovers
 
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.enableCheckpointing(500);
-			env.setParallelism(parallelism);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-			env.setBufferTimeout(0);
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-			
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(numPartitions, 1))
-					.map(new FailingIdentityMapper<Integer>(failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-			
-			FailingIdentityMapper.failedBefore = false;
-			tryExecute(env, "multi-source-one-partitions exactly once test");
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.enableCheckpointing(500);
+		env.setParallelism(parallelism);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+		env.setBufferTimeout(0);
+
+		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
 
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
+		env
+			.addSource(kafkaSource)
+			.map(new PartitionValidatingMapper(numPartitions, 1))
+			.map(new FailingIdentityMapper<Integer>(failAfterElements))
+			.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+		FailingIdentityMapper.failedBefore = false;
+		tryExecute(env, "multi-source-one-partitions exactly once test");
+
+		// this cannot be reliably checked, as checkpoints come in time intervals, and
+		// failures after a number of elements
 //			assertTrue("Job did not do a checkpoint before the failure",
 //					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-			
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+
+		deleteTestTopic(topic);
 	}
 	
 	
 	/**
 	 * Tests that the source can be properly canceled when reading full partitions. 
 	 */
-	public void runCancelingOnFullInputTest() {
-		try {
-			final String topic = "cancelingOnFullTopic";
+	public void runCancelingOnFullInputTest() throws Exception {
+		final String topic = "cancelingOnFullTopic";
 
-			final int parallelism = 3;
-			createTestTopic(topic, parallelism, 1);
+		final int parallelism = 3;
+		createTestTopic(topic, parallelism, 1);
 
-			// launch a producer thread
-			DataGenerators.InfiniteStringsGenerator generator =
-					new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
-			generator.start();
+		// launch a producer thread
+		DataGenerators.InfiniteStringsGenerator generator =
+				new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
+		generator.start();
 
-			// launch a consumer asynchronously
+		// launch a consumer asynchronously
 
-			final AtomicReference<Throwable> jobError = new AtomicReference<>();
+		final AtomicReference<Throwable> jobError = new AtomicReference<>();
 
-			final Runnable jobRunner = new Runnable() {
-				@Override
-				public void run() {
-					try {
-						final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-						env.setParallelism(parallelism);
-						env.enableCheckpointing(100);
-						env.getConfig().disableSysoutLogging();
+		final Runnable jobRunner = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env.setParallelism(parallelism);
+					env.enableCheckpointing(100);
+					env.getConfig().disableSysoutLogging();
 
-						FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+					FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
 
-						env.addSource(source).addSink(new DiscardingSink<String>());
+					env.addSource(source).addSink(new DiscardingSink<String>());
 
-						env.execute();
-					}
-					catch (Throwable t) {
-						jobError.set(t);
-					}
+					env.execute();
 				}
-			};
-
-			Thread runnerThread = new Thread(jobRunner, "program runner thread");
-			runnerThread.start();
+				catch (Throwable t) {
+					jobError.set(t);
+				}
+			}
+		};
 
-			// wait a bit before canceling
-			Thread.sleep(2000);
+		Thread runnerThread = new Thread(jobRunner, "program runner thread");
+		runnerThread.start();
 
-			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
+		// wait a bit before canceling
+		Thread.sleep(2000);
 
-			// wait for the program to be done and validate that we failed with the right exception
-			runnerThread.join();
+		// cancel
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
 
-			Throwable failueCause = jobError.get();
-			assertNotNull("program did not fail properly due to canceling", failueCause);
-			assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+		// wait for the program to be done and validate that we failed with the right exception
+		runnerThread.join();
 
-			if (generator.isAlive()) {
-				generator.shutdown();
-				generator.join();
-			}
-			else {
-				Throwable t = generator.getError();
-				if (t != null) {
-					t.printStackTrace();
-					fail("Generator failed: " + t.getMessage());
-				} else {
-					fail("Generator failed with no exception");
-				}
-			}
+		Throwable failueCause = jobError.get();
+		assertNotNull("program did not fail properly due to canceling", failueCause);
+		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
 
-			deleteTestTopic(topic);
+		if (generator.isAlive()) {
+			generator.shutdown();
+			generator.join();
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+		else {
+			Throwable t = generator.getError();
+			if (t != null) {
+				t.printStackTrace();
+				fail("Generator failed: " + t.getMessage());
+			} else {
+				fail("Generator failed with no exception");
+			}
 		}
+
+		deleteTestTopic(topic);
 	}
 
 	/**
 	 * Tests that the source can be properly canceled when reading empty partitions. 
 	 */
-	public void runCancelingOnEmptyInputTest() {
-		try {
-			final String topic = "cancelingOnEmptyInputTopic";
+	public void runCancelingOnEmptyInputTest() throws Exception {
+		final String topic = "cancelingOnEmptyInputTopic";
 
-			final int parallelism = 3;
-			createTestTopic(topic, parallelism, 1);
+		final int parallelism = 3;
+		createTestTopic(topic, parallelism, 1);
 
-			final AtomicReference<Throwable> error = new AtomicReference<>();
+		final AtomicReference<Throwable> error = new AtomicReference<>();
 
-			final Runnable jobRunner = new Runnable() {
-				@Override
-				public void run() {
-					try {
-						final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-						env.setParallelism(parallelism);
-						env.enableCheckpointing(100);
-						env.getConfig().disableSysoutLogging();
+		final Runnable jobRunner = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+					env.setParallelism(parallelism);
+					env.enableCheckpointing(100);
+					env.getConfig().disableSysoutLogging();
 
-						FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+					FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
 
-						env.addSource(source).addSink(new DiscardingSink<String>());
+					env.addSource(source).addSink(new DiscardingSink<String>());
 
-						env.execute();
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
+					env.execute();
+				}
+				catch (Throwable t) {
+					error.set(t);
 				}
-			};
+			}
+		};
 
-			Thread runnerThread = new Thread(jobRunner, "program runner thread");
-			runnerThread.start();
+		Thread runnerThread = new Thread(jobRunner, "program runner thread");
+		runnerThread.start();
 
-			// wait a bit before canceling
-			Thread.sleep(2000);
+		// wait a bit before canceling
+		Thread.sleep(2000);
 
-			// cancel
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
+		// cancel
+		JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
 
-			// wait for the program to be done and validate that we failed with the right exception
-			runnerThread.join();
+		// wait for the program to be done and validate that we failed with the right exception
+		runnerThread.join();
 
-			Throwable failueCause = error.get();
-			assertNotNull("program did not fail properly due to canceling", failueCause);
-			assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+		Throwable failueCause = error.get();
+		assertNotNull("program did not fail properly due to canceling", failueCause);
+		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
 
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		deleteTestTopic(topic);
 	}
 
 	/**
 	 * Tests that the source can be properly canceled when reading full partitions. 
 	 */
-	public void runFailOnDeployTest() {
-		try {
-			final String topic = "failOnDeployTopic";
-			
-			createTestTopic(topic, 2, 1);
+	public void runFailOnDeployTest() throws Exception {
+		final String topic = "failOnDeployTopic";
 
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		createTestTopic(topic, 2, 1);
 
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setParallelism(12); // needs to be more that the mini cluster has slots
-			env.getConfig().disableSysoutLogging();
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-			
-			env
-					.addSource(kafkaSource)
-					.addSink(new DiscardingSink<Integer>());
-			
-			try {
-				env.execute();
-				fail("this test should fail with an exception");
-			}
-			catch (ProgramInvocationException e) {
-				
-				// validate that we failed due to a NoResourceAvailableException
-				Throwable cause = e.getCause();
-				int depth = 0;
-				boolean foundResourceException = false;
-				
-				while (cause != null && depth++ < 20) {
-					if (cause instanceof NoResourceAvailableException) {
-						foundResourceException = true;
-						break;
-					}
-					cause = cause.getCause();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(12); // needs to be more that the mini cluster has slots
+		env.getConfig().disableSysoutLogging();
+
+		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+		env
+				.addSource(kafkaSource)
+				.addSink(new DiscardingSink<Integer>());
+
+		try {
+			env.execute();
+			fail("this test should fail with an exception");
+		}
+		catch (ProgramInvocationException e) {
+
+			// validate that we failed due to a NoResourceAvailableException
+			Throwable cause = e.getCause();
+			int depth = 0;
+			boolean foundResourceException = false;
+
+			while (cause != null && depth++ < 20) {
+				if (cause instanceof NoResourceAvailableException) {
+					foundResourceException = true;
+					break;
 				}
-				
-				assertTrue("Wrong exception", foundResourceException);
+				cause = cause.getCause();
 			}
 
-			deleteTestTopic(topic);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+			assertTrue("Wrong exception", foundResourceException);
 		}
+
+		deleteTestTopic(topic);
 	}
 
 	/**
 	 * Test Flink's Kafka integration also with very big records (30MB)
 	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
 	 */
-	public void runBigRecordTestTopology() {
-		try {
-			LOG.info("Starting runBigRecordTestTopology()");
+	public void runBigRecordTestTopology() throws Exception {
+		LOG.info("Starting runBigRecordTestTopology()");
 
-			final String topic = "bigRecordTestTopic";
-			final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
-			
-			createTestTopic(topic, parallelism, 1);
+		final String topic = "bigRecordTestTopic";
+		final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
 
-			final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
+		createTestTopic(topic, parallelism, 1);
 
-			final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
-					new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
+		final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
 
-			final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
-					new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
+		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
+				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
 
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setNumberOfExecutionRetries(0);
-			env.getConfig().disableSysoutLogging();
-			env.enableCheckpointing(100);
-			env.setParallelism(parallelism);
+		final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
+				new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
 
-			// add consuming topology:
-			Properties consumerProps = new Properties();
-			consumerProps.putAll(standardProps);
-			consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
-			consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
-			consumerProps.setProperty("queued.max.message.chunks", "1");
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setNumberOfExecutionRetries(0);
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(100);
+		env.setParallelism(parallelism);
 
-			FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
-			DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
+		// add consuming topology:
+		Properties consumerProps = new Properties();
+		consumerProps.putAll(standardProps);
+		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
+		consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
+		consumerProps.setProperty("queued.max.message.chunks", "1");
 
-			consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+		FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
+		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
 
-				private int elCnt = 0;
+		consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
 
-				@Override
-				public void invoke(Tuple2<Long, byte[]> value) throws Exception {
-					elCnt++;
-					if (value.f0 == -1) {
-						// we should have seen 11 elements now.
-						if(elCnt == 11) {
-							throw new SuccessException();
-						} else {
-							throw new RuntimeException("There have been "+elCnt+" elements");
-						}
-					}
-					if(elCnt > 10) {
-						throw new RuntimeException("More than 10 elements seen: "+elCnt);
+			private int elCnt = 0;
+
+			@Override
+			public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+				elCnt++;
+				if (value.f0 == -1) {
+					// we should have seen 11 elements now.
+					if(elCnt == 11) {
+						throw new SuccessException();
+					} else {
+						throw new RuntimeException("There have been "+elCnt+" elements");
 					}
 				}
-			});
+				if(elCnt > 10) {
+					throw new RuntimeException("More than 10 elements seen: "+elCnt);
+				}
+			}
+		});
 
-			// add producing topology
-			Properties producerProps = new Properties();
-			producerProps.setProperty("max.message.size", Integer.toString(1024 * 1024 * 30));
-			
-			DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+		// add producing topology
+		Properties producerProps = new Properties();
+		producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
+		producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
 
-				private boolean running;
+		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
 
-				@Override
-				public void open(Configuration parameters) throws Exception {
-					super.open(parameters);
-					running = true;
-				}
+			private boolean running;
 
-				@Override
-				public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
-					Random rnd = new Random();
-					long cnt = 0;
-					int fifteenMb = 1024 * 1024 * 15;
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				running = true;
+			}
 
-					while (running) {
-						byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
-						ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+			@Override
+			public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
+				Random rnd = new Random();
+				long cnt = 0;
+				int fifteenMb = 1024 * 1024 * 15;
 
-						Thread.sleep(100);
+				while (running) {
+					byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
+					ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
 
-						if (cnt == 10) {
-							// signal end
-							ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
-							break;
-						}
+					Thread.sleep(100);
+
+					if (cnt == 10) {
+						// signal end
+						ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+						break;
 					}
 				}
+			}
 
-				@Override
-				public void cancel() {
-					running = false;
-				}
-			});
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
 
-			stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
-					producerProps, deserSchema));
+		stream.addSink(new FlinkKafkaProducer<Tuple2<Long, byte[]>>(topic, deserSchema, producerProps));
 
-			tryExecute(env, "big topology test");
+		tryExecute(env, "big topology test");
 
-			deleteTestTopic(topic);
-			
-			LOG.info("Finished runBigRecordTestTopology()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		deleteTestTopic(topic);
+
+		LOG.info("Finished runBigRecordTestTopology()");
 	}
 
 	
-	public void runBrokerFailureTest() {
-		try {
-			LOG.info("starting runBrokerFailureTest()");
-			
-			final String topic = "brokerFailureTestTopic";
+	public void runBrokerFailureTest() throws Exception {
+		LOG.info("starting runBrokerFailureTest()");
 
-			final int parallelism = 2;
-			final int numElementsPerPartition = 1000;
-			final int totalElements = parallelism * numElementsPerPartition;
-			final int failAfterElements = numElementsPerPartition / 3;
-			
+		final String topic = "brokerFailureTestTopic";
 
-			createTestTopic(topic, parallelism, 2);
-
-			DataGenerators.generateRandomizedIntegerSequence(
-					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
-					brokerConnectionStrings,
-					topic, parallelism, numElementsPerPartition, true);
-
-			// find leader to shut down
-			ZkClient zkClient = createZookeeperClient();
-			PartitionMetadata firstPart = null;
-			do {
-				if (firstPart != null) {
-					LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
-					// not the first try. Sleep a bit
-					Thread.sleep(150);
-				}
+		final int parallelism = 2;
+		final int numElementsPerPartition = 1000;
+		final int totalElements = parallelism * numElementsPerPartition;
+		final int failAfterElements = numElementsPerPartition / 3;
+
+
+		createTestTopic(topic, parallelism, 2);
 
-				Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
-				firstPart = partitionMetadata.head();
+		DataGenerators.generateRandomizedIntegerSequence(
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+				brokerConnectionStrings,
+				topic, parallelism, numElementsPerPartition, true);
+
+		// find leader to shut down
+		ZkClient zkClient = createZookeeperClient();
+		PartitionMetadata firstPart = null;
+		do {
+			if (firstPart != null) {
+				LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+				// not the first try. Sleep a bit
+				Thread.sleep(150);
 			}
-			while (firstPart.errorCode() != 0);
-			zkClient.close();
 
-			final String leaderToShutDown = firstPart.leader().get().connectionString();
-			LOG.info("Leader to shutdown {}", leaderToShutDown);
-			
-			
-			// run the topology that fails and recovers
+			Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+			firstPart = partitionMetadata.head();
+		}
+		while (firstPart.errorCode() != 0);
+		zkClient.close();
 
-			DeserializationSchema<Integer> schema =
-					new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		final String leaderToShutDown = firstPart.leader().get().connectionString();
+		final int leaderIdToShutDown = firstPart.leader().get().id();
+		LOG.info("Leader to shutdown {}", leaderToShutDown);
 
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-			env.setParallelism(parallelism);
-			env.enableCheckpointing(500);
-			env.setNumberOfExecutionRetries(3);
-			env.getConfig().disableSysoutLogging();
-			
 
-			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+		// run the topology that fails and recovers
 
-			env
-					.addSource(kafkaSource)
-					.map(new PartitionValidatingMapper(parallelism, 1))
-					.map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
-					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+		DeserializationSchema<Integer> schema =
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 
-			BrokerKillingMapper.killedLeaderBefore = false;
-			tryExecute(env, "One-to-one exactly once test");
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(parallelism);
+		env.enableCheckpointing(500);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
 
-			// this cannot be reliably checked, as checkpoints come in time intervals, and
-			// failures after a number of elements
-//			assertTrue("Job did not do a checkpoint before the failure",
-//					BrokerKillingMapper.hasBeenCheckpointedBeforeFailure);
 
-			LOG.info("finished runBrokerFailureTest()");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+		env
+				.addSource(kafkaSource)
+				.map(new PartitionValidatingMapper(parallelism, 1))
+				.map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
+				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+		BrokerKillingMapper.killedLeaderBefore = false;
+		tryExecute(env, "One-to-one exactly once test");
+
+		// start a new broker:
+		brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));
+
+		LOG.info("finished runBrokerFailureTest()");
 	}
 
 	// ------------------------------------------------------------------------
@@ -961,8 +894,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		LOG.info("Successfully read sequence for verification");
 	}
 
-	private static void writeSequence(StreamExecutionEnvironment env, String topicName,
-									  final int numElements, int parallelism) throws Exception {
+	private static void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
 
 		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
 
@@ -987,9 +919,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		}).setParallelism(parallelism);
 		
-		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
-				topicName,
-				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
+		stream.addSink(new FlinkKafkaProducer<>(topicName,
+				new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
+				FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings),
 				new Tuple2Partitioner(parallelism)
 		)).setParallelism(parallelism);
 
@@ -1038,8 +970,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	private static void printTopic(String topicName, ConsumerConfig config,
-								   DeserializationSchema<?> deserializationSchema,
-								   int stopAfter) {
+								DeserializationSchema<?> deserializationSchema,
+								int stopAfter) {
 
 		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
 		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 0287392..e88d35a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -23,7 +23,6 @@ import org.junit.Test;
 
 import java.util.Properties;
 
-
 public class KafkaITCase extends KafkaConsumerTestBase {
 	
 	@Override
@@ -36,65 +35,65 @@ public class KafkaITCase extends KafkaConsumerTestBase {
 	// ------------------------------------------------------------------------
 	
 	@Test
-	public void testCheckpointing() {
+	public void testCheckpointing() throws Exception {
 		runCheckpointingTest();
 	}
 
 	@Test
-	public void testOffsetInZookeeper() {
+	public void testOffsetInZookeeper() throws Exception {
 		runOffsetInZookeeperValidationTest();
 	}
 	
 	@Test
-	public void testConcurrentProducerConsumerTopology() {
+	public void testConcurrentProducerConsumerTopology() throws Exception {
 		runSimpleConcurrentProducerConsumerTopology();
 	}
 
 	// --- canceling / failures ---
 	
 	@Test
-	public void testCancelingEmptyTopic() {
+	public void testCancelingEmptyTopic() throws Exception {
 		runCancelingOnEmptyInputTest();
 	}
 
 	@Test
-	public void testCancelingFullTopic() {
+	public void testCancelingFullTopic() throws Exception {
 		runCancelingOnFullInputTest();
 	}
 
 	@Test
-	public void testFailOnDeploy() {
+	public void testFailOnDeploy() throws Exception {
 		runFailOnDeployTest();
 	}
 
 	// --- source to partition mappings and exactly once ---
 	
 	@Test
-	public void testOneToOneSources() {
+	public void testOneToOneSources() throws Exception {
 		runOneToOneExactlyOnceTest();
 	}
 
 	@Test
-	public void testOneSourceMultiplePartitions() {
+	public void testOneSourceMultiplePartitions() throws Exception {
 		runOneSourceMultiplePartitionsExactlyOnceTest();
 	}
 
 	@Test
-	public void testMultipleSourcesOnePartition() {
+	public void testMultipleSourcesOnePartition() throws Exception {
 		runMultipleSourcesOnePartitionExactlyOnceTest();
 	}
 
 	// --- broker failure ---
 
 	@Test
-	public void testBrokerFailure() {
+	public void testBrokerFailure() throws Exception {
 		runBrokerFailureTest();
 	}
 
 	// --- special executions ---
 	
 	@Test
-	public void testBigRecordJob() {
+	public void testBigRecordJob() throws Exception {
 		runBigRecordTestTopology();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
index 5903844..5001364 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
@@ -26,11 +26,14 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
 import org.junit.Test;
 
+import java.io.Serializable;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -100,8 +103,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
 			.setParallelism(1);
 			
 			// sink partitions into 
-			stream.addSink(new KafkaSink<Tuple2<Long, String>>(
-					brokerConnectionStrings, topic,serSchema, new CustomPartitioner(parallelism)))
+			stream.addSink(new FlinkKafkaProducer<>(topic, serSchema, FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings), new CustomPartitioner(parallelism)))
 			.setParallelism(parallelism);
 
 			// ------ consuming topology ---------
@@ -165,7 +167,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
 	
 	// ------------------------------------------------------------------------
 
-	public static class CustomPartitioner implements SerializableKafkaPartitioner {
+	public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
 
 		private final int expectedPartitions;