You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/26 21:13:53 UTC

[03/10] flink git commit: [FLINK-2386] [kafka] Add new Kafka Consumer for Flink

http://git-wip-us.apache.org/repos/asf/flink/blob/940a7c8a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
new file mode 100644
index 0000000..2b92ebf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTestBase.java
@@ -0,0 +1,1137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.server.KafkaServer;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.map.LinkedMap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.connectors.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.testutils.DataGenerators;
+import org.apache.flink.streaming.connectors.testutils.DiscardingSink;
+import org.apache.flink.streaming.connectors.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.testutils.JobManagerCommunicationUtils;
+import org.apache.flink.streaming.connectors.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.connectors.testutils.PartitionValidatingMapper;
+import org.apache.flink.streaming.connectors.testutils.SuccessException;
+import org.apache.flink.streaming.connectors.testutils.ThrottledMapper;
+import org.apache.flink.streaming.connectors.testutils.Tuple2Partitioner;
+import org.apache.flink.streaming.connectors.testutils.ValidatingExactlyOnceSink;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+
+import scala.collection.Seq;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+@SuppressWarnings("serial")
+public abstract class KafkaConsumerTestBase extends KafkaTestBase {
+
+
+	// ------------------------------------------------------------------------
+	//  Required methods by the abstract test base
+	// ------------------------------------------------------------------------
+
+	protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
+			String topic, DeserializationSchema<T> deserializationSchema, Properties props);
+
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	//
+	//  The tests here are all not activated (by an @Test tag), but need
+	//  to be invoked from the extending classes. That way, the classes can
+	//  select which tests to run.
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Test that validates that checkpointing and checkpoint notification works properly
+	 */
+	public void runCheckpointingTest() {
+		try {
+			createTestTopic("testCheckpointing", 1, 1);
+
+			FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
+			Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
+			pendingCheckpointsField.setAccessible(true);
+			LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
+
+			Assert.assertEquals(0, pendingCheckpoints.size());
+			source.setRuntimeContext(new MockRuntimeContext(1, 0));
+
+			final long[] initialOffsets = new long[] { 1337 };
+
+			// first restore
+			source.restoreState(initialOffsets);
+
+			// then open
+			source.open(new Configuration());
+			long[] state1 = source.snapshotState(1, 15);
+
+			assertArrayEquals(initialOffsets, state1);
+
+			long[] state2 = source.snapshotState(2, 30);
+			Assert.assertArrayEquals(initialOffsets, state2);
+			Assert.assertEquals(2, pendingCheckpoints.size());
+
+			source.commitCheckpoint(1);
+			Assert.assertEquals(1, pendingCheckpoints.size());
+
+			source.commitCheckpoint(2);
+			Assert.assertEquals(0, pendingCheckpoints.size());
+
+			source.commitCheckpoint(666); // invalid checkpoint
+			Assert.assertEquals(0, pendingCheckpoints.size());
+
+			// create 500 snapshots
+			for (int i = 100; i < 600; i++) {
+				source.snapshotState(i, 15 * i);
+			}
+			Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+
+			// commit only the second last
+			source.commitCheckpoint(598);
+			Assert.assertEquals(1, pendingCheckpoints.size());
+
+			// access invalid checkpoint
+			source.commitCheckpoint(590);
+
+			// and the last
+			source.commitCheckpoint(599);
+			Assert.assertEquals(0, pendingCheckpoints.size());
+
+			source.close();
+
+			deleteTestTopic("testCheckpointing");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests that offsets are properly committed to ZooKeeper and initial offsets are read from ZooKeeper.
+	 *
+	 * This test is only applicable if Teh Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
+	 */
+	public void runOffsetInZookeeperValidationTest() {
+		try {
+			LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
+
+			final String topicName = "testOffsetHacking";
+			final int parallelism = 3;
+			
+			createTestTopic(topicName, parallelism, 1);
+
+			StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env1.getConfig().disableSysoutLogging();
+			env1.enableCheckpointing(50);
+			env1.setNumberOfExecutionRetries(0);
+			env1.setParallelism(parallelism);
+
+			StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env2.getConfig().disableSysoutLogging();
+			env2.enableCheckpointing(50);
+			env2.setNumberOfExecutionRetries(0);
+			env2.setParallelism(parallelism);
+
+			StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env3.getConfig().disableSysoutLogging();
+			env3.enableCheckpointing(50);
+			env3.setNumberOfExecutionRetries(0);
+			env3.setParallelism(parallelism);
+
+			// write a sequence from 0 to 99 to each of the 3 partitions.
+			writeSequence(env1, topicName, 100, parallelism);
+
+			readSequence(env2, standardProps, parallelism, topicName, 100, 0);
+
+			ZkClient zkClient = createZookeeperClient();
+			
+			long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
+			long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
+			long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
+
+			LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
+
+			assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+			assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+			assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+
+			LOG.info("Manipulating offsets");
+
+			// set the offset to 50 for the three partitions
+			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
+			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
+			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
+
+			zkClient.close();
+			
+			// create new env
+			readSequence(env3, standardProps, parallelism, topicName, 50, 50);
+
+			deleteTestTopic(topicName);
+			
+			LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Ensure Kafka is working on both producer and consumer side.
+	 * This executes a job that contains two Flink pipelines.
+	 *
+	 * <pre>
+	 * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
+	 * </pre>
+	 */
+	public void runSimpleConcurrentProducerConsumerTopology() {
+		try {
+			LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
+
+			final String topic = "concurrentProducerConsumerTopic";
+			final int parallelism = 3;
+			final int elementsPerPartition = 100;
+			final int totalElements = parallelism * elementsPerPartition;
+
+			createTestTopic(topic, parallelism, 2);
+
+			final StreamExecutionEnvironment env =
+					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.setParallelism(parallelism);
+			env.setNumberOfExecutionRetries(0);
+			env.getConfig().disableSysoutLogging();
+
+			TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
+
+			TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
+					new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringType, env.getConfig());
+
+			TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
+					new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringType, env.getConfig());
+
+			// ----------- add producer dataflow ----------
+
+			DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
+
+				private boolean running = true;
+
+				@Override
+				public void run(SourceContext<Tuple2<Long, String>> ctx) {
+					int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
+					int limit = cnt + elementsPerPartition;
+
+
+					while (running && cnt < limit) {
+						ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
+						cnt++;
+					}
+				}
+
+				@Override
+				public void cancel() {
+					running = false;
+				}
+			});
+			stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
+
+			// ----------- add consumer dataflow ----------
+
+			FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
+
+			DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
+
+			consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
+
+				private int elCnt = 0;
+				private BitSet validator = new BitSet(totalElements);
+
+				@Override
+				public void invoke(Tuple2<Long, String> value) throws Exception {
+					String[] sp = value.f1.split("-");
+					int v = Integer.parseInt(sp[1]);
+
+					assertEquals(value.f0 - 1000, (long) v);
+
+					assertFalse("Received tuple twice", validator.get(v));
+					validator.set(v);
+					elCnt++;
+
+					if (elCnt == totalElements) {
+						// check if everything in the bitset is set to true
+						int nc;
+						if ((nc = validator.nextClearBit(0)) != totalElements) {
+							fail("The bitset was not set to 1 on all elements. Next clear:"
+									+ nc + " Set: " + validator);
+						}
+						throw new SuccessException();
+					}
+				}
+
+				@Override
+				public void close() throws Exception {
+					super.close();
+				}
+			}).setParallelism(1);
+
+			tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
+
+			LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
+
+			deleteTestTopic(topic);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and
+	 * Flink sources.
+	 */
+	public void runOneToOneExactlyOnceTest() {
+		try {
+			LOG.info("Starting runOneToOneExactlyOnceTest()");
+
+			final String topic = "oneToOneTopic";
+			final int parallelism = 5;
+			final int numElementsPerPartition = 1000;
+			final int totalElements = parallelism * numElementsPerPartition;
+			final int failAfterElements = numElementsPerPartition / 3;
+			
+			createTestTopic(topic, parallelism, 1);
+			
+			DataGenerators.generateRandomizedIntegerSequence(
+					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+					brokerConnectionStrings, 
+					topic, parallelism, numElementsPerPartition, true);
+			
+			// run the topology that fails and recovers
+
+			DeserializationSchema<Integer> schema = 
+					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+			
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.enableCheckpointing(500);
+			env.setParallelism(parallelism);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+			
+			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+			
+			env
+					.addSource(kafkaSource)
+					.map(new PartitionValidatingMapper(parallelism, 1))
+					.map(new FailingIdentityMapper<Integer>(failAfterElements))
+					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+			FailingIdentityMapper.failedBefore = false;
+			tryExecute(env, "One-to-one exactly once test");
+
+			// this cannot be reliably checked, as checkpoints come in time intervals, and
+			// failures after a number of elements
+//			assertTrue("Job did not do a checkpoint before the failure",
+//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
+			
+			deleteTestTopic(topic);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
+	 * one Flink source will read multiple Kafka partitions.
+	 */
+	public void runOneSourceMultiplePartitionsExactlyOnceTest() {
+		try {
+			LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
+
+			final String topic = "oneToManyTopic";
+			final int numPartitions = 5;
+			final int numElementsPerPartition = 1000;
+			final int totalElements = numPartitions * numElementsPerPartition;
+			final int failAfterElements = numElementsPerPartition / 3;
+			
+			final int parallelism = 2;
+
+			createTestTopic(topic, numPartitions, 1);
+
+			DataGenerators.generateRandomizedIntegerSequence(
+					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+					brokerConnectionStrings,
+					topic, numPartitions, numElementsPerPartition, true);
+
+			// run the topology that fails and recovers
+
+			DeserializationSchema<Integer> schema =
+					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.enableCheckpointing(500);
+			env.setParallelism(parallelism);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+
+			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+			env
+					.addSource(kafkaSource)
+					.map(new PartitionValidatingMapper(numPartitions, 3))
+					.map(new FailingIdentityMapper<Integer>(failAfterElements))
+					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+			FailingIdentityMapper.failedBefore = false;
+			tryExecute(env, "One-source-multi-partitions exactly once test");
+
+			// this cannot be reliably checked, as checkpoints come in time intervals, and
+			// failures after a number of elements
+//			assertTrue("Job did not do a checkpoint before the failure",
+//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
+			
+			deleteTestTopic(topic);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests the proper consumption when having more Flink sources than Kafka partitions, which means
+	 * that some Flink sources will read no partitions.
+	 */
+	public void runMultipleSourcesOnePartitionExactlyOnceTest() {
+		try {
+			LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
+
+			final String topic = "manyToOneTopic";
+			final int numPartitions = 5;
+			final int numElementsPerPartition = 1000;
+			final int totalElements = numPartitions * numElementsPerPartition;
+			final int failAfterElements = numElementsPerPartition / 3;
+
+			final int parallelism = 8;
+
+			createTestTopic(topic, numPartitions, 1);
+
+			DataGenerators.generateRandomizedIntegerSequence(
+					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+					brokerConnectionStrings,
+					topic, numPartitions, numElementsPerPartition, true);
+
+			// run the topology that fails and recovers
+			
+			DeserializationSchema<Integer> schema =
+					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.enableCheckpointing(500);
+			env.setParallelism(parallelism);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+			env.setBufferTimeout(0);
+
+			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+			
+			env
+					.addSource(kafkaSource)
+					.map(new PartitionValidatingMapper(numPartitions, 1))
+					.map(new FailingIdentityMapper<Integer>(failAfterElements))
+					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+			
+			FailingIdentityMapper.failedBefore = false;
+			tryExecute(env, "multi-source-one-partitions exactly once test");
+
+			// this cannot be reliably checked, as checkpoints come in time intervals, and
+			// failures after a number of elements
+//			assertTrue("Job did not do a checkpoint before the failure",
+//					FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
+			
+			deleteTestTopic(topic);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
+	/**
+	 * Tests that the source can be properly canceled when reading full partitions. 
+	 */
+	public void runCancelingOnFullInputTest() {
+		try {
+			final String topic = "cancelingOnFullTopic";
+
+			final int parallelism = 3;
+			createTestTopic(topic, parallelism, 1);
+
+			// launch a producer thread
+			DataGenerators.InfiniteStringsGenerator generator =
+					new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
+			generator.start();
+
+			// launch a consumer asynchronously
+
+			final AtomicReference<Throwable> jobError = new AtomicReference<Throwable>();
+
+			final Runnable jobRunner = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+						env.setParallelism(parallelism);
+						env.enableCheckpointing(100);
+						env.getConfig().disableSysoutLogging();
+
+						FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+
+						env.addSource(source).addSink(new DiscardingSink<String>());
+
+						env.execute();
+					}
+					catch (Throwable t) {
+						jobError.set(t);
+					}
+				}
+			};
+
+			Thread runnerThread = new Thread(jobRunner, "program runner thread");
+			runnerThread.start();
+
+			// wait a bit before canceling
+			Thread.sleep(2000);
+
+			// cancel
+			JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManager());
+
+			// wait for the program to be done and validate that we failed with the right exception
+			runnerThread.join();
+
+			Throwable failueCause = jobError.get();
+			assertNotNull("program did not fail properly due to canceling", failueCause);
+			assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+
+			if (generator.isAlive()) {
+				generator.shutdown();
+				generator.join();
+			}
+			else {
+				Throwable t = generator.getError();
+				if (t != null) {
+					t.printStackTrace();
+					fail("Generator failed: " + t.getMessage());
+				} else {
+					fail("Generator failed with no exception");
+				}
+			}
+
+			deleteTestTopic(topic);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests that the source can be properly canceled when reading empty partitions. 
+	 */
+	public void runCancelingOnEmptyInputTest() {
+		try {
+			final String topic = "cancelingOnEmptyInputTopic";
+
+			final int parallelism = 3;
+			createTestTopic(topic, parallelism, 1);
+
+			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+			final Runnable jobRunner = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+						env.setParallelism(parallelism);
+						env.enableCheckpointing(100);
+						env.getConfig().disableSysoutLogging();
+
+						FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+
+						env.addSource(source).addSink(new DiscardingSink<String>());
+
+						env.execute();
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+
+			Thread runnerThread = new Thread(jobRunner, "program runner thread");
+			runnerThread.start();
+
+			// wait a bit before canceling
+			Thread.sleep(2000);
+
+			// cancel
+			JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManager());
+
+			// wait for the program to be done and validate that we failed with the right exception
+			runnerThread.join();
+
+			Throwable failueCause = error.get();
+			assertNotNull("program did not fail properly due to canceling", failueCause);
+			assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+
+			deleteTestTopic(topic);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Tests that the source can be properly canceled when reading full partitions. 
+	 */
+	public void runFailOnDeployTest() {
+		try {
+			final String topic = "failOnDeployTopic";
+			
+			createTestTopic(topic, 2, 1);
+
+			DeserializationSchema<Integer> schema =
+					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.setParallelism(12); // needs to be more that the mini cluster has slots
+			env.getConfig().disableSysoutLogging();
+
+			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+			
+			env
+					.addSource(kafkaSource)
+					.addSink(new DiscardingSink<Integer>());
+			
+			try {
+				env.execute();
+				fail("this test should fail with an exception");
+			}
+			catch (ProgramInvocationException e) {
+				
+				// validate that we failed due to a NoResourceAvailableException
+				Throwable cause = e.getCause();
+				int depth = 0;
+				boolean foundResourceException = false;
+				
+				while (cause != null && depth++ < 20) {
+					if (cause instanceof NoResourceAvailableException) {
+						foundResourceException = true;
+						break;
+					}
+					cause = cause.getCause();
+				}
+				
+				assertTrue("Wrong exception", foundResourceException);
+			}
+
+			deleteTestTopic(topic);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Test Flink's Kafka integration also with very big records (30MB)
+	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+	 */
+	public void runBigRecordTestTopology() {
+		try {
+			LOG.info("Starting runBigRecordTestTopology()");
+
+			final String topic = "bigRecordTestTopic";
+			final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
+			
+			createTestTopic(topic, parallelism, 1);
+
+			final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
+
+			final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
+					new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(longBytesInfo, new ExecutionConfig());
+
+			final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
+					new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(longBytesInfo, new ExecutionConfig());
+
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.setNumberOfExecutionRetries(0);
+			env.getConfig().disableSysoutLogging();
+			env.enableCheckpointing(100);
+			env.setParallelism(parallelism);
+
+			// add consuming topology:
+			Properties consumerProps = new Properties();
+			consumerProps.putAll(standardProps);
+			consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
+			consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
+			consumerProps.setProperty("queued.max.message.chunks", "1");
+
+			FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
+			DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
+
+			consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+
+				private int elCnt = 0;
+
+				@Override
+				public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+					elCnt++;
+					if (value.f0 == -1) {
+						// we should have seen 11 elements now.
+						if(elCnt == 11) {
+							throw new SuccessException();
+						} else {
+							throw new RuntimeException("There have been "+elCnt+" elements");
+						}
+					}
+					if(elCnt > 10) {
+						throw new RuntimeException("More than 10 elements seen: "+elCnt);
+					}
+				}
+			});
+
+			// add producing topology
+			Properties producerProps = new Properties();
+			producerProps.setProperty("max.message.size", Integer.toString(1024 * 1024 * 30));
+			
+			DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+
+				private boolean running;
+
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					super.open(parameters);
+					running = true;
+				}
+
+				@Override
+				public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
+					Random rnd = new Random();
+					long cnt = 0;
+					int fifteenMb = 1024 * 1024 * 15;
+
+					while (running) {
+						byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
+						ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+
+						Thread.sleep(100);
+
+						if (cnt == 10) {
+							// signal end
+							ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+							break;
+						}
+					}
+				}
+
+				@Override
+				public void cancel() {
+					running = false;
+				}
+			});
+
+			stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
+					producerProps, deserSchema));
+
+			tryExecute(env, "big topology test");
+
+			deleteTestTopic(topic);
+			
+			LOG.info("Finished runBigRecordTestTopology()");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	
+	public void runBrokerFailureTest() {
+		try {
+			LOG.info("starting runBrokerFailureTest()");
+			
+			final String topic = "brokerFailureTestTopic";
+
+			final int parallelism = 2;
+			final int numElementsPerPartition = 1000;
+			final int totalElements = parallelism * numElementsPerPartition;
+			final int failAfterElements = numElementsPerPartition / 3;
+			
+
+			createTestTopic(topic, parallelism, 2);
+
+			DataGenerators.generateRandomizedIntegerSequence(
+					StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+					brokerConnectionStrings,
+					topic, parallelism, numElementsPerPartition, true);
+
+			// find leader to shut down
+			ZkClient zkClient = createZookeeperClient();
+			PartitionMetadata firstPart = null;
+			do {
+				if (firstPart != null) {
+					LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+					// not the first try. Sleep a bit
+					Thread.sleep(150);
+				}
+
+				Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+				firstPart = partitionMetadata.head();
+			}
+			while (firstPart.errorCode() != 0);
+			zkClient.close();
+
+			final String leaderToShutDown = firstPart.leader().get().connectionString();
+			LOG.info("Leader to shutdown {}", leaderToShutDown);
+			
+			
+			// run the topology that fails and recovers
+
+			DeserializationSchema<Integer> schema =
+					new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.setParallelism(parallelism);
+			env.enableCheckpointing(500);
+			env.setNumberOfExecutionRetries(3);
+			env.getConfig().disableSysoutLogging();
+			
+
+			FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+			env
+					.addSource(kafkaSource)
+					.map(new PartitionValidatingMapper(parallelism, 1))
+					.map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
+					.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+			BrokerKillingMapper.killedLeaderBefore = false;
+			tryExecute(env, "One-to-one exactly once test");
+
+			// this cannot be reliably checked, as checkpoints come in time intervals, and
+			// failures after a number of elements
+//			assertTrue("Job did not do a checkpoint before the failure",
+//					BrokerKillingMapper.hasBeenCheckpointedBeforeFailure);
+
+			LOG.info("finished runBrokerFailureTest()");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Reading writing test data sets
+	// ------------------------------------------------------------------------
+
+	private void readSequence(StreamExecutionEnvironment env, Properties cc,
+								final int sourceParallelism,
+								final String topicName,
+								final int valuesCount, final int startFrom) throws Exception {
+
+		final int finalCount = valuesCount * sourceParallelism;
+
+		final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+		final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
+				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(intIntTupleType, env.getConfig());
+
+		// create the consumer
+		FlinkKafkaConsumer<Tuple2<Integer, Integer>> consumer = getConsumer(topicName, deser, cc);
+
+		DataStream<Tuple2<Integer, Integer>> source = env
+				.addSource(consumer).setParallelism(sourceParallelism)
+				.map(new ThrottledMapper<Tuple2<Integer, Integer>>(20)).setParallelism(sourceParallelism);
+
+		// verify data
+		source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+
+			private int[] values = new int[valuesCount];
+			private int count = 0;
+
+			@Override
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+				values[value.f1 - startFrom]++;
+				count++;
+
+				// verify if we've seen everything
+				if (count == finalCount) {
+					for (int i = 0; i < values.length; i++) {
+						int v = values[i];
+						if (v != sourceParallelism) {
+							printTopic(topicName, valuesCount, deser);
+							throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+						}
+					}
+					// test has passed
+					throw new SuccessException();
+				}
+			}
+
+		}).setParallelism(1);
+
+		tryExecute(env, "Read data from Kafka");
+
+		LOG.info("Successfully read sequence for verification");
+	}
+
+	private static void writeSequence(StreamExecutionEnvironment env, String topicName,
+									  final int numElements, int parallelism) throws Exception {
+
+		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+			private boolean running = true;
+
+			@Override
+			public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+				int cnt = 0;
+				int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+				while (running && cnt < numElements) {
+					ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+					cnt++;
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		}).setParallelism(parallelism);
+		
+		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
+				topicName,
+				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
+				new Tuple2Partitioner(parallelism)
+		)).setParallelism(parallelism);
+
+		env.execute("Write sequence");
+
+		LOG.info("Finished writing sequence");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Debugging utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Read topic to list, only using Kafka code.
+	 */
+	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
+		// will see each message only once.
+		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
+		if(streams.size() != 1) {
+			throw new RuntimeException("Expected only one message stream but got "+streams.size());
+		}
+		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+		if(kafkaStreams == null) {
+			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+		}
+		if(kafkaStreams.size() != 1) {
+			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
+		}
+		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
+
+		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
+		int read = 0;
+		while(iteratorToRead.hasNext()) {
+			read++;
+			result.add(iteratorToRead.next());
+			if(read == stopAfter) {
+				LOG.info("Read "+read+" elements");
+				return result;
+			}
+		}
+		return result;
+	}
+
+	private static void printTopic(String topicName, ConsumerConfig config,
+								   DeserializationSchema<?> deserializationSchema,
+								   int stopAfter) {
+
+		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
+		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
+
+		for (MessageAndMetadata<byte[], byte[]> message: contents) {
+			Object out = deserializationSchema.deserialize(message.message());
+			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
+		}
+	}
+
+	private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) {
+		// write the sequence to log for debugging purposes
+		Properties stdProps = standardCC.props().props();
+		Properties newProps = new Properties(stdProps);
+		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
+		newProps.setProperty("auto.offset.reset", "smallest");
+		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+
+		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+		printTopic(topicName, printerConfig, deserializer, elements);
+	}
+
+
+	public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
+			implements Checkpointed<Integer>, CheckpointCommitter {
+
+		private static final long serialVersionUID = 6334389850158707313L;
+
+		public static volatile boolean killedLeaderBefore;
+		public static volatile boolean hasBeenCheckpointedBeforeFailure;
+		
+		private final String leaderToShutDown;
+		private final int failCount;
+		private int numElementsTotal;
+
+		private boolean failer;
+		private boolean hasBeenCheckpointed;
+
+
+		public BrokerKillingMapper(String leaderToShutDown, int failCount) {
+			this.leaderToShutDown = leaderToShutDown;
+			this.failCount = failCount;
+		}
+
+		@Override
+		public void open(Configuration parameters) {
+			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+		}
+
+		@Override
+		public T map(T value) throws Exception {
+			numElementsTotal++;
+			
+			if (!killedLeaderBefore) {
+				Thread.sleep(10);
+				
+				if (failer && numElementsTotal >= failCount) {
+					// shut down a Kafka broker
+					KafkaServer toShutDown = null;
+					for (KafkaServer kafkaServer : brokers) {
+						if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
+							toShutDown = kafkaServer;
+							break;
+						}
+					}
+	
+					if (toShutDown == null) {
+						throw new Exception("Cannot find broker to shut down");
+					}
+					else {
+						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+						killedLeaderBefore = true;
+						toShutDown.shutdown();
+					}
+				}
+			}
+			return value;
+		}
+
+		@Override
+		public void commitCheckpoint(long checkpointId) {
+			hasBeenCheckpointed = true;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return numElementsTotal;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			this.numElementsTotal = state;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/940a7c8a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaITCase.java
new file mode 100644
index 0000000..54ce4ae
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+
+public class KafkaITCase extends KafkaConsumerTestBase {
+	
+	@Override
+	protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
+		return new FlinkKafkaConsumer081<T>(topic, deserializationSchema, props);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testCheckpointing() {
+		runCheckpointingTest();
+	}
+
+	@Test
+	public void testOffsetInZookeeper() {
+		runOffsetInZookeeperValidationTest();
+	}
+	
+	@Test
+	public void testConcurrentProducerConsumerTopology() {
+		runSimpleConcurrentProducerConsumerTopology();
+	}
+
+	// --- canceling / failures ---
+	
+	@Test
+	public void testCancelingEmptyTopic() {
+		runCancelingOnEmptyInputTest();
+	}
+
+	@Test
+	public void testCancelingFullTopic() {
+		runCancelingOnFullInputTest();
+	}
+
+	@Test
+	public void testFailOnDeploy() {
+		runFailOnDeployTest();
+	}
+
+	// --- source to partition mappings and exactly once ---
+	
+	@Test
+	public void testOneToOneSources() {
+		runOneToOneExactlyOnceTest();
+	}
+
+	@Test
+	public void testOneSourceMultiplePartitions() {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test
+	public void testMultipleSourcesOnePartition() {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test
+	public void testBrokerFailure() {
+		runBrokerFailureTest();
+	}
+
+	// --- special executions ---
+	
+	@Test
+	public void testBigRecordJob() {
+		runBigRecordTestTopology();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/940a7c8a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..b910b54
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import kafka.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaLocalSystemTime implements Time {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+	@Override
+	public long milliseconds() {
+		return System.currentTimeMillis();
+	}
+
+	@Override
+	public long nanoseconds() {
+		return System.nanoTime();
+	}
+
+	@Override
+	public void sleep(long ms) {
+		try {
+			Thread.sleep(ms);
+		} catch (InterruptedException e) {
+			LOG.warn("Interruption", e);
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/940a7c8a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
new file mode 100644
index 0000000..2f14fef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaProducerITCase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.testutils.SuccessException;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class KafkaProducerITCase extends KafkaTestBase {
+
+
+	/**
+	 * 
+	 * <pre>
+	 *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
+	 *            /                  |                                       \
+	 *           /                   |                                        \
+	 * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
+	 *           \                   |                                        /
+	 *            \                  |                                       /
+	 *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
+	 * </pre>
+	 * 
+	 * The mapper validates that the values come consistently from the correct Kafka partition.
+	 * 
+	 * The final sink validates that there are no duplicates and that all partitions are present.
+	 */
+	@Test
+	public void testCustomPartitioning() {
+		try {
+			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
+
+			final String topic = "customPartitioningTestTopic";
+			final int parallelism = 3;
+			
+			createTestTopic(topic, parallelism, 1);
+
+			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
+
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			env.setNumberOfExecutionRetries(0);
+			env.getConfig().disableSysoutLogging();
+
+			TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
+					new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig());
+
+			TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
+					new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig());
+
+			// ------ producing topology ---------
+			
+			// source has DOP 1 to make sure it generates no duplicates
+			DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+
+				private boolean running = true;
+
+				@Override
+				public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
+					long cnt = 0;
+					while (running) {
+						ctx.collect(new Tuple2<Long, String>(cnt, "kafka-" + cnt));
+						cnt++;
+					}
+				}
+
+				@Override
+				public void cancel() {
+					running = false;
+				}
+			})
+			.setParallelism(1);
+			
+			// sink partitions into 
+			stream.addSink(new KafkaSink<Tuple2<Long, String>>(
+					brokerConnectionStrings, topic,serSchema, new CustomPartitioner(parallelism)))
+			.setParallelism(parallelism);
+
+			// ------ consuming topology ---------
+			
+			FlinkKafkaConsumer<Tuple2<Long, String>> source = 
+					new FlinkKafkaConsumer<Tuple2<Long, String>>(topic, deserSchema, standardProps,
+							FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
+							FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
+			
+			env.addSource(source).setParallelism(parallelism)
+
+					// mapper that validates partitioning and maps to partition
+					.map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
+						
+						private int ourPartition = -1;
+						@Override
+						public Integer map(Tuple2<Long, String> value) {
+							int partition = value.f0.intValue() % parallelism;
+							if (ourPartition != -1) {
+								assertEquals("inconsistent partitioning", ourPartition, partition);
+							} else {
+								ourPartition = partition;
+							}
+							return partition;
+						}
+					}).setParallelism(parallelism)
+					
+					.addSink(new SinkFunction<Integer>() {
+						
+						private int[] valuesPerPartition = new int[parallelism];
+						
+						@Override
+						public void invoke(Integer value) throws Exception {
+							valuesPerPartition[value]++;
+							
+							boolean missing = false;
+							for (int i : valuesPerPartition) {
+								if (i < 100) {
+									missing = true;
+									break;
+								}
+							}
+							if (!missing) {
+								throw new SuccessException();
+							}
+						}
+					}).setParallelism(1);
+			
+			tryExecute(env, "custom partitioning test");
+
+			deleteTestTopic(topic);
+			
+			LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
+	// ------------------------------------------------------------------------
+
+	public static class CustomPartitioner implements SerializableKafkaPartitioner {
+
+		private final int expectedPartitions;
+
+		public CustomPartitioner(int expectedPartitions) {
+			this.expectedPartitions = expectedPartitions;
+		}
+
+		@Override
+		public int partition(Object key, int numPartitions) {
+			@SuppressWarnings("unchecked")
+			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
+			
+			assertEquals(expectedPartitions, numPartitions);
+			
+			return (int) (tuple.f0 % numPartitions);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/940a7c8a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
new file mode 100644
index 0000000..e177497
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/KafkaTestBase.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import kafka.admin.AdminUtils;
+import kafka.consumer.ConsumerConfig;
+import kafka.network.SocketServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.internals.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.connectors.testutils.SuccessException;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * The base for the Kafka tests. It brings up:
+ * <ul>
+ *     <li>A ZooKeeper mini cluster</li>
+ *     <li>Three Kafka Brokers (mini clusters)</li>
+ *     <li>A Flink mini cluster</li>
+ * </ul>
+ * 
+ * <p>Code in this test is based on the following GitHub repository:
+ * <a href="https://github.com/sakserv/hadoop-mini-clusters">
+ *   https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
+ * as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
+ */
+@SuppressWarnings("serial")
+public abstract class KafkaTestBase {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
+	
+	protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
+
+	protected static String zookeeperConnectionString;
+
+	protected static File tmpZkDir;
+
+	protected static File tmpKafkaParent;
+
+	protected static TestingServer zookeeper;
+	protected static List<KafkaServer> brokers;
+	protected static String brokerConnectionStrings = "";
+
+	protected static ConsumerConfig standardCC;
+	protected static Properties standardProps;
+	
+	protected static ForkableFlinkMiniCluster flink;
+
+	protected static int flinkPort;
+	
+	
+	
+	// ------------------------------------------------------------------------
+	//  Setup and teardown of the mini clusters
+	// ------------------------------------------------------------------------
+	
+	@BeforeClass
+	public static void prepare() throws IOException {
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting KafkaITCase ");
+		LOG.info("-------------------------------------------------------------------------");
+		
+		LOG.info("Starting KafkaITCase.prepare()");
+		
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+		
+		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+		List<File> tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
+		for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+			File tmpDir = new File(tmpKafkaParent, "server-" + i);
+			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+			tmpKafkaDirs.add(tmpDir);
+		}
+
+		String kafkaHost = "localhost";
+		int zkPort = NetUtils.getAvailablePort();
+		zookeeperConnectionString = "localhost:" + zkPort;
+
+		zookeeper = null;
+		brokers = null;
+
+		try {
+			LOG.info("Starting Zookeeper");
+			zookeeper = new TestingServer(zkPort, tmpZkDir);
+			
+			LOG.info("Starting KafkaServer");
+			brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
+			
+			for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i), kafkaHost, zookeeperConnectionString));
+				SocketServer socketServer = brokers.get(i).socketServer();
+				
+				String host = socketServer.host() == null ? "localhost" : socketServer.host();
+				brokerConnectionStrings += host+":"+socketServer.port()+",";
+			}
+
+			LOG.info("ZK and KafkaServer started.");
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail("Test setup failed: " + t.getMessage());
+		}
+
+		standardProps = new Properties();
+
+		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		standardProps.setProperty("bootstrap.servers", brokerConnectionStrings);
+		standardProps.setProperty("group.id", "flink-tests");
+		standardProps.setProperty("auto.commit.enable", "false");
+		standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
+		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
+		standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+		
+		Properties consumerConfigProps = new Properties();
+		consumerConfigProps.putAll(standardProps);
+		consumerConfigProps.setProperty("auto.offset.reset", "smallest");
+		standardCC = new ConsumerConfig(consumerConfigProps);
+		
+		// start also a re-usable Flink mini cluster
+		
+		Configuration flinkConfig = new Configuration();
+		flinkConfig.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+		flinkConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 s");
+
+		flink = new ForkableFlinkMiniCluster(flinkConfig, false, StreamingMode.STREAMING);
+		flinkPort = flink.getJobManagerRPCPort();
+	}
+
+	@AfterClass
+	public static void shutDownServices() {
+
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Shut down KafkaITCase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		flinkPort = -1;
+		flink.shutdown();
+		
+		for (KafkaServer broker : brokers) {
+			if (broker != null) {
+				broker.shutdown();
+			}
+		}
+		brokers.clear();
+		
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			}
+			catch (Exception e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+			zookeeper = null;
+		}
+		
+		// clean up the temp spaces
+		
+		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpKafkaParent);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+		if (tmpZkDir != null && tmpZkDir.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpZkDir);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    KafkaITCase finished"); 
+		LOG.info("-------------------------------------------------------------------------");
+	}
+
+	/**
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 */
+	private static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
+												String kafkaHost,
+												String zookeeperConnectionString) throws Exception {
+		Properties kafkaProperties = new Properties();
+
+		int kafkaPort = NetUtils.getAvailablePort();
+
+		// properties have to be Strings
+		kafkaProperties.put("advertised.host.name", kafkaHost);
+		kafkaProperties.put("port", Integer.toString(kafkaPort));
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", "" + (50 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", "" + (50 * 1024 * 1024));
+		KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+		KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
+		server.startup();
+		return server;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Execution utilities
+	// ------------------------------------------------------------------------
+	
+	protected ZkClient createZookeeperClient() {
+		return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+	}
+	
+	protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+		try {
+			see.execute(name);
+		} catch (Exception root) {
+			Throwable cause = root.getCause();
+
+			// search for nested SuccessExceptions
+			int depth = 0;
+			while (!(cause instanceof SuccessException)) {
+				if (cause == null || depth++ == 20) {
+					root.printStackTrace();
+					fail("Test failed: " + root.getMessage());
+				} else {
+					cause = cause.getCause();
+				}
+			}
+		}
+	}
+
+	protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+		
+		// create topic with one client
+		Properties topicConfig = new Properties();
+		LOG.info("Creating topic {}", topic);
+
+		ZkClient creator = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+		
+		AdminUtils.createTopic(creator, topic, numberOfPartitions, replicationFactor, topicConfig);
+		creator.close();
+		
+		// validate that the topic has been created
+		final long deadline = System.currentTimeMillis() + 30000;
+		do {
+			List<PartitionInfo> partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic, standardProps);
+			if (partitions != null && partitions.size() > 0) {
+				return;
+			}
+		}
+		while (System.currentTimeMillis() < deadline);
+		fail ("Test topic could not be created");
+	}
+	
+	protected static void deleteTestTopic(String topic) {
+		LOG.info("Deleting topic {}", topic);
+
+		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+
+		AdminUtils.deleteTopic(zk, topic);
+		
+		zk.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/940a7c8a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
new file mode 100644
index 0000000..c412136
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandlerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.internals;
+
+import kafka.admin.AdminUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.flink.streaming.connectors.KafkaTestBase;
+
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
+	
+	@Test
+	public void runOffsetManipulationinZooKeeperTest() {
+		try {
+			final String topicName = "ZookeeperOffsetHandlerTest-Topic";
+			final String groupId = "ZookeeperOffsetHandlerTest-Group";
+			
+			final long offset = (long) (Math.random() * Long.MAX_VALUE);
+
+			ZkClient zkClient = createZookeeperClient();
+			AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
+				
+			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset);
+	
+			long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0);
+
+			zkClient.close();
+			
+			assertEquals(offset, fetchedOffset);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}