You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:27 UTC

[11/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
new file mode 100644
index 0000000..65904d2
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
+ *
+ * <p>
+ * This test only verifies the exactly-once behaviour of the sink. The rolling behaviour is
+ * covered separately by {@link RollingSinkITCase}.
+ */
+public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
+
+	final long NUM_STRINGS = 16_000;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+	private static org.apache.hadoop.fs.FileSystem dfs;
+
+	private static String outPath;
+
+
+
+	/**
+	 * Starts a {@link MiniDFSCluster} that keeps its data in a fresh temporary folder and
+	 * derives the {@code hdfs://} output path used by this test from the cluster's address.
+	 *
+	 * @throws IOException if the temporary folder or the cluster cannot be created
+	 */
+	@BeforeClass
+	public static void createHDFS() throws IOException {
+		final File hdfsBaseDir = tempFolder.newFolder();
+
+		final Configuration hdfsConf = new Configuration();
+		hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+
+		hdfsCluster = new MiniDFSCluster.Builder(hdfsConf).build();
+		dfs = hdfsCluster.getFileSystem();
+
+		final String nameNodeAddress = NetUtils.hostAndPortToUrlString(
+				hdfsCluster.getURI().getHost(),
+				hdfsCluster.getNameNodePort());
+
+		outPath = "hdfs://" + nameNodeAddress + "/string-non-rolling-out";
+	}
+
+	/**
+	 * Shuts down the HDFS cluster started by {@link #createHDFS()}, if there is one.
+	 */
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster == null) {
+			// nothing was started, so there is nothing to tear down
+			return;
+		}
+		hdfsCluster.shutdown();
+	}
+
+
+	/**
+	 * Builds the topology under test: string source -> once-failing identity mapper ->
+	 * {@link RollingSink} writing to {@code outPath}. Checkpointing is enabled and operator
+	 * chaining is disabled.
+	 *
+	 * @param env the environment the topology is added to
+	 */
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+		final int parallelism = 6;
+
+		env.enableCheckpointing(200);
+		env.setParallelism(parallelism);
+		env.disableOperatorChaining();
+
+		DataStream<String> generated = env
+				.addSource(new StringGeneratingSourceFunction(NUM_STRINGS))
+				.startNewChain();
+
+		DataStream<String> passedThrough = generated.map(new OnceFailingIdentityMapper(NUM_STRINGS));
+
+		RollingSink<String> hdfsSink = new RollingSink<String>(outPath)
+				.setBucketer(new NonRollingBucketer())
+				.setBatchSize(10000)
+				.setValidLengthPrefix("")
+				.setPendingPrefix("");
+
+		passedThrough.addSink(hdfsSink);
+	}
+
+	/**
+	 * Reads back every file below {@code outPath} and verifies that each of the
+	 * {@code NUM_STRINGS} messages was written exactly once.
+	 *
+	 * <p>
+	 * If a {@code .valid-length} companion file exists for a data file, only the number of bytes
+	 * recorded in it is read from the data file. (This test should work with FileSystems that
+	 * support truncate() and with others as well.)
+	 *
+	 * @throws Exception if listing or reading the files fails
+	 */
+	@Override
+	public void postSubmit() throws Exception {
+		// "\\d+" rather than "\\d*": a line without an ID must fail the pattern check below
+		// instead of surfacing as a NumberFormatException from Integer.parseInt("")
+		Pattern messageRegex = Pattern.compile("message (\\d+)");
+
+		// Keep a set of the message IDs that we read. The size must equal the read count and
+		// the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
+		// elements twice.
+		Set<Integer> readNumbers = Sets.newHashSet();
+		int numRead = 0;
+
+		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
+
+		while (files.hasNext()) {
+			LocatedFileStatus file = files.next();
+
+			if (file.getPath().toString().endsWith(".valid-length")) {
+				// companion files are consumed together with their data file below
+				continue;
+			}
+
+			int validLength = (int) file.getLen();
+			Path validLengthPath = file.getPath().suffix(".valid-length");
+			if (dfs.exists(validLengthPath)) {
+				// close the companion stream even if reading or parsing fails
+				try (FSDataInputStream lengthStream = dfs.open(validLengthPath)) {
+					validLength = Integer.parseInt(lengthStream.readUTF());
+				}
+				System.out.println("VALID LENGTH: " + validLength);
+			}
+
+			byte[] buffer = new byte[validLength];
+			try (FSDataInputStream inStream = dfs.open(file.getPath())) {
+				inStream.readFully(0, buffer, 0, validLength);
+			}
+
+			try (BufferedReader br = new BufferedReader(
+					new InputStreamReader(new ByteArrayInputStream(buffer)))) {
+				String line;
+				while ((line = br.readLine()) != null) {
+					Matcher matcher = messageRegex.matcher(line);
+					if (!matcher.matches()) {
+						Assert.fail("Read line does not match expected pattern.");
+					}
+					numRead++;
+					readNumbers.add(Integer.parseInt(matcher.group(1)));
+				}
+			}
+		}
+
+		// Verify that we read all strings (at-least-once)
+		Assert.assertEquals(NUM_STRINGS, readNumbers.size());
+
+		// Verify that we don't have duplicates (boom!, exactly-once)
+		Assert.assertEquals(NUM_STRINGS, numRead);
+	}
+
+	/**
+	 * Identity mapper that throws an artificial exception once a randomly chosen number of
+	 * elements has passed through it. A static flag records that the failure has happened, so
+	 * instances that see the flag set pass all elements through unchanged.
+	 */
+	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		// static: shared by all instances of this class
+		private static volatile boolean hasFailed = false;
+
+		private final long numElements;
+
+		private long failurePos;
+		private long count;
+
+
+		/**
+		 * @param numElements total number of elements emitted by all source subtasks together
+		 */
+		OnceFailingIdentityMapper(long numElements) {
+			this.numElements = numElements;
+		}
+
+		/**
+		 * Picks the element count at which this subtask fails: a random position between 70%
+		 * (inclusive) and 90% (exclusive) of {@code numElements} divided by the parallelism.
+		 */
+		@Override
+		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
+			final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			long failurePosMin = (long) (0.7 * numElements / parallelism);
+			long failurePosMax = (long) (0.9 * numElements / parallelism);
+			long range = failurePosMax - failurePosMin;
+
+			// nextLong() % range may be negative (pushing the position below the minimum) and
+			// throws for range == 0, so scale a value from [0, 1) instead
+			long offset = range > 0 ? (long) (new Random().nextDouble() * range) : 0;
+
+			failurePos = failurePosMin + offset;
+			count = 0;
+		}
+
+		/**
+		 * Returns {@code value} unchanged, or throws if the failure position has been reached
+		 * and no failure has been recorded yet.
+		 *
+		 * @throws Exception the artificial "Test Failure"
+		 */
+		@Override
+		public String map(String value) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+
+	/**
+	 * Parallel source emitting the strings {@code "message <index>"}. Each subtask starts at
+	 * its own subtask index and advances by the number of parallel subtasks until the index
+	 * reaches {@code numElements}. The current index is the checkpointed state.
+	 */
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+			implements CheckpointedAsynchronously<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final long numElements;
+
+		// next index to emit; 0 means "not yet initialised" in run()
+		private int index;
+
+		private volatile boolean isRunning = true;
+
+
+		/**
+		 * @param numElements exclusive upper bound of the emitted indices
+		 */
+		StringGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
+
+			while (isRunning && index < numElements) {
+
+				Thread.sleep(1);
+				// emit and advance under the checkpoint lock so the emitted element and the
+				// index change together
+				synchronized (lockingObject) {
+					ctx.collect("message " + index);
+					index += step;
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
new file mode 100644
index 0000000..9770f41
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -0,0 +1,506 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.taskmanager.MultiShotLatch;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Tests for {@link RollingSink}. These
+ * tests test the different output methods as well as the rolling feature using a manual clock
+ * that increases time in lockstep with element computation using latches.
+ *
+ * <p>
+ * This only tests the rolling behaviour of the sink. The exactly-once behaviour is verified
+ * separately by {@link RollingSinkFaultToleranceITCase}.
+ */
+public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+	private static org.apache.hadoop.fs.FileSystem dfs;
+	private static String hdfsURI;
+
+
+	/**
+	 * Starts a {@link MiniDFSCluster} that keeps its data in a fresh temporary folder and
+	 * derives the {@code hdfs://} base URI used by the tests from the cluster's address.
+	 *
+	 * @throws IOException if the temporary folder or the cluster cannot be created
+	 */
+	@BeforeClass
+	public static void createHDFS() throws IOException {
+		final File hdfsBaseDir = tempFolder.newFolder();
+
+		final Configuration hdfsConf = new Configuration();
+		hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+
+		hdfsCluster = new MiniDFSCluster.Builder(hdfsConf).build();
+		dfs = hdfsCluster.getFileSystem();
+
+		final String nameNodeAddress = NetUtils.hostAndPortToUrlString(
+				hdfsCluster.getURI().getHost(),
+				hdfsCluster.getNameNodePort());
+
+		hdfsURI = "hdfs://" + nameNodeAddress + "/";
+	}
+
+	/**
+	 * Shuts down the HDFS cluster started by {@link #createHDFS()}, if there is one.
+	 */
+	@AfterClass
+	public static void destroyHDFS() {
+		// createHDFS() may have failed before the cluster was assigned; without this guard the
+		// resulting NullPointerException would hide the original setup failure
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
+	}
+
+	/**
+	 * This tests {@link StringWriter} with
+	 * non-rolling output.
+	 */
+	@Test
+	public void testNonRollingStringWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/string-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+		RollingSink<String> sink = new RollingSink<String>(outPath)
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		source
+				.map(new MapFunction<Tuple2<Integer,String>, String>() {
+					private static final long serialVersionUID = 1L;
+					@Override
+					public String map(Tuple2<Integer, String> value) throws Exception {
+						return value.f1;
+					}
+				})
+				.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			String line = br.readLine();
+			Assert.assertEquals("message #" + i, line);
+		}
+
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		br = new BufferedReader(new InputStreamReader(inStream));
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			String line = br.readLine();
+			Assert.assertEquals("message #" + i, line);
+		}
+
+		inStream.close();
+	}
+
+	/**
+	 * This tests {@link SequenceFileWriter}
+	 * with non-rolling output and without compression.
+	 */
+	@Test
+	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
+				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
+			}
+		});
+
+
+		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+				.setWriter(new SequenceFileWriter<IntWritable, Text>())
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		mapped.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		IntWritable intWritable = new IntWritable();
+		Text txt = new Text();
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+	}
+
+	/**
+	 * This tests {@link SequenceFileWriter}
+	 * with non-rolling output but with compression.
+	 */
+	@Test
+	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/seq-non-rolling-out";
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+				.broadcast()
+				.filter(new OddEvenFilter());
+
+		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
+				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
+			}
+		});
+
+
+		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+				.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
+				.setBucketer(new NonRollingBucketer())
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		mapped.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		IntWritable intWritable = new IntWritable();
+		Text txt = new Text();
+
+		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+
+		inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+		reader = new SequenceFile.Reader(inStream,
+				1000,
+				0,
+				100000,
+				new Configuration());
+
+		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+			reader.next(intWritable, txt);
+			Assert.assertEquals(i, intWritable.get());
+			Assert.assertEquals("message #" + i, txt.toString());
+		}
+
+		reader.close();
+		inStream.close();
+	}
+
+	// we use this to synchronize the clock changes to elements being processed
+	final static MultiShotLatch latch1 = new MultiShotLatch();
+	final static MultiShotLatch latch2 = new MultiShotLatch();
+
+	/**
+	 * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
+	 * produce rolling files. The clock of DateTimeBucketer is set to
+	 * {@link ModifyableClock} to keep the time in lockstep with the processing of elements using
+	 * latches.
+	 */
+	@Test
+	public void testDateTimeRollingStringWriter() throws Exception {
+		final int NUM_ELEMENTS = 20;
+		final int PARALLELISM = 2;
+		final String outPath = hdfsURI + "/rolling-out";
+		DateTimeBucketer.setClock(new ModifyableClock());
+		ModifyableClock.setCurrentTime(0);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+
+
+		DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction(
+				NUM_ELEMENTS))
+				.broadcast();
+
+		// the parallel flatMap is chained to the sink, so when it has seen 5 elements it can
+		// fire the latch
+		DataStream<String> mapped = source
+				.flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, String>() {
+					private static final long serialVersionUID = 1L;
+
+					int count = 0;
+					@Override
+					public void flatMap(Tuple2<Integer, String> value,
+							Collector<String> out) throws Exception {
+						out.collect(value.f1);
+						count++;
+						if (count >= 5) {
+							if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+								latch1.trigger();
+							} else {
+								latch2.trigger();
+							}
+							count = 0;
+						}
+					}
+
+				});
+
+		RollingSink<String> sink = new RollingSink<String>(outPath)
+				.setBucketer(new DateTimeBucketer("ss"))
+				.setPartPrefix("part")
+				.setPendingPrefix("")
+				.setPendingSuffix("");
+
+		mapped.addSink(sink);
+
+		env.execute("RollingSink String Write Test");
+
+		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
+
+		// we should have 8 rolling files, 4 time intervals and parallelism of 2
+		int numFiles = 0;
+		while (files.hasNext()) {
+			LocatedFileStatus file = files.next();
+			numFiles++;
+			if (file.getPath().toString().contains("rolling-out/00")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 0; i < 5; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/05")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 5; i < 10; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/10")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 10; i < 15; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else if (file.getPath().toString().contains("rolling-out/15")) {
+				FSDataInputStream inStream = dfs.open(file.getPath());
+
+				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+				for (int i = 15; i < 20; i++) {
+					String line = br.readLine();
+					Assert.assertEquals("message #" + i, line);
+				}
+
+				inStream.close();
+			} else {
+				Assert.fail("File " + file + " does not match any expected roll pattern.");
+			}
+		}
+
+		Assert.assertEquals(8, numFiles);
+	}
+
+
+	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		private final int numElements;
+
+		public TestSourceFunction(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < numElements && running; i++) {
+				ctx.collect(Tuple2.of(i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	/**
+	 * This waits on the two multi-shot latches. The latches are triggered in a parallel
+	 * flatMap inside the test topology.
+	 */
+	private static class WaitingTestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		private final int numElements;
+
+		public WaitingTestSourceFunction(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+			for (int i = 0; i < numElements && running; i++) {
+				if (i % 5 == 0 && i > 0) {
+					// update the clock after "five seconds", so we get 20 seconds in total
+					// with 5 elements in each time window
+					latch1.await();
+					latch2.await();
+					ModifyableClock.setCurrentTime(i * 1000);
+				}
+				ctx.collect(Tuple2.of(i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	/**
+	 * Filter that keeps tuples with an even ID on subtask 0 and tuples whose ID leaves a
+	 * remainder of 1 when divided by 2 on every other subtask.
+	 */
+	public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Tuple2<Integer, String> value) throws Exception {
+			final int wantedRemainder = getRuntimeContext().getIndexOfThisSubtask() == 0 ? 0 : 1;
+			return value.f0 % 2 == wantedRemainder;
+		}
+	}
+
+	/**
+	 * {@link Clock} that reports a time set explicitly through {@link #setCurrentTime(long)}.
+	 * The time is held in a static field and is therefore the same for all instances.
+	 */
+	public static class ModifyableClock implements Clock {
+
+		/** The time reported by {@link #currentTimeMillis()}; initially 0. */
+		private static volatile long currentTime = 0;
+
+		@Override
+		public long currentTimeMillis() {
+			return ModifyableClock.currentTime;
+		}
+
+		/**
+		 * Sets the time that all instances report from now on.
+		 */
+		public static void setCurrentTime(long currentTime) {
+			ModifyableClock.currentTime = currentTime;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fe60d94
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-streaming-connectors/flink-connector-flume/pom.xml
new file mode 100644
index 0000000..912a7e4
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-flume/pom.xml
@@ -0,0 +1,174 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-flume</artifactId>
+	<name>flink-connector-flume</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<flume-ng.version>1.5.0</flume-ng.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flume</groupId>
+			<artifactId>flume-ng-core</artifactId>
+			<version>${flume-ng.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-io</groupId>
+					<artifactId>commons-io</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-codec</groupId>
+					<artifactId>commons-codec</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-cli</groupId>
+					<artifactId>commons-cli</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-lang</groupId>
+					<artifactId>commons-lang</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-core-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-mapper-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.thoughtworks.paranamer</groupId>
+					<artifactId>paranamer</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.tukaani</groupId>
+					<artifactId>xz</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.velocity</groupId>
+					<artifactId>velocity</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-collections</groupId>
+					<artifactId>commons-collections</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.code.gson</groupId>
+					<artifactId>gson</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.thrift</groupId>
+					<artifactId>libthrift</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<configuration>
+							<artifactSet>
+								<includes combine.children="append">
+									<!-- We include all dependencies that transitively depend on guava -->
+									<include>org.apache.flume:*</include>
+								</includes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..50f5770
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlumeSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+
+	private transient FlinkRpcClientFacade client;
+	boolean initDone = false;
+	String host;
+	int port;
+	SerializationSchema<IN, byte[]> schema;
+
+	public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
+		this.host = host;
+		this.port = port;
+		this.schema = schema;
+	}
+
+	/**
+	 * Receives tuples from the Apache Flink {@link DataStream} and forwards
+	 * them to Apache Flume.
+	 * 
+	 * @param value
+	 *            The tuple arriving from the datastream
+	 */
+	@Override
+	public void invoke(IN value) {
+
+		byte[] data = schema.serialize(value);
+		client.sendDataToFlume(data);
+
+	}
+
+	private class FlinkRpcClientFacade {
+		private RpcClient client;
+		private String hostname;
+		private int port;
+
+		/**
+		 * Initializes the connection to Apache Flume.
+		 * 
+		 * @param hostname
+		 *            The host
+		 * @param port
+		 *            The port.
+		 */
+		public void init(String hostname, int port) {
+			// Setup the RPC connection
+			this.hostname = hostname;
+			this.port = port;
+			int initCounter = 0;
+			while (true) {
+				if (initCounter >= 90) {
+					throw new RuntimeException("Cannot establish connection with" + port + " at "
+							+ host);
+				}
+				try {
+					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+				} catch (FlumeException e) {
+					// Wait one second if the connection failed before the next
+					// try
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e1) {
+						if (LOG.isErrorEnabled()) {
+							LOG.error("Interrupted while trying to connect {} at {}", port, host);
+						}
+					}
+				}
+				if (client != null) {
+					break;
+				}
+				initCounter++;
+			}
+			initDone = true;
+		}
+
+		/**
+		 * Sends byte arrays as {@link Event} series to Apache Flume.
+		 * 
+		 * @param data
+		 *            The byte array to send to Apache FLume
+		 */
+		public void sendDataToFlume(byte[] data) {
+			Event event = EventBuilder.withBody(data);
+
+			try {
+				client.append(event);
+
+			} catch (EventDeliveryException e) {
+				// clean up and recreate the client
+				client.close();
+				client = null;
+				client = RpcClientFactory.getDefaultInstance(hostname, port);
+			}
+		}
+
+	}
+
+	@Override
+	public void close() {
+		client.client.close();
+	}
+
+	@Override
+	public void open(Configuration config) {
+		client = new FlinkRpcClientFacade();
+		client.init(host, port);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
new file mode 100644
index 0000000..8fecd0f
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -0,0 +1,149 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *    http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.connectors.flume;
+//
+//import java.util.List;
+//
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.functions.source.ConnectorSource;
+//import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+//import org.apache.flink.util.Collector;
+//import org.apache.flume.Context;
+//import org.apache.flume.channel.ChannelProcessor;
+//import org.apache.flume.source.AvroSource;
+//import org.apache.flume.source.avro.AvroFlumeEvent;
+//import org.apache.flume.source.avro.Status;
+//
+//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
+//	private static final long serialVersionUID = 1L;
+//
+//	String host;
+//	String port;
+//	volatile boolean finished = false;
+//
+//	private volatile boolean isRunning = false;
+//
+//	FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+//		super(deserializationSchema);
+//		this.host = host;
+//		this.port = Integer.toString(port);
+//	}
+//
+//	public class MyAvroSource extends AvroSource {
+//		Collector<OUT> output;
+//
+//		/**
+//		 * Sends the AvroFlumeEvent from its argument list to the Apache Flink
+//		 * {@link DataStream}.
+//		 *
+//		 * @param avroEvent
+//		 *            The event that should be sent to the dataStream
+//		 * @return A {@link Status}.OK message if sending the event was
+//		 *         successful.
+//		 */
+//		@Override
+//		public Status append(AvroFlumeEvent avroEvent) {
+//			collect(avroEvent);
+//			return Status.OK;
+//		}
+//
+//		/**
+//		 * Sends the AvroFlumeEvents from its argument list to the Apache Flink
+//		 * {@link DataStream}.
+//		 *
+//		 * @param events
+//		 *            The events that are sent to the dataStream
+//		 * @return A Status.OK message if sending the events was successful.
+//		 */
+//		@Override
+//		public Status appendBatch(List<AvroFlumeEvent> events) {
+//			for (AvroFlumeEvent avroEvent : events) {
+//				collect(avroEvent);
+//			}
+//
+//			return Status.OK;
+//		}
+//
+//		/**
+//		 * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
+//		 * {@link DataStream}.
+//		 *
+//		 * @param avroEvent
+//		 *            The event that is sent to the dataStream
+//		 */
+//		private void collect(AvroFlumeEvent avroEvent) {
+//			byte[] b = avroEvent.getBody().array();
+//			OUT out = FlumeSource.this.schema.deserialize(b);
+//
+//			if (schema.isEndOfStream(out)) {
+//				FlumeSource.this.finished = true;
+//				this.stop();
+//				FlumeSource.this.notifyAll();
+//			} else {
+//				output.collect(out);
+//			}
+//
+//		}
+//
+//	}
+//
+//	MyAvroSource avroSource;
+//
+//	/**
+//	 * Configures the AvroSource. Also sets the output so the application can
+//	 * use it from outside of the invoke function.
+//	 *
+//	 * @param output
+//	 *            The output used in the invoke function
+//	 */
+//	public void configureAvroSource(Collector<OUT> output) {
+//
+//		avroSource = new MyAvroSource();
+//		avroSource.output = output;
+//		Context context = new Context();
+//		context.put("port", port);
+//		context.put("bind", host);
+//		avroSource.configure(context);
+//		// An instance of a ChannelProcessor is required for configuring the
+//		// avroSource although it will not be used in this case.
+//		ChannelProcessor cp = new ChannelProcessor(null);
+//		avroSource.setChannelProcessor(cp);
+//	}
+//
+//	/**
+//	 * Configures the AvroSource and runs until the user calls a close function.
+//	 *
+//	 * @param output
+//	 *            The Collector for sending data to the datastream
+//	 */
+//	@Override
+//	public void run(Collector<OUT> output) throws Exception {
+//		isRunning = true;
+//		configureAvroSource(output);
+//		avroSource.start();
+//		while (!finished && isRunning) {
+//			this.wait();
+//		}
+//	}
+//
+//	@Override
+//	public void cancel() {
+//		isRunning = false;
+//	}
+//
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
new file mode 100644
index 0000000..45da6eb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -0,0 +1,49 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *    http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.connectors.flume;
+//
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+//import org.apache.flink.streaming.util.serialization.SerializationSchema;
+//import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+//
+//public class FlumeTopology {
+//
+//	public static void main(String[] args) throws Exception {
+//
+//		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+//
+//		@SuppressWarnings("unused")
+//		DataStream<String> inputStream1 = env.addSource(
+//				new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
+//				new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
+//
+//		env.execute();
+//	}
+//
+//	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
+//
+//		private static final long serialVersionUID = 1L;
+//
+//		@Override
+//		public byte[] serialize(String element) {
+//			return element.getBytes();
+//		}
+//	}
+//
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-streaming-connectors/flink-connector-kafka/pom.xml
new file mode 100644
index 0000000..57a9a56
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/pom.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kafka</artifactId>
+	<name>flink-connector-kafka</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<kafka.version>0.8.2.0</kafka.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_${scala.binary.version}</artifactId>
+			<version>${kafka.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>net.sf.jopt-simple</groupId>
+					<artifactId>jopt-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-reflect</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.yammer.metrics</groupId>
+					<artifactId>metrics-annotation</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-failsafe-plugin</artifactId>
+				<configuration>
+					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+					<forkCount>1</forkCount>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+	
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
new file mode 100644
index 0000000..8066b3c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.collections.map.LinkedMap;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions. 
+ * 
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once". 
+ * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
+ * 
+ * <p>To support a variety of Kafka brokers, protocol versions, and offset committing approaches,
+ * the Flink Kafka Consumer can be parametrized with a <i>fetcher</i> and an <i>offset handler</i>.</p>
+ *
+ * <h1>Fetcher</h1>
+ * 
+ * <p>The fetcher is responsible to pull data from Kafka. Because Kafka has undergone a change in
+ * protocols and APIs, there are currently two fetchers available:</p>
+ * 
+ * <ul>
+ *     <li>{@link FetcherType#NEW_HIGH_LEVEL}: A fetcher based on the new Kafka consumer API.
+ *         This fetcher is generally more robust, but works only with later versions of
+ *         Kafka (> 0.8.2).</li>
+ *         
+ *     <li>{@link FetcherType#LEGACY_LOW_LEVEL}: A fetcher based on the old low-level consumer API.
+ *         This fetcher also works with older versions of Kafka (0.8.1). The fetcher interprets
+ *         the old Kafka consumer properties, like:
+ *         <ul>
+ *             <li>socket.timeout.ms</li>
+ *             <li>socket.receive.buffer.bytes</li>
+ *             <li>fetch.message.max.bytes</li>
+ *             <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
+ *             <li>fetch.wait.max.ms</li>
+ *         </ul>
+ *     </li>
+ * </ul>
+ * 
+ * <h1>Offset handler</h1>
+ * 
+ * <p>Offsets whose records have been read and are checkpointed will be committed back to Kafka / ZooKeeper
+ * by the offset handler. In addition, the offset handler finds the point where the source initially
+ * starts reading from the stream, when the streaming job is started.</p>
+ * 
+ * <p>Currently, the source offers two different offset handlers:</p>
+ * <ul>
+ *     <li>{@link OffsetStore#KAFKA}: Use this offset handler when the Kafka brokers are managing the offsets,
+ *         and hence offsets need to be committed to the Kafka brokers, rather than to ZooKeeper.
+ *         Note that this offset handler works only on new versions of Kafka (0.8.2.x +) and
+ *         with the {@link FetcherType#NEW_HIGH_LEVEL} fetcher.</li>
+ *         
+ *     <li>{@link OffsetStore#FLINK_ZOOKEEPER}: Use this offset handler when the offsets are managed
+ *         by ZooKeeper, as in older versions of Kafka (0.8.1.x)</li>
+ * </ul>
+ * 
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ * 
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
+ * is constructed. That means that the client that submits the program needs to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
+ */
+public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
+		implements CheckpointNotifier, CheckpointedAsynchronously<long[]>, ResultTypeQueryable<T> {
+
+	/**
+	 * The offset store defines how acknowledged offsets are committed back to Kafka. Different
+	 * options include letting Flink periodically commit to ZooKeeper, or letting Kafka manage the
+	 * offsets (new Kafka versions only).
+	 */
+	public enum OffsetStore {
+
+		/**
+		 * Let Flink manage the offsets. Flink will periodically commit them to ZooKeeper (usually after
+		 * successful checkpoints), in the same structure as Kafka 0.8.2.x.
+		 * 
+		 * <p>Use this mode when using the source with Kafka 0.8.1.x brokers.</p>
+		 */
+		FLINK_ZOOKEEPER,
+
+		/**
+		 * Use the mechanisms in Kafka to commit offsets. Depending on the Kafka configuration, different
+		 * mechanisms will be used (broker coordinator, ZooKeeper).
+		 */ 
+		KAFKA
+	}
+
+	/**
+	 * The fetcher type defines which code paths to use to pull data from the Kafka broker.
+	 */
+	public enum FetcherType {
+
+		/**
+		 * The legacy fetcher uses Kafka's old low-level consumer API.
+		 * 
+		 * <p>Use this fetcher for Kafka 0.8.1 brokers.</p>
+		 */
+		LEGACY_LOW_LEVEL,
+
+		/**
+		 * This fetcher uses a backport of the new consumer API to pull data from the Kafka broker.
+		 * It is the fetcher that will be maintained in the future, and it already 
+		 * handles certain failure cases with less overhead than the legacy fetcher.
+		 * 
+		 * <p>This fetcher works only with Kafka 0.8.2 and 0.8.3 (and future versions).</p>
+		 */
+		NEW_HIGH_LEVEL
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	private static final long serialVersionUID = -6272159445203409112L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
+
+	/** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
+	 * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
+	public static final long OFFSET_NOT_SET = -915623761776L;
+
+	/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
+	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+	/** Configuration key for the number of retries for getting the partition info */
+	public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
+
+	/** Default number of retries for getting the partition info. One retry means going through the full list of brokers */
+	public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
+
+	
+	
+	// ------  Configuration of the Consumer -------
+	
+	/** The offset store where this consumer commits safe offsets */
+	private final OffsetStore offsetStore;
+
+	/** The type of fetcher to be used to pull data from Kafka */
+	private final FetcherType fetcherType;
+	
+	/** name of the topic consumed by this source */
+	private final String topic;
+	
+	/** The properties to parametrize the Kafka consumer and ZooKeeper client */ 
+	private final Properties props;
+	
+	/** The ids of the partitions that are read by this consumer */
+	private final int[] partitions;
+	
+	/** The schema to convert between Kafka's byte messages, and Flink's objects */
+	private final DeserializationSchema<T> valueDeserializer;
+
+	// ------  Runtime State  -------
+
+	/** Data for pending but uncommitted checkpoints */
+	private final LinkedMap pendingCheckpoints = new LinkedMap();
+	
+	/** The fetcher used to pull data from the Kafka brokers */
+	private transient Fetcher fetcher;
+	
+	/** The committer that persists the committed offsets */
+	private transient OffsetHandler offsetHandler;
+	
+	/** The partitions actually handled by this consumer */
+	private transient List<TopicPartition> subscribedPartitions;
+
+	/** The offsets of the last returned elements */
+	private transient long[] lastOffsets;
+
+	/** The latest offsets that have been committed to Kafka or ZooKeeper. These are never
+	 * newer than the last offsets (Flink's internal view is fresher) */
+	private transient long[] commitedOffsets;
+	
+	/** The offsets to restore to, if the consumer restores state from a checkpoint */
+	private transient long[] restoreToOffset;
+	
+	private volatile boolean running = true;
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
+	 * 
+	 * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
+	 * at the beginning of this class.</p>
+	 * 
+	 * @param topic 
+	 *           The Kafka topic to read from.
+	 * @param valueDeserializer
+	 *           The deserializer to turn raw byte messages into Java/Scala objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 * @param offsetStore
+	 *           The type of offset store to use (Kafka / ZooKeeper)
+	 * @param fetcherType
+	 *           The type of fetcher to use (new high-level API, old low-level API).
+	 */
+	public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props, 
+								OffsetStore offsetStore, FetcherType fetcherType) {
+		this.offsetStore = checkNotNull(offsetStore);
+		this.fetcherType = checkNotNull(fetcherType);
+
+		if(fetcherType == FetcherType.NEW_HIGH_LEVEL) {
+			throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 is not yet " +
+					"supported in Flink");
+		}
+		if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) {
+			throw new IllegalArgumentException(
+					"The Kafka offset handler cannot be used together with the old low-level fetcher.");
+		}
+		
+		this.topic = checkNotNull(topic, "topic");
+		this.props = checkNotNull(props, "props");
+		this.valueDeserializer = checkNotNull(valueDeserializer, "valueDeserializer");
+
+		// validate the zookeeper properties
+		if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
+			validateZooKeeperConfig(props);
+		}
+		
+		// Connect to a broker to get the partitions
+		List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
+
+		// get initial partitions list. The order of the partitions is important for consistent 
+		// partition id assignment in restart cases.
+		this.partitions = new int[partitionInfos.size()];
+		for (int i = 0; i < partitionInfos.size(); i++) {
+			partitions[i] = partitionInfos.get(i).partition();
+			
+			if (partitions[i] >= partitions.length) {
+				throw new RuntimeException("Kafka partition numbers are sparse");
+			}
+		}
+		LOG.info("Topic {} has {} partitions", topic, partitions.length);
+
+		// make sure that we take care of the committing
+		props.setProperty("enable.auto.commit", "false");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Source life cycle
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		
+		final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
+		final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
+		
+		// pick which partitions we work on
+		subscribedPartitions = assignPartitions(this.partitions, this.topic, numConsumers, thisComsumerIndex);
+		
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Kafka consumer {} will read partitions {} out of partitions {}",
+					thisComsumerIndex, subscribedPartitions, Arrays.toString(partitions));
+		}
+
+		// we leave the fetcher as null, if we have no partitions
+		if (subscribedPartitions.isEmpty()) {
+			LOG.info("Kafka consumer {} has no partitions (empty source)", thisComsumerIndex);
+			return;
+		}
+		
+		// create fetcher
+		switch (fetcherType){
+			case NEW_HIGH_LEVEL:
+				throw new UnsupportedOperationException("Currently unsupported");
+			case LEGACY_LOW_LEVEL:
+				fetcher = new LegacyFetcher(topic, props, getRuntimeContext().getTaskName());
+				break;
+			default:
+				throw new RuntimeException("Requested unknown fetcher " + fetcher);
+		}
+		fetcher.setPartitionsToRead(subscribedPartitions);
+
+		// offset handling
+		switch (offsetStore){
+			case FLINK_ZOOKEEPER:
+				offsetHandler = new ZookeeperOffsetHandler(props);
+				break;
+			case KAFKA:
+				throw new Exception("Kafka offset handler cannot work with legacy fetcher");
+			default:
+				throw new RuntimeException("Requested unknown offset store " + offsetStore);
+		}
+		
+		// set up operator state
+		lastOffsets = new long[partitions.length];
+		commitedOffsets = new long[partitions.length];
+		
+		Arrays.fill(lastOffsets, OFFSET_NOT_SET);
+		Arrays.fill(commitedOffsets, OFFSET_NOT_SET);
+		
+		// seek to last known pos, from restore request
+		if (restoreToOffset != null) {
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Consumer {} found offsets from previous checkpoint: {}",
+						thisComsumerIndex,  Arrays.toString(restoreToOffset));
+			}
+			
+			for (int i = 0; i < restoreToOffset.length; i++) {
+				long restoredOffset = restoreToOffset[i];
+				if (restoredOffset != OFFSET_NOT_SET) {
+					// if this fails because we are not subscribed to the topic, then the
+					// partition assignment is not deterministic!
+					
+					// we set the offset +1 here, because seek() is accepting the next offset to read,
+					// but the restore offset is the last read offset
+					fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1);
+					lastOffsets[i] = restoredOffset;
+				}
+			}
+		}
+		else {
+			// no restore request. Let the offset handler take care of the initial offset seeking
+			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
+		}
+	}
+
+	@Override
+	public void run(SourceContext<T> sourceContext) throws Exception {
+		if (fetcher != null) {
+			fetcher.run(sourceContext, valueDeserializer, lastOffsets);
+		}
+		else {
+			// this source never completes
+			final Object waitLock = new Object();
+			while (running) {
+				// wait until we are canceled
+				try {
+					//noinspection SynchronizationOnLocalVariableOrMethodParameter
+					synchronized (waitLock) {
+						waitLock.wait();
+					}
+				}
+				catch (InterruptedException e) {
+					// do nothing, check our "running" status
+				}
+			}
+		}
+		
+		// close the context after the work was done. this can actually only
+		// happen when the fetcher decides to stop fetching
+		sourceContext.close();
+	}
+
+	@Override
+	public void cancel() {
+		// set ourselves as not running
+		running = false;
+		
+		// close the fetcher to interrupt any work
+		Fetcher fetcher = this.fetcher;
+		this.fetcher = null;
+		if (fetcher != null) {
+			try {
+				fetcher.close();
+			}
+			catch (IOException e) {
+				LOG.warn("Error while closing Kafka connector data fetcher", e);
+			}
+		}
+		
+		OffsetHandler offsetHandler = this.offsetHandler;
+		this.offsetHandler = null;
+		if (offsetHandler != null) {
+			try {
+				offsetHandler.close();
+			}
+			catch (IOException e) {
+				LOG.warn("Error while closing Kafka connector offset handler", e);
+			}
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		cancel();
+		super.close();
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return valueDeserializer.getProducedType();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpoint and restore
+	// ------------------------------------------------------------------------
+
+	@Override
+	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		if (lastOffsets == null) {
+			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
+			return null;
+		}
+		if (!running) {
+			LOG.debug("snapshotState() called on closed source");
+			return null;
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
+					Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+		}
+
+		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
+
+		// the map cannot be asynchronously updated, because only one checkpoint call can happen
+		// on this function at a time: either snapshotState() or notifyCheckpointComplete()
+		pendingCheckpoints.put(checkpointId, currentOffsets);
+			
+		while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+			pendingCheckpoints.remove(0);
+		}
+
+		return currentOffsets;
+	}
+
+	@Override
+	public void restoreState(long[] restoredOffsets) {
+		restoreToOffset = restoredOffsets;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		if (fetcher == null) {
+			LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+			return;
+		}
+		if (!running) {
+			LOG.debug("notifyCheckpointComplete() called on closed source");
+			return;
+		}
+		
+		// only one commit operation must be in progress
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
+		}
+
+		try {
+			long[] checkpointOffsets;
+	
+			// the map may be asynchronously updated when snapshotting state, so we synchronize
+			synchronized (pendingCheckpoints) {
+				final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+				if (posInMap == -1) {
+					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+					return;
+				}
+	
+				checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
+				
+				// remove older checkpoints in map
+				for (int i = 0; i < posInMap; i++) {
+					pendingCheckpoints.remove(0);
+				}
+			}
+	
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
+			}
+	
+			// build the map of (topic,partition) -> committed offset
+			Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
+			for (TopicPartition tp : subscribedPartitions) {
+				
+				int partition = tp.partition();
+				long offset = checkpointOffsets[partition];
+				long lastCommitted = commitedOffsets[partition];
+				
+				if (offset != OFFSET_NOT_SET) {
+					if (offset > lastCommitted) {
+						offsetsToCommit.put(tp, offset);
+						LOG.debug("Committing offset {} for partition {}", offset, partition);
+					}
+					else {
+						LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+					}
+				}
+			}
+			
+			offsetHandler.commit(offsetsToCommit);
+		}
+		catch (Exception e) {
+			if (running) {
+				throw e;
+			}
+			// else ignore exception if we are no longer running
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Miscellaneous utilities 
+	// ------------------------------------------------------------------------
+
+	protected static List<TopicPartition> assignPartitions(int[] partitions, String topicName,
+															int numConsumers, int consumerIndex) {
+		checkArgument(numConsumers > 0);
+		checkArgument(consumerIndex < numConsumers);
+		
+		List<TopicPartition> partitionsToSub = new ArrayList<>();
+
+		for (int i = 0; i < partitions.length; i++) {
+			if (i % numConsumers == consumerIndex) {
+				partitionsToSub.add(new TopicPartition(topicName, partitions[i]));
+			}
+		}
+		return partitionsToSub;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Kafka / ZooKeeper communication utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Send request to Kafka to get partitions for topic.
+	 * 
+	 * @param topic The name of the topic.
+	 * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
+	 */
+	public static List<PartitionInfo> getPartitionsForTopic(final String topic, final Properties properties) {
+		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+		final int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES)));
+
+		checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
+		String[] seedBrokers = seedBrokersConfString.split(",");
+		List<PartitionInfo> partitions = new ArrayList<>();
+
+		Random rnd = new Random();
+		retryLoop: for(int retry = 0; retry < numRetries; retry++) {
+			// we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
+			// parallel source instances start. Still, we try all available brokers.
+			int index = rnd.nextInt(seedBrokers.length);
+			brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
+				String seedBroker = seedBrokers[index];
+				LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
+				if (++index == seedBrokers.length) {
+					index = 0;
+				}
+
+				URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
+				SimpleConsumer consumer = null;
+				try {
+					final String clientId = "flink-kafka-consumer-partition-lookup";
+					final int soTimeout = Integer.valueOf(properties.getProperty("socket.timeout.ms", "30000"));
+					final int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536"));
+					consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
+
+					List<String> topics = Collections.singletonList(topic);
+					TopicMetadataRequest req = new TopicMetadataRequest(topics);
+					kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+					List<TopicMetadata> metaData = resp.topicsMetadata();
+
+					// clear in case we have an incomplete list from previous tries
+					partitions.clear();
+					for (TopicMetadata item : metaData) {
+						if (item.errorCode() != ErrorMapping.NoError()) {
+							if (item.errorCode() == ErrorMapping.InvalidTopicCode() || item.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
+								// fail hard if topic is unknown
+								throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode()));
+							}
+							// warn and try more brokers
+							LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topic,
+									ErrorMapping.exceptionFor(item.errorCode()));
+							continue brokersLoop;
+						}
+						if (!item.topic().equals(topic)) {
+							LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
+							continue brokersLoop;
+						}
+						for (PartitionMetadata part : item.partitionsMetadata()) {
+							Node leader = brokerToNode(part.leader());
+							Node[] replicas = new Node[part.replicas().size()];
+							for (int i = 0; i < part.replicas().size(); i++) {
+								replicas[i] = brokerToNode(part.replicas().get(i));
+							}
+
+							Node[] ISRs = new Node[part.isr().size()];
+							for (int i = 0; i < part.isr().size(); i++) {
+								ISRs[i] = brokerToNode(part.isr().get(i));
+							}
+							PartitionInfo pInfo = new PartitionInfo(topic, part.partitionId(), leader, replicas, ISRs);
+							partitions.add(pInfo);
+						}
+					}
+					break retryLoop; // leave the loop through the brokers
+				} catch (Exception e) {
+					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topic, e);
+				} finally {
+					if (consumer != null) {
+						consumer.close();
+					}
+				}
+			} // brokers loop
+		} // retries loop
+		return partitions;
+	}
+
+	private static Node brokerToNode(Broker broker) {
+		return new Node(broker.id(), broker.host(), broker.port());
+	}
+	
+	protected static void validateZooKeeperConfig(Properties props) {
+		if (props.getProperty("zookeeper.connect") == null) {
+			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
+		}
+		if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+			throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
+					+ "' has not been set in the properties");
+		}
+		
+		try {
+			//noinspection ResultOfMethodCallIgnored
+			Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
+		}
+		catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
+		}
+		
+		try {
+			//noinspection ResultOfMethodCallIgnored
+			Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
+		}
+		catch (NumberFormatException e) {
+			throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
new file mode 100644
index 0000000..21f24e6
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Creates a Kafka consumer compatible with reading from Kafka 0.8.1.x brokers.
+ * The consumer will internally use the old low-level Kafka API, and manually commit
+ * partition offsets to ZooKeeper.
+ * 
+ * <p>The following additional configuration values are available:</p>
+ * <ul>
+ *   <li>socket.timeout.ms</li>
+ *   <li>socket.receive.buffer.bytes</li>
+ *   <li>fetch.message.max.bytes</li>
+ *   <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
+ *   <li>fetch.wait.max.ms</li>
+ * </ul>
+ * 
+ * @param <T> The type of elements produced by this consumer.
+ */
+public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> {
+
+	private static final long serialVersionUID = -5649906773771949146L;
+
+	/**
+	 * Creates a new Kafka 0.8.1.x streaming source consumer.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The deserializer used to convert Kafka's byte messages into Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
new file mode 100644
index 0000000..77e41e5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Creates a Kafka consumer compatible with reading from Kafka 0.8.2.x brokers.
+ * The consumer will internally use the old low-level Kafka API, and manually commit
+ * partition offsets to ZooKeeper.
+ *
+ * Once Kafka releases the new consumer with Kafka 0.8.3, Flink might use the 0.8.3 consumer API
+ * also against Kafka 0.8.2 installations.
+ *
+ * @param <T> The type of elements produced by this consumer.
+ */
+public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
+
+	private static final long serialVersionUID = -8450689820627198228L;
+
+	/**
+	 * Creates a new Kafka 0.8.2.x streaming source consumer.
+	 * 
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The deserializer used to convert Kafka's byte messages into Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+	}
+}