Posted to commits@flink.apache.org by se...@apache.org on 2015/08/26 21:13:56 UTC

[06/10] flink git commit: [FLINK-2386] [kafka] Move Kafka connectors to 'org.apache.flink.streaming.connectors.kafka'

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
new file mode 100644
index 0000000..68cc55c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kafka.KafkaSink;
+import org.apache.flink.streaming.connectors.kafka.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import java.util.Random;
+
+@SuppressWarnings("serial")
+public class DataGenerators {
+	
+	public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
+														String brokerConnection, String topic,
+														int numPartitions,
+														final int from, final int to) throws Exception {
+
+		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+
+		env.setParallelism(numPartitions);
+		env.getConfig().disableSysoutLogging();
+		env.setNumberOfExecutionRetries(0);
+		
+		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(
+				new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+
+					private volatile boolean running = true;
+
+					@Override
+					public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+						int cnt = from;
+						int partition = getRuntimeContext().getIndexOfThisSubtask();
+
+						while (running && cnt <= to) {
+							ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+							cnt++;
+						}
+					}
+
+					@Override
+					public void cancel() {
+						running = false;
+					}
+				});
+
+		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnection, topic,
+				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
+				new Tuple2Partitioner(numPartitions)
+		));
+
+		env.execute("Data generator (Int, Int) stream to topic " + topic);
+	}
+
+	// ------------------------------------------------------------------------
+	
+	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
+															String brokerConnection, String topic,
+															final int numPartitions,
+															final int numElements,
+															final boolean randomizeOrder) throws Exception {
+		env.setParallelism(numPartitions);
+		env.getConfig().disableSysoutLogging();
+		env.setNumberOfExecutionRetries(0);
+
+		DataStream<Integer> stream = env.addSource(
+				new RichParallelSourceFunction<Integer>() {
+
+					private volatile boolean running = true;
+
+					@Override
+					public void run(SourceContext<Integer> ctx) {
+						// create a sequence
+						int[] elements = new int[numElements];
+						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
+								i < numElements;
+								i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+							
+							elements[i] = val;
+						}
+
+						// scramble the sequence
+						if (randomizeOrder) {
+							Random rnd = new Random();
+							for (int i = 0; i < elements.length; i++) {
+								int otherPos = rnd.nextInt(elements.length);
+								
+								int tmp = elements[i];
+								elements[i] = elements[otherPos];
+								elements[otherPos] = tmp;
+							}
+						}
+
+						// emit the sequence
+						int pos = 0;
+						while (running && pos < elements.length) {
+							ctx.collect(elements[pos++]);
+						}
+					}
+
+					@Override
+					public void cancel() {
+						running = false;
+					}
+				});
+
+		stream
+				.rebalance()
+				.addSink(new KafkaSink<Integer>(brokerConnection, topic,
+						new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
+						new SerializableKafkaPartitioner() {
+							@Override
+							public int partition(Object key, int numPartitions) {
+								return ((Integer) key) % numPartitions;
+							}
+						}));
+
+		env.execute("Scrambled int sequence generator");
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	public static class InfiniteStringsGenerator extends Thread {
+
+		private final String kafkaConnectionString;
+		
+		private final String topic;
+		
+		private volatile Throwable error;
+		
+		private volatile boolean running = true;
+
+		
+		public InfiniteStringsGenerator(String kafkaConnectionString, String topic) {
+			this.kafkaConnectionString = kafkaConnectionString;
+			this.topic = topic;
+		}
+
+		@Override
+		public void run() {
+			// we manually feed data into the Kafka sink
+			KafkaSink<String> producer = null;
+			try {
+				producer = new KafkaSink<String>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
+				producer.open(new Configuration());
+				
+				final StringBuilder bld = new StringBuilder();
+				final Random rnd = new Random();
+				
+				while (running) {
+					bld.setLength(0);
+					
+					int len = rnd.nextInt(100) + 1;
+					for (int i = 0; i < len; i++) {
+						bld.append((char) (rnd.nextInt(20) + 'a') );
+					}
+					
+					String next = bld.toString();
+					producer.invoke(next);
+				}
+			}
+			catch (Throwable t) {
+				this.error = t;
+			}
+			finally {
+				if (producer != null) {
+					try {
+						producer.close();
+					}
+					catch (Throwable t) {
+						// ignore
+					}
+				}
+			}
+		}
+		
+		public void shutdown() {
+			this.running = false;
+			this.interrupt();
+		}
+		
+		public Throwable getError() {
+			return this.error;
+		}
+	}
+}
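
A minimal sketch of how a test might drive these generators; the broker address,
topic name, and local environment are illustrative assumptions, and the calling
test method is assumed to declare "throws Exception":

	StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);

	// write the values 0..999 into "test-topic" in randomized order
	DataGenerators.generateRandomizedIntegerSequence(
			env, "localhost:9092", "test-topic", 3, 1000, true);

	// or feed an endless stream of short random strings until shut down
	DataGenerators.InfiniteStringsGenerator generator =
			new DataGenerators.InfiniteStringsGenerator("localhost:9092", "test-topic");
	generator.start();
	// ... run and verify the consuming job under test ...
	generator.shutdown();
	generator.join();
	if (generator.getError() != null) {
		throw new Exception("The string generator failed", generator.getError());
	}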

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
new file mode 100644
index 0000000..987e6c5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DiscardingSink.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * Sink function that discards data.
+ * @param <T> The type of the elements that the sink consumes.
+ */
+public class DiscardingSink<T> implements SinkFunction<T> {
+
+	private static final long serialVersionUID = 2777597566520109843L;
+
+	@Override
+	public void invoke(T value) {}
+}
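
A typical use, assuming an existing StreamExecutionEnvironment "env": every Flink
topology needs a sink, and this one lets a test terminate a stream whose effects
are asserted elsewhere (in counters, static flags, or thrown exceptions).

	// consume the stream without keeping anything; the upstream operators
	// carry the actual test assertions
	DataStream<String> stream = env.fromElements("a", "b", "c");
	stream.addSink(new DiscardingSink<String>());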

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
new file mode 100644
index 0000000..48e06e3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
+		Checkpointed<Integer>, CheckpointCommitter, Runnable {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
+	
+	private static final long serialVersionUID = 6334389850158707313L;
+	
+	public static volatile boolean failedBefore;
+	public static volatile boolean hasBeenCheckpointedBeforeFailure;
+
+	private final int failCount;
+	private int numElementsTotal;
+	private int numElementsThisTime;
+	
+	private boolean failer;
+	private boolean hasBeenCheckpointed;
+	
+	private Thread printer;
+	private volatile boolean printerRunning = true;
+
+	public FailingIdentityMapper(int failCount) {
+		this.failCount = failCount;
+	}
+
+	@Override
+	public void open(Configuration parameters) {
+		failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+		printer = new Thread(this, "FailingIdentityMapper Status Printer");
+		printer.start();
+	}
+
+	@Override
+	public T map(T value) throws Exception {
+		numElementsTotal++;
+		numElementsThisTime++;
+		
+		if (!failedBefore) {
+			Thread.sleep(10);
+			
+			if (failer && numElementsTotal >= failCount) {
+				hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+				failedBefore = true;
+				throw new Exception("Artificial Test Failure");
+			}
+		}
+		return value;
+	}
+
+	@Override
+	public void close() throws Exception {
+		printerRunning = false;
+		if (printer != null) {
+			printer.interrupt();
+			printer = null;
+		}
+	}
+
+	@Override
+	public void commitCheckpoint(long checkpointId) {
+		this.hasBeenCheckpointed = true;
+	}
+
+	@Override
+	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+		return numElementsTotal;
+	}
+
+	@Override
+	public void restoreState(Integer state) {
+		numElementsTotal = state;
+	}
+
+	@Override
+	public void run() {
+		while (printerRunning) {
+			try {
+				Thread.sleep(5000);
+			}
+			catch (InterruptedException e) {
+				// ignore
+			}
+			LOG.info("============================> Failing mapper {}: count={}, totalCount={}",
+					getRuntimeContext().getIndexOfThisSubtask(),
+					numElementsThisTime, numElementsTotal);
+		}
+	}
+}
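
The two public static flags are shared JVM-wide, so a test must reset them before
each run. A sketch of the intended wiring; the element counts and the Integer
stream are illustrative assumptions:

	// reset the shared flag so this run fails exactly once, on subtask 0
	FailingIdentityMapper.failedBefore = false;

	stream
			.map(new FailingIdentityMapper<Integer>(1000)) // throws after ~1000 elements
			.addSink(new ValidatingExactlyOnceSink(10000));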

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
new file mode 100644
index 0000000..59c607f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class JobManagerCommunicationUtils {
+	
+	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+	
+	
+	public static void cancelCurrentJob(ActorRef jobManager) throws Exception {
+		
+		// find the jobID
+
+		Future<Object> listResponse = Patterns.ask(jobManager,
+				JobManagerMessages.getRequestRunningJobsStatus(),
+				askTimeout.toMillis());
+
+		List<JobStatusMessage> jobs;
+		try {
+			Object result = Await.result(listResponse, askTimeout);
+			jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+		}
+		catch (Exception e) {
+			throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
+		}
+		
+		if (jobs.isEmpty()) {
+			throw new Exception("Could not cancel job - no running jobs");
+		}
+		if (jobs.size() != 1) {
+			throw new Exception("Could not cancel job - more than one running job.");
+		}
+		
+		JobStatusMessage status = jobs.get(0);
+		if (status.getJobState().isTerminalState()) {
+			throw new Exception("Could not cancel job - job is not running any more");
+		}
+		
+		JobID jobId = status.getJobId();
+		
+		Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId),
+				askTimeout.toMillis());
+		try {
+			Await.result(response, askTimeout);
+		}
+		catch (Exception e) {
+			throw new Exception("Sending the 'cancel' message failed.", e);
+		}
+	}
+}
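
A sketch of the intended call site, assuming a test mini-cluster ("cluster" is an
illustrative assumption) that exposes its JobManager as an Akka ActorRef:

	// exactly one job must be running, otherwise cancelCurrentJob throws
	ActorRef jobManager = cluster.getJobManager();
	JobManagerCommunicationUtils.cancelCurrentJob(jobManager);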

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
new file mode 100644
index 0000000..f8a5da1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+public class MockRuntimeContext implements RuntimeContext {
+
+	private final int numberOfParallelSubtasks;
+	private final int indexOfThisSubtask;
+
+	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
+		this.indexOfThisSubtask = indexOfThisSubtask;
+	}
+
+
+	@Override
+	public String getTaskName() {
+		return null;
+	}
+
+	@Override
+	public int getNumberOfParallelSubtasks() {
+		return numberOfParallelSubtasks;
+	}
+
+	@Override
+	public int getIndexOfThisSubtask() {
+		return indexOfThisSubtask;
+	}
+
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return null;
+	}
+
+	@Override
+	public ClassLoader getUserCodeClassLoader() {
+		return null;
+	}
+
+	@Override
+	public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {}
+
+	@Override
+	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
+		return null;
+	}
+
+	@Override
+	public HashMap<String, Accumulator<?, ?>> getAllAccumulators() { return null; }
+
+	@Override
+	public IntCounter getIntCounter(String name) {
+		return null;
+	}
+
+	@Override
+	public LongCounter getLongCounter(String name) {
+		return null;
+	}
+
+	@Override
+	public DoubleCounter getDoubleCounter(String name) {
+		return null;
+	}
+
+	@Override
+	public Histogram getHistogram(String name) {
+		return null;
+	}
+
+	@Override
+	public <RT> List<RT> getBroadcastVariable(String name) {
+		return null;
+	}
+
+	@Override
+	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
+		return null;
+	}
+
+	@Override
+	public DistributedCache getDistributedCache() {
+		return null;
+	}
+
+}
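
Rich functions accept an injected context via setRuntimeContext, so this mock lets
tests exercise partition-assignment logic without a cluster. A sketch, where
createTestSource() is a hypothetical helper:

	// pretend to be subtask 2 of a parallelism-4 deployment
	RichParallelSourceFunction<Integer> source = createTestSource(); // hypothetical
	source.setRuntimeContext(new MockRuntimeContext(4, 2));
	source.open(new Configuration());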

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
new file mode 100644
index 0000000..9f516c0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
+
+	private static final long serialVersionUID = 1088381231244959088L;
+	
+	/* the partitions from which this function received data */
+	private final Set<Integer> myPartitions = new HashSet<Integer>();
+	
+	private final int numPartitions;
+	private final int maxPartitions;
+
+	public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
+		this.numPartitions = numPartitions;
+		this.maxPartitions = maxPartitions;
+	}
+
+	@Override
+	public Integer map(Integer value) throws Exception {
+		// validate that this subtask sees elements from at most the expected number of partitions
+		int partition = value % numPartitions;
+		myPartitions.add(partition);
+		if (myPartitions.size() > maxPartitions) {
+			throw new Exception("Error: Elements from too many different partitions: " + myPartitions
+					+ ". Expected elements from at most " + maxPartitions + " partitions");
+		}
+		return value;
+	}
+}
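
A sketch of a typical wiring, assuming the generators above wrote values such that
value % 3 identifies the source partition: each consumer subtask should then see
values from a bounded number of partitions.

	// with a 1:1 partition-to-subtask assignment, each mapper instance
	// must see elements from at most one distinct partition
	stream.map(new PartitionValidatingMapper(3, 1))
			.addSink(new DiscardingSink<Integer>());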

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
new file mode 100644
index 0000000..12e3460
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/SuccessException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+/**
+ * Exception that is thrown to terminate a program and indicate success.
+ */
+public class SuccessException extends Exception {
+	private static final long serialVersionUID = -7011865671593955887L;
+}
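
Since a streaming job has no natural end, tests typically throw this from a sink
or mapper and treat the resulting job failure as success. A sketch of that
pattern; the unwrapping loop is an assumption about the calling test, not part of
this commit:

	try {
		env.execute();
		Assert.fail("Job should have terminated with a SuccessException");
	}
	catch (JobExecutionException e) {
		// walk the cause chain: a wrapped SuccessException means the test passed
		Throwable cause = e.getCause();
		while (cause != null && !(cause instanceof SuccessException)) {
			cause = cause.getCause();
		}
		if (cause == null) {
			throw e; // a genuine failure
		}
	}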

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
new file mode 100644
index 0000000..1d61229
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * An identity map function that sleeps between elements, throttling the
+ * processing speed.
+ * 
+ * @param <T> The type mapped.
+ */
+public class ThrottledMapper<T> implements MapFunction<T,T> {
+
+	private static final long serialVersionUID = 467008933767159126L;
+
+	private final int sleep;
+
+	public ThrottledMapper(int sleep) {
+		this.sleep = sleep;
+	}
+
+	@Override
+	public T map(T value) throws Exception {
+		Thread.sleep(this.sleep);
+		return value;
+	}
+}
\ No newline at end of file
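
With a 10 ms sleep, this throttles each subtask to roughly 100 elements per
second, slow enough for checkpoints to interleave with in-flight data. A sketch,
assuming an Integer stream:

	// ~100 elements/second per subtask: checkpoints can fire mid-stream
	stream.map(new ThrottledMapper<Integer>(10))
			.addSink(new DiscardingSink<Integer>());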

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
new file mode 100644
index 0000000..6ffaac4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.SerializableKafkaPartitioner;
+
+/**
+ * Special partitioner that uses the first field of a 2-tuple as the partition,
+ * and that expects a specific number of partitions.
+ */
+public class Tuple2Partitioner implements SerializableKafkaPartitioner {
+	
+	private static final long serialVersionUID = 1L;
+
+	private final int expectedPartitions;
+
+	
+	public Tuple2Partitioner(int expectedPartitions) {
+		this.expectedPartitions = expectedPartitions;
+	}
+
+	@Override
+	public int partition(Object key, int numPartitions) {
+		if (numPartitions != expectedPartitions) {
+			throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
+		}
+		@SuppressWarnings("unchecked")
+		Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
+		
+		return element.f0;
+	}
+}
\ No newline at end of file
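
The partitioner trusts the tuple's first field to already be a valid partition
index, which the generator in DataGenerators guarantees by writing the subtask
index into f0. A direct illustration:

	Tuple2Partitioner partitioner = new Tuple2Partitioner(4);

	// the first field selects the Kafka partition: (2, 42) goes to partition 2
	int partition = partitioner.partition(new Tuple2<Integer, Integer>(2, 42), 4);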

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
new file mode 100644
index 0000000..1d93c9e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+
+public class ValidatingExactlyOnceSink implements SinkFunction<Integer>, Checkpointed<Tuple2<Integer, BitSet>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
+
+	private static final long serialVersionUID = 1748426382527469932L;
+	
+	private final int numElementsTotal;
+	
+	private BitSet duplicateChecker = new BitSet();  // this is checkpointed
+
+	private int numElements; // this is checkpointed
+
+	
+	public ValidatingExactlyOnceSink(int numElementsTotal) {
+		this.numElementsTotal = numElementsTotal;
+	}
+
+	
+	@Override
+	public void invoke(Integer value) throws Exception {
+		numElements++;
+		
+		if (duplicateChecker.get(value)) {
+			throw new Exception("Received a duplicate");
+		}
+		duplicateChecker.set(value);
+		if (numElements == numElementsTotal) {
+			// validate
+			if (duplicateChecker.cardinality() != numElementsTotal) {
+				throw new Exception("Duplicate checker has wrong cardinality");
+			}
+			else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+				throw new Exception("Received sparse sequence");
+			}
+			else {
+				throw new SuccessException();
+			}
+		}
+	}
+
+	@Override
+	public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
+		LOG.info("Snapshot of counter {} at checkpoint {}", numElements, checkpointId);
+		return new Tuple2<Integer, BitSet>(numElements, duplicateChecker);
+	}
+
+	@Override
+	public void restoreState(Tuple2<Integer, BitSet> state) {
+		LOG.info("restoring num elements to {}", state.f0);
+		this.numElements = state.f0;
+		this.duplicateChecker = state.f1;
+	}
+}
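
The BitSet doubles as duplicate detector and completeness check: every value in
0..numElementsTotal-1 must arrive exactly once before the sink signals success,
so a single sink instance has to observe the whole sequence. A sketch of the
wiring; the counts and the upstream mapper are illustrative:

	// parallelism 1: one sink instance sees the complete range 0..9999
	stream.map(new FailingIdentityMapper<Integer>(3000))
			.addSink(new ValidatingExactlyOnceSink(10000))
			.setParallelism(1);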

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
deleted file mode 100644
index 577de33..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DataGenerators.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.KafkaSink;
-import org.apache.flink.streaming.connectors.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import java.util.Random;
-
-@SuppressWarnings("serial")
-public class DataGenerators {
-	
-	public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
-														String brokerConnection, String topic,
-														int numPartitions,
-														final int from, final int to) throws Exception {
-
-		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		env.setParallelism(numPartitions);
-		env.getConfig().disableSysoutLogging();
-		env.setNumberOfExecutionRetries(0);
-		
-		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(
-				new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-						int cnt = from;
-						int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-						while (running && cnt <= to) {
-							ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
-							cnt++;
-						}
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
-
-		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnection, topic,
-				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
-				new Tuple2Partitioner(numPartitions)
-		));
-
-		env.execute("Data generator (Int, Int) stream to topic " + topic);
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
-															String brokerConnection, String topic,
-															final int numPartitions,
-															final int numElements,
-															final boolean randomizeOrder) throws Exception {
-		env.setParallelism(numPartitions);
-		env.getConfig().disableSysoutLogging();
-		env.setNumberOfExecutionRetries(0);
-
-		DataStream<Integer> stream = env.addSource(
-				new RichParallelSourceFunction<Integer>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Integer> ctx) {
-						// create a sequence
-						int[] elements = new int[numElements];
-						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
-								i < numElements;
-								i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-							
-							elements[i] = val;
-						}
-
-						// scramble the sequence
-						if (randomizeOrder) {
-							Random rnd = new Random();
-							for (int i = 0; i < elements.length; i++) {
-								int otherPos = rnd.nextInt(elements.length);
-								
-								int tmp = elements[i];
-								elements[i] = elements[otherPos];
-								elements[otherPos] = tmp;
-							}
-						}
-
-						// emit the sequence
-						int pos = 0;
-						while (running && pos < elements.length) {
-							ctx.collect(elements[pos++]);
-						}
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
-
-		stream
-				.rebalance()
-				.addSink(new KafkaSink<Integer>(brokerConnection, topic,
-						new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
-						new SerializableKafkaPartitioner() {
-							@Override
-							public int partition(Object key, int numPartitions) {
-								return ((Integer) key) % numPartitions;
-							}
-						}));
-
-		env.execute("Scrambled int sequence generator");
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	public static class InfiniteStringsGenerator extends Thread {
-
-		private final String kafkaConnectionString;
-		
-		private final String topic;
-		
-		private volatile Throwable error;
-		
-		private volatile boolean running = true;
-
-		
-		public InfiniteStringsGenerator(String kafkaConnectionString, String topic) {
-			this.kafkaConnectionString = kafkaConnectionString;
-			this.topic = topic;
-		}
-
-		@Override
-		public void run() {
-			// we manually feed data into the Kafka sink
-			KafkaSink<String> producer = null;
-			try {
-				producer = new KafkaSink<String>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
-				producer.open(new Configuration());
-				
-				final StringBuilder bld = new StringBuilder();
-				final Random rnd = new Random();
-				
-				while (running) {
-					bld.setLength(0);
-					
-					int len = rnd.nextInt(100) + 1;
-					for (int i = 0; i < len; i++) {
-						bld.append((char) (rnd.nextInt(20) + 'a') );
-					}
-					
-					String next = bld.toString();
-					producer.invoke(next);
-				}
-			}
-			catch (Throwable t) {
-				this.error = t;
-			}
-			finally {
-				if (producer != null) {
-					try {
-						producer.close();
-					}
-					catch (Throwable t) {
-						// ignore
-					}
-				}
-			}
-		}
-		
-		public void shutdown() {
-			this.running = false;
-			this.interrupt();
-		}
-		
-		public Throwable getError() {
-			return this.error;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
deleted file mode 100644
index b89bd5c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/DiscardingSink.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
-/**
- * Sink function that discards data.
- * @param <T> The type of the elements that the sink consumes.
- */
-public class DiscardingSink<T> implements SinkFunction<T> {
-
-	private static final long serialVersionUID = 2777597566520109843L;
-
-	@Override
-	public void invoke(T value) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
deleted file mode 100644
index 0209ff2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/FailingIdentityMapper.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
-		Checkpointed<Integer>, CheckpointCommitter, Runnable {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
-	
-	private static final long serialVersionUID = 6334389850158707313L;
-	
-	public static volatile boolean failedBefore;
-	public static volatile boolean hasBeenCheckpointedBeforeFailure;
-
-	private final int failCount;
-	private int numElementsTotal;
-	private int numElementsThisTime;
-	
-	private boolean failer;
-	private boolean hasBeenCheckpointed;
-	
-	private Thread printer;
-	private volatile boolean printerRunning = true;
-
-	public FailingIdentityMapper(int failCount) {
-		this.failCount = failCount;
-	}
-
-	@Override
-	public void open(Configuration parameters) {
-		failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
-		printer = new Thread(this, "FailingIdentityMapper Status Printer");
-		printer.start();
-	}
-
-	@Override
-	public T map(T value) throws Exception {
-		numElementsTotal++;
-		numElementsThisTime++;
-		
-		if (!failedBefore) {
-			Thread.sleep(10);
-			
-			if (failer && numElementsTotal >= failCount) {
-				hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
-				failedBefore = true;
-				throw new Exception("Artificial Test Failure");
-			}
-		}
-		return value;
-	}
-
-	@Override
-	public void close() throws Exception {
-		printerRunning = false;
-		if (printer != null) {
-			printer.interrupt();
-			printer = null;
-		}
-	}
-
-	@Override
-	public void commitCheckpoint(long checkpointId) {
-		this.hasBeenCheckpointed = true;
-	}
-
-	@Override
-	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-		return numElementsTotal;
-	}
-
-	@Override
-	public void restoreState(Integer state) {
-		numElementsTotal = state;
-	}
-
-	@Override
-	public void run() {
-		while (printerRunning) {
-			try {
-				Thread.sleep(5000);
-			}
-			catch (InterruptedException e) {
-				// ignore
-			}
-			LOG.info("============================> Failing mapper {}: count={}, totalCount={}",
-					getRuntimeContext().getIndexOfThisSubtask(),
-					numElementsThisTime, numElementsTotal);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
deleted file mode 100644
index db5f79d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/JobManagerCommunicationUtils.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class JobManagerCommunicationUtils {
-	
-	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
-	
-	
-	public static void cancelCurrentJob(ActorRef jobManager) throws Exception {
-		
-		// find the jobID
-
-		Future<Object> listResponse = Patterns.ask(jobManager,
-				JobManagerMessages.getRequestRunningJobsStatus(),
-				askTimeout.toMillis());
-
-		List<JobStatusMessage> jobs;
-		try {
-			Object result = Await.result(listResponse, askTimeout);
-			jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
-		}
-		catch (Exception e) {
-			throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
-		}
-		
-		if (jobs.isEmpty()) {
-			throw new Exception("Could not cancel job - no running jobs");
-		}
-		if (jobs.size() != 1) {
-			throw new Exception("Could not cancel job - more than one running job.");
-		}
-		
-		JobStatusMessage status = jobs.get(0);
-		if (status.getJobState().isTerminalState()) {
-			throw new Exception("Could not cancel job - job is not running any more");
-		}
-		
-		JobID jobId = status.getJobId();
-		
-		Future<Object> response = Patterns.ask(jobManager, new JobManagerMessages.CancelJob(jobId),
-				askTimeout.toMillis());
-		try {
-			Await.result(response, askTimeout);
-		}
-		catch (Exception e) {
-			throw new Exception("Sending the 'cancel' message failed.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
deleted file mode 100644
index 98e5817..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/MockRuntimeContext.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-
-public class MockRuntimeContext implements RuntimeContext {
-
-	private final int numberOfParallelSubtasks;
-	private final int indexOfThisSubtask;
-
-	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
-		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
-		this.indexOfThisSubtask = indexOfThisSubtask;
-	}
-
-
-	@Override
-	public String getTaskName() {
-		return null;
-	}
-
-	@Override
-	public int getNumberOfParallelSubtasks() {
-		return numberOfParallelSubtasks;
-	}
-
-	@Override
-	public int getIndexOfThisSubtask() {
-		return indexOfThisSubtask;
-	}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		return null;
-	}
-
-	@Override
-	public ClassLoader getUserCodeClassLoader() {
-		return null;
-	}
-
-	@Override
-	public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {}
-
-	@Override
-	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
-		return null;
-	}
-
-	@Override
-	public HashMap<String, Accumulator<?, ?>> getAllAccumulators() { return null; }
-
-	@Override
-	public IntCounter getIntCounter(String name) {
-		return null;
-	}
-
-	@Override
-	public LongCounter getLongCounter(String name) {
-		return null;
-	}
-
-	@Override
-	public DoubleCounter getDoubleCounter(String name) {
-		return null;
-	}
-
-	@Override
-	public Histogram getHistogram(String name) {
-		return null;
-	}
-
-	@Override
-	public <RT> List<RT> getBroadcastVariable(String name) {
-		return null;
-	}
-
-	@Override
-	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
-		return null;
-	}
-
-	@Override
-	public DistributedCache getDistributedCache() {
-		return null;
-	}
-
-}

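MockRuntimeContext above stubs out the full RuntimeContext interface but only answers the two parallelism queries; every other method returns null. That is enough to unit-test rich functions that only need to know their subtask index, roughly as sketched here (MyRichMapper is a hypothetical function under test):

	// Sketch only: drive a rich function outside a running Flink cluster.
	// Anything the function needs beyond the two parallelism getters would
	// come back null from the mock, so this only suits simple functions.
	RichMapFunction<Integer, Integer> mapper = new MyRichMapper();
	mapper.setRuntimeContext(new MockRuntimeContext(4, 0)); // 4 subtasks, index 0
	mapper.open(new Configuration());
	mapper.map(42);
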
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/PartitionValidatingMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/PartitionValidatingMapper.java
deleted file mode 100644
index aa06b49..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/PartitionValidatingMapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-import java.util.HashSet;
-import java.util.Set;
-
-
-public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
-
-	private static final long serialVersionUID = 1088381231244959088L;
-	
-	/* the partitions from which this function received data */
-	private final Set<Integer> myPartitions = new HashSet<Integer>();
-	
-	private final int numPartitions;
-	private final int maxPartitions;
-
-	public PartitionValidatingMapper(int numPartitions, int maxPartitions) {
-		this.numPartitions = numPartitions;
-		this.maxPartitions = maxPartitions;
-	}
-
-	@Override
-	public Integer map(Integer value) throws Exception {
-		// derive the partition from the value and track every distinct partition seen
-		int partition = value % numPartitions;
-		myPartitions.add(partition);
-		if (myPartitions.size() > maxPartitions) {
-			throw new Exception("Error: Elements from too many different partitions: " + myPartitions
-					+ ". Expect elements only from " + maxPartitions + " partitions");
-		}
-		return value;
-	}
-}

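PartitionValidatingMapper derives each element's partition as value % numPartitions and fails once it has seen elements from more than maxPartitions distinct partitions. In a test with as many consuming subtasks as Kafka partitions, each subtask should therefore observe exactly one partition, roughly as sketched below (the Kafka source is assumed):

	// Sketch only: 3 partitions read with parallelism 3, so each mapper
	// instance must see at most 1 distinct partition.
	env.addSource(kafkaSource)
			.map(new PartitionValidatingMapper(3, 1))
			.setParallelism(3);
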
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/SuccessException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/SuccessException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/SuccessException.java
deleted file mode 100644
index 60e2e51..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/SuccessException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-/**
- * Exception that is thrown to terminate a program and indicate success.
- */
-public class SuccessException extends Exception {
-	private static final long serialVersionUID = -7011865671593955887L;
-}

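A streaming job over an unbounded source never finishes by itself, so these tests end the job by throwing SuccessException from inside the topology and treat it as a pass when it surfaces from execute(). A sketch of the unwrapping idiom (JUnit's fail() assumed):

	// Sketch only: success is signalled by a SuccessException in the cause chain.
	try {
		env.execute();
		fail("Job was expected to terminate via SuccessException");
	}
	catch (Exception e) {
		Throwable cause = e;
		while (cause != null && !(cause instanceof SuccessException)) {
			cause = cause.getCause();
		}
		if (cause == null) {
			throw e; // not the success marker, so a genuine failure
		}
	}
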
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ThrottledMapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ThrottledMapper.java
deleted file mode 100644
index 872d42f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ThrottledMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-/**
- * An identity map function that sleeps between elements, throttling the
- * processing speed.
- * 
- * @param <T> The type mapped.
- */
-public class ThrottledMapper<T> implements MapFunction<T,T> {
-
-	private static final long serialVersionUID = 467008933767159126L;
-
-	private final int sleep;
-
-	public ThrottledMapper(int sleep) {
-		this.sleep = sleep;
-	}
-
-	@Override
-	public T map(T value) throws Exception {
-		Thread.sleep(this.sleep);
-		return value;
-	}
-}
\ No newline at end of file

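Throttling the pipeline is mainly a way to keep a test job busy long enough for checkpoints or injected failures to strike while records are still in flight. Typical placement, with 'stream' assumed to be an existing DataStream<Integer>:

	// Sketch only: identity map with a 10 ms pause per element.
	stream.map(new ThrottledMapper<Integer>(10))
			.print();
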
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/Tuple2Partitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/Tuple2Partitioner.java
deleted file mode 100644
index 1e5f027..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.SerializableKafkaPartitioner;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
- */
-public class Tuple2Partitioner implements SerializableKafkaPartitioner {
-	
-	private static final long serialVersionUID = 1L;
-
-	private final int expectedPartitions;
-
-	
-	public Tuple2Partitioner(int expectedPartitions) {
-		this.expectedPartitions = expectedPartitions;
-	}
-
-	@Override
-	public int partition(Object key, int numPartitions) {
-		if (numPartitions != expectedPartitions) {
-			throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
-		}
-		@SuppressWarnings("unchecked")
-		Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
-		
-		return element.f0;
-	}
-}
\ No newline at end of file

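Tuple2Partitioner makes the mapping from record to Kafka partition fully deterministic: field f0 is the partition. Wiring it into the sink might look like the following sketch; the broker address, topic name, serialization schema, and the exact KafkaSink constructor shape are assumptions, not a verified signature:

	// Sketch only: f0 of each Tuple2 chooses the target Kafka partition.
	stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(
			"localhost:9092", "test-topic",
			serializationSchema,                 // an assumed SerializationSchema
			new Tuple2Partitioner(numPartitions)));
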
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2f6b7b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
deleted file mode 100644
index fdd22e3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.testutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.BitSet;
-
-public class ValidatingExactlyOnceSink implements SinkFunction<Integer>, Checkpointed<Tuple2<Integer, BitSet>> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
-
-	private static final long serialVersionUID = 1748426382527469932L;
-	
-	private final int numElementsTotal;
-	
-	private BitSet duplicateChecker = new BitSet();  // this is checkpointed
-
-	private int numElements; // this is checkpointed
-
-	
-	public ValidatingExactlyOnceSink(int numElementsTotal) {
-		this.numElementsTotal = numElementsTotal;
-	}
-
-	
-	@Override
-	public void invoke(Integer value) throws Exception {
-		numElements++;
-		
-		if (duplicateChecker.get(value)) {
-			throw new Exception("Received a duplicate");
-		}
-		duplicateChecker.set(value);
-		if (numElements == numElementsTotal) {
-			// validate
-			if (duplicateChecker.cardinality() != numElementsTotal) {
-				throw new Exception("Duplicate checker has wrong cardinality");
-			}
-			else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
-				throw new Exception("Received sparse sequence");
-			}
-			else {
-				throw new SuccessException();
-			}
-		}
-	}
-
-	@Override
-	public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
-		LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
-		return new Tuple2<Integer, BitSet>(numElements, duplicateChecker);
-	}
-
-	@Override
-	public void restoreState(Tuple2<Integer, BitSet> state) {
-		LOG.info("restoring num elements to {}", state.f0);
-		this.numElements = state.f0;
-		this.duplicateChecker = state.f1;
-	}
-}
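
ValidatingExactlyOnceSink combines three checks once the expected element count is reached: the BitSet rejects duplicates on arrival, its cardinality confirms all N distinct values were seen, and nextClearBit(0) confirms the sequence 0..N-1 arrived without gaps; counter and BitSet survive restarts via the Checkpointed interface. Its place in an exactly-once test might look like this sketch (the Kafka source is assumed):

	// Sketch only: checkpoint every 500 ms and verify exactly-once delivery
	// of the values 0..999; the sink throws SuccessException when done.
	env.enableCheckpointing(500);
	env.addSource(kafkaConsumer)
			.addSink(new ValidatingExactlyOnceSink(1000));
	env.execute();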