Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:27 UTC
[11/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
new file mode 100644
index 0000000..65904d2
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
+ *
+ * <p>
+ * This test only verifies the exactly-once behaviour of the sink. The rolling behaviour
+ * is covered by a separate test.
+ */
+public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
+
+ final long NUM_STRINGS = 16_000;
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static MiniDFSCluster hdfsCluster;
+ private static org.apache.hadoop.fs.FileSystem dfs;
+
+ private static String outPath;
+
+
+
+ @BeforeClass
+ public static void createHDFS() throws IOException {
+ Configuration conf = new Configuration();
+
+ File dataDir = tempFolder.newFolder();
+
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+ hdfsCluster = builder.build();
+
+ dfs = hdfsCluster.getFileSystem();
+
+ outPath = "hdfs://"
+ + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ + "/string-non-rolling-out";
+ }
+
+ @AfterClass
+ public static void destroyHDFS() {
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown();
+ }
+ }
+
+
+ @Override
+ public void testProgram(StreamExecutionEnvironment env) {
+ assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+ int PARALLELISM = 6;
+
+ env.enableCheckpointing(200);
+ env.setParallelism(PARALLELISM);
+ env.disableOperatorChaining();
+
+ DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
+
+ DataStream<String> mapped = stream
+ .map(new OnceFailingIdentityMapper(NUM_STRINGS));
+
+ RollingSink<String> sink = new RollingSink<String>(outPath)
+ .setBucketer(new NonRollingBucketer())
+ .setBatchSize(10000)
+ .setValidLengthPrefix("")
+ .setPendingPrefix("");
+
+ mapped.addSink(sink);
+
+ }
+
+ @Override
+ public void postSubmit() throws Exception {
+ // We read the files and verify that we have read all the strings. If a valid-length
+ // file exists we only read the file to that point. (This test should work with
+ // FileSystems that support truncate() and with others as well.)
+
+ Pattern messageRegex = Pattern.compile("message (\\d*)");
+
+ // Keep a set of the message IDs that we read. Its size must equal both the read count
+ // and NUM_STRINGS. If numRead is bigger than the size of the set, we have seen some
+ // elements twice.
+ Set<Integer> readNumbers = Sets.newHashSet();
+ int numRead = 0;
+
+ RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
+ outPath), true);
+
+ while (files.hasNext()) {
+ LocatedFileStatus file = files.next();
+
+ if (!file.getPath().toString().endsWith(".valid-length")) {
+ int validLength = (int) file.getLen();
+ if (dfs.exists(file.getPath().suffix(".valid-length"))) {
+ FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
+ String validLengthString = inStream.readUTF();
+ validLength = Integer.parseInt(validLengthString);
+ System.out.println("VALID LENGTH: " + validLength);
+ }
+ FSDataInputStream inStream = dfs.open(file.getPath());
+ byte[] buffer = new byte[validLength];
+ inStream.readFully(0, buffer, 0, validLength);
+ inStream.close();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+
+ InputStreamReader inStreamReader = new InputStreamReader(bais);
+ BufferedReader br = new BufferedReader(inStreamReader);
+
+ String line = br.readLine();
+ while (line != null) {
+ Matcher matcher = messageRegex.matcher(line);
+ if (matcher.matches()) {
+ numRead++;
+ int messageId = Integer.parseInt(matcher.group(1));
+ readNumbers.add(messageId);
+ } else {
+ Assert.fail("Read line does not match expected pattern.");
+ }
+ line = br.readLine();
+ }
+ br.close();
+ inStreamReader.close();
+ bais.close();
+ }
+ }
+
+ // Verify that we read all strings (at-least-once)
+ Assert.assertEquals(NUM_STRINGS, readNumbers.size());
+
+ // Verify that we don't have duplicates (boom!, exactly-once)
+ Assert.assertEquals(NUM_STRINGS, numRead);
+ }
+
+ private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ private static volatile boolean hasFailed = false;
+
+ private final long numElements;
+
+ private long failurePos;
+ private long count;
+
+
+ OnceFailingIdentityMapper(long numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
+ long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+ long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+ failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+ count = 0;
+ }
+
+ @Override
+ public String map(String value) throws Exception {
+ count++;
+ if (!hasFailed && count >= failurePos) {
+ hasFailed = true;
+ throw new Exception("Test Failure");
+ }
+
+ return value;
+ }
+ }
+
+ private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+ implements CheckpointedAsynchronously<Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final long numElements;
+
+ private int index;
+
+ private volatile boolean isRunning = true;
+
+
+ StringGeneratingSourceFunction(long numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ final Object lockingObject = ctx.getCheckpointLock();
+
+ final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+ if (index == 0) {
+ index = getRuntimeContext().getIndexOfThisSubtask();
+ }
+
+ while (isRunning && index < numElements) {
+
+ Thread.sleep(1);
+ synchronized (lockingObject) {
+ ctx.collect("message " + index);
+ index += step;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ private static String randomString(StringBuilder bld, Random rnd) {
+ final int len = rnd.nextInt(10) + 5;
+
+ for (int i = 0; i < len; i++) {
+ char next = (char) (rnd.nextInt(20000) + 33);
+ bld.append(next);
+ }
+
+ return bld.toString();
+ }
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return index;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ index = state;
+ }
+ }
+}
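
For context, a minimal sketch of how the RollingSink exercised by this test is wired into a checkpointed user job. This is not part of the commit: the HDFS URI, checkpoint interval, and input data below are placeholders, and the job class is hypothetical.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.NonRollingBucketer;
    import org.apache.flink.streaming.connectors.fs.RollingSink;

    public class RollingSinkUsageSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoints drive the pending -> final transition of part files.
            env.enableCheckpointing(200);

            DataStream<String> input = env.fromElements("message 0", "message 1", "message 2");

            RollingSink<String> sink = new RollingSink<String>("hdfs://namenode:9000/flink/out") // placeholder URI
                    .setBucketer(new NonRollingBucketer()) // single bucket directory, no time-based rolling
                    .setBatchSize(10000);                  // start a new part file after roughly 10 kB

            input.addSink(sink);
            env.execute("RollingSink usage sketch");
        }
    }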
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
new file mode 100644
index 0000000..9770f41
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -0,0 +1,506 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.taskmanager.MultiShotLatch;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * Tests for {@link RollingSink}. These
+ * tests cover the different output methods as well as the rolling feature, using a manual clock
+ * that advances in lockstep with element processing by means of latches.
+ *
+ * <p>
+ * This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies
+ * exactly-once behaviour.
+ */
+public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static MiniDFSCluster hdfsCluster;
+ private static org.apache.hadoop.fs.FileSystem dfs;
+ private static String hdfsURI;
+
+
+ @BeforeClass
+ public static void createHDFS() throws IOException {
+ Configuration conf = new Configuration();
+
+ File dataDir = tempFolder.newFolder();
+
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+ hdfsCluster = builder.build();
+
+ dfs = hdfsCluster.getFileSystem();
+
+ hdfsURI = "hdfs://"
+ + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+ + "/";
+ }
+
+ @AfterClass
+ public static void destroyHDFS() {
+ hdfsCluster.shutdown();
+ }
+
+ /**
+ * This tests {@link StringWriter} with
+ * non-rolling output.
+ */
+ @Test
+ public void testNonRollingStringWriter() throws Exception {
+ final int NUM_ELEMENTS = 20;
+ final int PARALLELISM = 2;
+ final String outPath = hdfsURI + "/string-non-rolling-out";
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ .broadcast()
+ .filter(new OddEvenFilter());
+
+ RollingSink<String> sink = new RollingSink<String>(outPath)
+ .setBucketer(new NonRollingBucketer())
+ .setPartPrefix("part")
+ .setPendingPrefix("")
+ .setPendingSuffix("");
+
+ source
+ .map(new MapFunction<Tuple2<Integer,String>, String>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public String map(Tuple2<Integer, String> value) throws Exception {
+ return value.f1;
+ }
+ })
+ .addSink(sink);
+
+ env.execute("RollingSink String Write Test");
+
+ FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+ for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ String line = br.readLine();
+ Assert.assertEquals("message #" + i, line);
+ }
+
+ inStream.close();
+
+ inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+ br = new BufferedReader(new InputStreamReader(inStream));
+
+ for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ String line = br.readLine();
+ Assert.assertEquals("message #" + i, line);
+ }
+
+ inStream.close();
+ }
+
+ /**
+ * This tests {@link SequenceFileWriter}
+ * with non-rolling output and without compression.
+ */
+ @Test
+ public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
+ final int NUM_ELEMENTS = 20;
+ final int PARALLELISM = 2;
+ final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ .broadcast()
+ .filter(new OddEvenFilter());
+
+ DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
+ return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
+ }
+ });
+
+
+ RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+ .setWriter(new SequenceFileWriter<IntWritable, Text>())
+ .setBucketer(new NonRollingBucketer())
+ .setPartPrefix("part")
+ .setPendingPrefix("")
+ .setPendingSuffix("");
+
+ mapped.addSink(sink);
+
+ env.execute("RollingSink String Write Test");
+
+ FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
+ 1000,
+ 0,
+ 100000,
+ new Configuration());
+
+ IntWritable intWritable = new IntWritable();
+ Text txt = new Text();
+
+ for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ reader.next(intWritable, txt);
+ Assert.assertEquals(i, intWritable.get());
+ Assert.assertEquals("message #" + i, txt.toString());
+ }
+
+ reader.close();
+ inStream.close();
+
+ inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+ reader = new SequenceFile.Reader(inStream,
+ 1000,
+ 0,
+ 100000,
+ new Configuration());
+
+ for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ reader.next(intWritable, txt);
+ Assert.assertEquals(i, intWritable.get());
+ Assert.assertEquals("message #" + i, txt.toString());
+ }
+
+ reader.close();
+ inStream.close();
+ }
+
+ /**
+ * This tests {@link SequenceFileWriter}
+ * with non-rolling output but with compression.
+ */
+ @Test
+ public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
+ final int NUM_ELEMENTS = 20;
+ final int PARALLELISM = 2;
+ final String outPath = hdfsURI + "/seq-non-rolling-out";
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
+ .broadcast()
+ .filter(new OddEvenFilter());
+
+ DataStream<Tuple2<IntWritable, Text>> mapped = source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
+ return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
+ }
+ });
+
+
+ RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
+ .setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
+ .setBucketer(new NonRollingBucketer())
+ .setPartPrefix("part")
+ .setPendingPrefix("")
+ .setPendingSuffix("");
+
+ mapped.addSink(sink);
+
+ env.execute("RollingSink String Write Test");
+
+ FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
+ 1000,
+ 0,
+ 100000,
+ new Configuration());
+
+ IntWritable intWritable = new IntWritable();
+ Text txt = new Text();
+
+ for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+ reader.next(intWritable, txt);
+ Assert.assertEquals(i, intWritable.get());
+ Assert.assertEquals("message #" + i, txt.toString());
+ }
+
+ reader.close();
+ inStream.close();
+
+ inStream = dfs.open(new Path(outPath + "/part-1-0"));
+
+ reader = new SequenceFile.Reader(inStream,
+ 1000,
+ 0,
+ 100000,
+ new Configuration());
+
+ for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+ reader.next(intWritable, txt);
+ Assert.assertEquals(i, intWritable.get());
+ Assert.assertEquals("message #" + i, txt.toString());
+ }
+
+ reader.close();
+ inStream.close();
+ }
+
+ // we use this to synchronize the clock changes to elements being processed
+ final static MultiShotLatch latch1 = new MultiShotLatch();
+ final static MultiShotLatch latch2 = new MultiShotLatch();
+
+ /**
+ * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
+ * produce rolling files. The clock of DateTimeBucketer is set to a
+ * {@link ModifyableClock} that keeps the time in lockstep with the processing of elements,
+ * using latches.
+ */
+ @Test
+ public void testDateTimeRollingStringWriter() throws Exception {
+ final int NUM_ELEMENTS = 20;
+ final int PARALLELISM = 2;
+ final String outPath = hdfsURI + "/rolling-out";
+ DateTimeBucketer.setClock(new ModifyableClock());
+ ModifyableClock.setCurrentTime(0);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+
+
+ DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction(
+ NUM_ELEMENTS))
+ .broadcast();
+
+ // the parallel flatMap is chained to the sink, so when it has seen 5 elements it can
+ // fire the latch
+ DataStream<String> mapped = source
+ .flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, String>() {
+ private static final long serialVersionUID = 1L;
+
+ int count = 0;
+ @Override
+ public void flatMap(Tuple2<Integer, String> value,
+ Collector<String> out) throws Exception {
+ out.collect(value.f1);
+ count++;
+ if (count >= 5) {
+ if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+ latch1.trigger();
+ } else {
+ latch2.trigger();
+ }
+ count = 0;
+ }
+ }
+
+ });
+
+ RollingSink<String> sink = new RollingSink<String>(outPath)
+ .setBucketer(new DateTimeBucketer("ss"))
+ .setPartPrefix("part")
+ .setPendingPrefix("")
+ .setPendingSuffix("");
+
+ mapped.addSink(sink);
+
+ env.execute("RollingSink String Write Test");
+
+ RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
+
+ // we should have 8 rolling files: 4 time intervals with a parallelism of 2
+ int numFiles = 0;
+ while (files.hasNext()) {
+ LocatedFileStatus file = files.next();
+ numFiles++;
+ if (file.getPath().toString().contains("rolling-out/00")) {
+ FSDataInputStream inStream = dfs.open(file.getPath());
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+ for (int i = 0; i < 5; i++) {
+ String line = br.readLine();
+ Assert.assertEquals("message #" + i, line);
+ }
+
+ inStream.close();
+ } else if (file.getPath().toString().contains("rolling-out/05")) {
+ FSDataInputStream inStream = dfs.open(file.getPath());
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+ for (int i = 5; i < 10; i++) {
+ String line = br.readLine();
+ Assert.assertEquals("message #" + i, line);
+ }
+
+ inStream.close();
+ } else if (file.getPath().toString().contains("rolling-out/10")) {
+ FSDataInputStream inStream = dfs.open(file.getPath());
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+ for (int i = 10; i < 15; i++) {
+ String line = br.readLine();
+ Assert.assertEquals("message #" + i, line);
+ }
+
+ inStream.close();
+ } else if (file.getPath().toString().contains("rolling-out/15")) {
+ FSDataInputStream inStream = dfs.open(file.getPath());
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
+
+ for (int i = 15; i < 20; i++) {
+ String line = br.readLine();
+ Assert.assertEquals("message #" + i, line);
+ }
+
+ inStream.close();
+ } else {
+ Assert.fail("File " + file + " does not match any expected roll pattern.");
+ }
+ }
+
+ Assert.assertEquals(8, numFiles);
+ }
+
+
+ private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ private final int numElements;
+
+ public TestSourceFunction(int numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+ for (int i = 0; i < numElements && running; i++) {
+ ctx.collect(Tuple2.of(i, "message #" + i));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ /**
+ * This waits on the two multi-shot latches. The latches are triggered in a parallel
+ * flatMap inside the test topology.
+ */
+ private static class WaitingTestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ private final int numElements;
+
+ public WaitingTestSourceFunction(int numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
+ for (int i = 0; i < numElements && running; i++) {
+ if (i % 5 == 0 && i > 0) {
+ // update the clock after "five seconds", so we get 20 seconds in total
+ // with 5 elements in each time window
+ latch1.await();
+ latch2.await();
+ ModifyableClock.setCurrentTime(i * 1000);
+ }
+ ctx.collect(Tuple2.of(i, "message #" + i));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
+
+ public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Tuple2<Integer, String> value) throws Exception {
+ if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+ return value.f0 % 2 == 0;
+ } else {
+ return value.f0 % 2 == 1;
+ }
+ }
+ }
+
+ public static class ModifyableClock implements Clock {
+
+ private static volatile long currentTime = 0;
+
+ public static void setCurrentTime(long currentTime) {
+ ModifyableClock.currentTime = currentTime;
+ }
+
+ @Override
+ public long currentTimeMillis() {
+ return currentTime;
+ }
+ }
+}
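
The rolling test above hinges on DateTimeBucketer's pluggable clock. A minimal sketch of the same pattern, assuming the Clock interface is the one in org.apache.flink.streaming.connectors.fs (as the same-package test implies); the FixedClock class and its location are hypothetical.

    import org.apache.flink.streaming.connectors.fs.Clock;
    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;

    public class FixedClockSketch {

        // Deterministic clock, analogous to ModifyableClock in the test above.
        public static class FixedClock implements Clock {
            private static volatile long time = 0;

            public static void setTime(long newTime) {
                time = newTime;
            }

            @Override
            public long currentTimeMillis() {
                return time;
            }
        }

        public static void main(String[] args) {
            // Install the deterministic clock before the job runs; bucket paths are
            // then derived from the fake time instead of the wall clock.
            DateTimeBucketer.setClock(new FixedClock());
            FixedClock.setTime(0);

            // Buckets keyed on the seconds field, as in testDateTimeRollingStringWriter.
            DateTimeBucketer bucketer = new DateTimeBucketer("ss");
            // ... pass the bucketer to RollingSink.setBucketer(bucketer), as in the tests above.
        }
    }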
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fe60d94
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-streaming-connectors/flink-connector-flume/pom.xml
new file mode 100644
index 0000000..912a7e4
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-flume/pom.xml
@@ -0,0 +1,174 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-flume</artifactId>
+ <name>flink-connector-flume</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <flume-ng.version>1.5.0</flume-ng.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>${flume-ng.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.tukaani</groupId>
+ <artifactId>xz</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.velocity</groupId>
+ <artifactId>velocity</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <!-- We include all dependencies that transitively depend on guava -->
+ <include>org.apache.flume:*</include>
+ </includes>
+ </artifactSet>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..50f5770
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlumeSink<IN> extends RichSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+
+ private transient FlinkRpcClientFacade client;
+ boolean initDone = false;
+ String host;
+ int port;
+ SerializationSchema<IN, byte[]> schema;
+
+ public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
+ this.host = host;
+ this.port = port;
+ this.schema = schema;
+ }
+
+ /**
+ * Receives tuples from the Apache Flink {@link DataStream} and forwards
+ * them to Apache Flume.
+ *
+ * @param value
+ * The tuple arriving from the datastream
+ */
+ @Override
+ public void invoke(IN value) {
+
+ byte[] data = schema.serialize(value);
+ client.sendDataToFlume(data);
+
+ }
+
+ private class FlinkRpcClientFacade {
+ private RpcClient client;
+ private String hostname;
+ private int port;
+
+ /**
+ * Initializes the connection to Apache Flume.
+ *
+ * @param hostname
+ * The host
+ * @param port
+ * The port.
+ */
+ public void init(String hostname, int port) {
+ // Setup the RPC connection
+ this.hostname = hostname;
+ this.port = port;
+ int initCounter = 0;
+ while (true) {
+ if (initCounter >= 90) {
+ throw new RuntimeException("Cannot establish connection to port " + port
+ + " at host " + host);
+ }
+ try {
+ this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+ } catch (FlumeException e) {
+ // Wait one second if the connection failed before the next
+ // try
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Interrupted while trying to connect {} at {}", port, host);
+ }
+ }
+ }
+ if (client != null) {
+ break;
+ }
+ initCounter++;
+ }
+ initDone = true;
+ }
+
+ /**
+ * Sends byte arrays as {@link Event} series to Apache Flume.
+ *
+ * @param data
+ * The byte array to send to Apache Flume
+ */
+ public void sendDataToFlume(byte[] data) {
+ Event event = EventBuilder.withBody(data);
+
+ try {
+ client.append(event);
+
+ } catch (EventDeliveryException e) {
+ // clean up and recreate the client
+ client.close();
+ client = null;
+ client = RpcClientFactory.getDefaultInstance(hostname, port);
+ }
+ }
+
+ }
+
+ @Override
+ public void close() {
+ client.client.close();
+ }
+
+ @Override
+ public void open(Configuration config) {
+ client = new FlinkRpcClientFacade();
+ client.init(host, port);
+ }
+}
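
A minimal sketch of attaching this FlumeSink to a stream. The host and port are placeholders (a Flume Avro source must be listening there), and the inline schema mirrors the StringToByteSerializer in the commented-out FlumeTopology further below; the job class itself is hypothetical.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.flume.FlumeSink;
    import org.apache.flink.streaming.util.serialization.SerializationSchema;

    public class FlumeSinkUsageSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> stream = env.fromElements("event-1", "event-2", "event-3");

            // localhost:42424 is a placeholder; a Flume Avro source must be reachable there.
            stream.addSink(new FlumeSink<String>("localhost", 42424,
                    new SerializationSchema<String, byte[]>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public byte[] serialize(String element) {
                            return element.getBytes();
                        }
                    }));

            env.execute("FlumeSink usage sketch");
        }
    }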
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
new file mode 100644
index 0000000..8fecd0f
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -0,0 +1,149 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.connectors.flume;
+//
+//import java.util.List;
+//
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.functions.source.ConnectorSource;
+//import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+//import org.apache.flink.util.Collector;
+//import org.apache.flume.Context;
+//import org.apache.flume.channel.ChannelProcessor;
+//import org.apache.flume.source.AvroSource;
+//import org.apache.flume.source.avro.AvroFlumeEvent;
+//import org.apache.flume.source.avro.Status;
+//
+//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
+// private static final long serialVersionUID = 1L;
+//
+// String host;
+// String port;
+// volatile boolean finished = false;
+//
+// private volatile boolean isRunning = false;
+//
+// FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+// super(deserializationSchema);
+// this.host = host;
+// this.port = Integer.toString(port);
+// }
+//
+// public class MyAvroSource extends AvroSource {
+// Collector<OUT> output;
+//
+// /**
+// * Sends the AvroFlumeEvent from its argument list to the Apache Flink
+// * {@link DataStream}.
+// *
+// * @param avroEvent
+// * The event that should be sent to the dataStream
+// * @return A {@link Status}.OK message if sending the event was
+// * successful.
+// */
+// @Override
+// public Status append(AvroFlumeEvent avroEvent) {
+// collect(avroEvent);
+// return Status.OK;
+// }
+//
+// /**
+// * Sends the AvroFlumeEvents from its argument list to the Apache Flink
+// * {@link DataStream}.
+// *
+// * @param events
+// * The events that are sent to the dataStream
+// * @return A Status.OK message if sending the events was successful.
+// */
+// @Override
+// public Status appendBatch(List<AvroFlumeEvent> events) {
+// for (AvroFlumeEvent avroEvent : events) {
+// collect(avroEvent);
+// }
+//
+// return Status.OK;
+// }
+//
+// /**
+// * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
+// * {@link DataStream}.
+// *
+// * @param avroEvent
+// * The event that is sent to the dataStream
+// */
+// private void collect(AvroFlumeEvent avroEvent) {
+// byte[] b = avroEvent.getBody().array();
+// OUT out = FlumeSource.this.schema.deserialize(b);
+//
+// if (schema.isEndOfStream(out)) {
+// FlumeSource.this.finished = true;
+// this.stop();
+// FlumeSource.this.notifyAll();
+// } else {
+// output.collect(out);
+// }
+//
+// }
+//
+// }
+//
+// MyAvroSource avroSource;
+//
+// /**
+// * Configures the AvroSource. Also sets the output so the application can
+// * use it from outside of the invoke function.
+// *
+// * @param output
+// * The output used in the invoke function
+// */
+// public void configureAvroSource(Collector<OUT> output) {
+//
+// avroSource = new MyAvroSource();
+// avroSource.output = output;
+// Context context = new Context();
+// context.put("port", port);
+// context.put("bind", host);
+// avroSource.configure(context);
+// // An instance of a ChannelProcessor is required for configuring the
+// // avroSource although it will not be used in this case.
+// ChannelProcessor cp = new ChannelProcessor(null);
+// avroSource.setChannelProcessor(cp);
+// }
+//
+// /**
+// * Configures the AvroSource and runs until the user calls a close function.
+// *
+// * @param output
+// * The Collector for sending data to the datastream
+// */
+// @Override
+// public void run(Collector<OUT> output) throws Exception {
+// isRunning = true;
+// configureAvroSource(output);
+// avroSource.start();
+// while (!finished && isRunning) {
+// this.wait();
+// }
+// }
+//
+// @Override
+// public void cancel() {
+// isRunning = false;
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
new file mode 100644
index 0000000..45da6eb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -0,0 +1,49 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.connectors.flume;
+//
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+//import org.apache.flink.streaming.util.serialization.SerializationSchema;
+//import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+//
+//public class FlumeTopology {
+//
+// public static void main(String[] args) throws Exception {
+//
+// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+//
+// @SuppressWarnings("unused")
+// DataStream<String> inputStream1 = env.addSource(
+// new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
+// new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
+//
+// env.execute();
+// }
+//
+// public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
+//
+// private static final long serialVersionUID = 1L;
+//
+// @Override
+// public byte[] serialize(String element) {
+// return element.getBytes();
+// }
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-streaming-connectors/flink-connector-kafka/pom.xml
new file mode 100644
index 0000000..57a9a56
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/pom.xml
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-kafka</artifactId>
+ <name>flink-connector-kafka</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <kafka.version>0.8.2.0</kafka.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-annotation</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+ <forkCount>1</forkCount>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
new file mode 100644
index 0000000..8066b3c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -0,0 +1,689 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.collections.map.LinkedMap;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once".
+ * (Note: These guarantees naturally assume that Kafka itself does not lose any data.)</p>
+ *
+ * <p>To support a variety of Kafka brokers, protocol versions, and offset committing approaches,
+ * the Flink Kafka Consumer can be parametrized with a <i>fetcher</i> and an <i>offset handler</i>.</p>
+ *
+ * <h1>Fetcher</h1>
+ *
+ * <p>The fetcher is responsible for pulling data from Kafka. Because Kafka has undergone a change in
+ * protocols and APIs, there are currently two fetchers available:</p>
+ *
+ * <ul>
+ * <li>{@link FetcherType#NEW_HIGH_LEVEL}: A fetcher based on the new Kafka consumer API.
+ * This fetcher is generally more robust, but works only with later versions of
+ * Kafka (> 0.8.2).</li>
+ *
+ * <li>{@link FetcherType#LEGACY_LOW_LEVEL}: A fetcher based on the old low-level consumer API.
+ * This fetcher also works with older versions of Kafka (0.8.1). It interprets
+ * the old Kafka consumer properties, like:
+ * <ul>
+ * <li>socket.timeout.ms</li>
+ * <li>socket.receive.buffer.bytes</li>
+ * <li>fetch.message.max.bytes</li>
+ * <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
+ * <li>fetch.wait.max.ms</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
+ * <h1>Offset handler</h1>
+ *
+ * <p>Offsets whose records have been read and are checkpointed will be committed back to Kafka / ZooKeeper
+ * by the offset handler. In addition, the offset handler finds the point where the source initially
+ * starts reading from the stream, when the streaming job is started.</p>
+ *
+ * <p>Currently, the source offers two different offset handlers:</p>
+ * <ul>
+ * <li>{@link OffsetStore#KAFKA}: Use this offset handler when the Kafka brokers are managing the offsets,
+ * and hence offsets need to be committed to the Kafka brokers, rather than to ZooKeeper.
+ * Note that this offset handler works only on new versions of Kafka (0.8.2.x +) and
+ * with the {@link FetcherType#NEW_HIGH_LEVEL} fetcher.</li>
+ *
+ * <li>{@link OffsetStore#FLINK_ZOOKEEPER}: Use this offset handler when the offsets are managed
+ * by ZooKeeper, as in older versions of Kafka (0.8.1.x)</li>
+ * </ul>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
+ * is constructed. That means that the client that submits the program needs to be able to
+ * reach the Kafka brokers or ZooKeeper.</p>
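+ *
+ * <p>For illustration only, a minimal sketch of how such a consumer might be used (broker and
+ * ZooKeeper addresses, group id, topic name, and {@code SimpleStringSchema} are placeholders,
+ * not prescriptions):</p>
+ * <pre>{@code
+ * Properties props = new Properties();
+ * props.setProperty("bootstrap.servers", "localhost:9092");
+ * props.setProperty("zookeeper.connect", "localhost:2181");
+ * props.setProperty("group.id", "my-group");
+ *
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * env.enableCheckpointing(5000); // required for the exactly-once guarantees described above
+ *
+ * DataStream<String> stream = env.addSource(
+ *     new FlinkKafkaConsumer<String>("my-topic", new SimpleStringSchema(), props,
+ *         FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
+ *         FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL));
+ * }</pre>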
+ */
+public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
+ implements CheckpointNotifier, CheckpointedAsynchronously<long[]>, ResultTypeQueryable<T> {
+
+ /**
+ * The offset store defines how acknowledged offsets are committed back to Kafka. Different
+ * options include letting Flink periodically commit to ZooKeeper, or letting Kafka manage the
+ * offsets (new Kafka versions only).
+ */
+ public enum OffsetStore {
+
+ /**
+ * Let Flink manage the offsets. Flink will periodically commit them to ZooKeeper (usually after
+ * successful checkpoints), in the same structure as Kafka 0.8.2.x
+ *
+ * <p>Use this mode when using the source with Kafka 0.8.1.x brokers.</p>
+ */
+ FLINK_ZOOKEEPER,
+
+ /**
+ * Use the mechanisms in Kafka to commit offsets. Depending on the Kafka configuration, different
+ * mechanisms will be used (broker coordinator, ZooKeeper).
+ */
+ KAFKA
+ }
+
+ /**
+ * The fetcher type defines which code paths to use to pull data from the Kafka broker.
+ */
+ public enum FetcherType {
+
+ /**
+ * The legacy fetcher uses Kafka's old low-level consumer API.
+ *
+ * <p>Use this fetcher for Kafka 0.8.1 brokers.</p>
+ */
+ LEGACY_LOW_LEVEL,
+
+ /**
+ * This fetcher uses a backport of the new consumer API to pull data from the Kafka broker.
+ * It is the fetcher that will be maintained in the future, and it already
+ * handles certain failure cases with less overhead than the legacy fetcher.
+ *
+ * <p>This fetcher works only with Kafka 0.8.2 and 0.8.3 (and future versions).</p>
+ */
+ NEW_HIGH_LEVEL
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final long serialVersionUID = -6272159445203409112L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
+
+ /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid),
+ * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. */
+ public static final long OFFSET_NOT_SET = -915623761776L;
+
+ /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */
+ public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+ /** Configuration key for the number of retries for getting the partition info */
+ public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
+
+ /** Default number of retries for getting the partition info. One retry means going through the full list of brokers */
+ public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
+
+
+ // ------ Configuration of the Consumer -------
+
+ /** The offset store where this consumer commits safe offsets */
+ private final OffsetStore offsetStore;
+
+ /** The type of fetcher to be used to pull data from Kafka */
+ private final FetcherType fetcherType;
+
+ /** Name of the topic consumed by this source */
+ private final String topic;
+
+ /** The properties to parametrize the Kafka consumer and ZooKeeper client */
+ private final Properties props;
+
+ /** The ids of the partitions that are read by this consumer */
+ private final int[] partitions;
+
+ /** The schema to convert between Kafka's byte messages, and Flink's objects */
+ private final DeserializationSchema<T> valueDeserializer;
+
+ // ------ Runtime State -------
+
+ /** Data for pending but uncommitted checkpoints */
+ private final LinkedMap pendingCheckpoints = new LinkedMap();
+
+ /** The fetcher used to pull data from the Kafka brokers */
+ private transient Fetcher fetcher;
+
+ /** The committer that persists the committed offsets */
+ private transient OffsetHandler offsetHandler;
+
+ /** The partitions actually handled by this consumer */
+ private transient List<TopicPartition> subscribedPartitions;
+
+ /** The offsets of the last returned elements */
+ private transient long[] lastOffsets;
+
+ /** The latest offsets that have been committed to Kafka or ZooKeeper. These are never
+ * newer than the last offsets (Flink's internal view is fresher) */
+ private transient long[] committedOffsets;
+
+ /** The offsets to restore to, if the consumer restores state from a checkpoint */
+ private transient long[] restoreToOffset;
+
+ private volatile boolean running = true;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
+ *
+ * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
+ * at the beginning of this class.</p>
+ *
+ * @param topic
+ * The Kafka topic to read from.
+ * @param valueDeserializer
+ * The deserializer to turn raw byte messages into Java/Scala objects.
+ * @param props
+ * The properties that are used to configure both the fetcher and the offset handler.
+ * @param offsetStore
+ * The type of offset store to use (Kafka / ZooKeeper)
+ * @param fetcherType
+ * The type of fetcher to use (new high-level API, old low-level API).
+ */
+ public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props,
+ OffsetStore offsetStore, FetcherType fetcherType) {
+ this.offsetStore = checkNotNull(offsetStore);
+ this.fetcherType = checkNotNull(fetcherType);
+
+ if (fetcherType == FetcherType.NEW_HIGH_LEVEL) {
+ throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 is not yet " +
+ "supported in Flink");
+ }
+ if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) {
+ throw new IllegalArgumentException(
+ "The Kafka offset handler cannot be used together with the old low-level fetcher.");
+ }
+
+ this.topic = checkNotNull(topic, "topic");
+ this.props = checkNotNull(props, "props");
+ this.valueDeserializer = checkNotNull(valueDeserializer, "valueDeserializer");
+
+ // validate the zookeeper properties
+ if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
+ validateZooKeeperConfig(props);
+ }
+
+ // Connect to a broker to get the partitions
+ List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
+
+ // get initial partitions list. The order of the partitions is important for consistent
+ // partition id assignment in restart cases.
+ this.partitions = new int[partitionInfos.size()];
+ for (int i = 0; i < partitionInfos.size(); i++) {
+ partitions[i] = partitionInfos.get(i).partition();
+
+ if (partitions[i] >= partitions.length) {
+ throw new RuntimeException("Kafka partition numbers are sparse (not contiguous from 0)");
+ }
+ }
+ LOG.info("Topic {} has {} partitions", topic, partitions.length);
+
+ // make sure that we take care of the committing
+ props.setProperty("enable.auto.commit", "false");
+ }
+
+ // ------------------------------------------------------------------------
+ // Source life cycle
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
+ final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+ // pick which partitions we work on
+ subscribedPartitions = assignPartitions(this.partitions, this.topic, numConsumers, thisConsumerIndex);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Kafka consumer {} will read partitions {} out of partitions {}",
+ thisConsumerIndex, subscribedPartitions, Arrays.toString(partitions));
+ }
+
+ // we leave the fetcher as null if we have no partitions
+ if (subscribedPartitions.isEmpty()) {
+ LOG.info("Kafka consumer {} has no partitions (empty source)", thisComsumerIndex);
+ return;
+ }
+
+ // create fetcher
+ switch (fetcherType) {
+ case NEW_HIGH_LEVEL:
+ throw new UnsupportedOperationException("Currently unsupported");
+ case LEGACY_LOW_LEVEL:
+ fetcher = new LegacyFetcher(topic, props, getRuntimeContext().getTaskName());
+ break;
+ default:
+ throw new RuntimeException("Requested unknown fetcher " + fetcher);
+ }
+ fetcher.setPartitionsToRead(subscribedPartitions);
+
+ // offset handling
+ switch (offsetStore) {
+ case FLINK_ZOOKEEPER:
+ offsetHandler = new ZookeeperOffsetHandler(props);
+ break;
+ case KAFKA:
+ throw new Exception("Kafka offset handler cannot work with legacy fetcher");
+ default:
+ throw new RuntimeException("Requested unknown offset store " + offsetStore);
+ }
+
+ // set up operator state
+ lastOffsets = new long[partitions.length];
+ committedOffsets = new long[partitions.length];
+
+ Arrays.fill(lastOffsets, OFFSET_NOT_SET);
+ Arrays.fill(committedOffsets, OFFSET_NOT_SET);
+
+ // seek to last known pos, from restore request
+ if (restoreToOffset != null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Consumer {} found offsets from previous checkpoint: {}",
+ thisConsumerIndex, Arrays.toString(restoreToOffset));
+ }
+
+ for (int i = 0; i < restoreToOffset.length; i++) {
+ long restoredOffset = restoreToOffset[i];
+ if (restoredOffset != OFFSET_NOT_SET) {
+ // if this fails because we are not subscribed to the topic, then the
+ // partition assignment is not deterministic!
+
+ // we set the offset +1 here, because seek() is accepting the next offset to read,
+ // but the restore offset is the last read offset
+ fetcher.seek(new TopicPartition(topic, i), restoredOffset + 1);
+ lastOffsets[i] = restoredOffset;
+ }
+ }
+ }
+ else {
+ // no restore request. Let the offset handler take care of the initial offset seeking
+ offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
+ }
+ }
+
+ @Override
+ public void run(SourceContext<T> sourceContext) throws Exception {
+ if (fetcher != null) {
+ fetcher.run(sourceContext, valueDeserializer, lastOffsets);
+ }
+ else {
+ // this source never completes
+ final Object waitLock = new Object();
+ while (running) {
+ // wait until we are canceled
+ try {
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (waitLock) {
+ waitLock.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ // do nothing, check our "running" status
+ }
+ }
+ }
+
+ // close the context after the work was done. This can actually only
+ // happen when the fetcher decides to stop fetching
+ sourceContext.close();
+ }
+
+ @Override
+ public void cancel() {
+ // set ourselves as not running
+ running = false;
+
+ // close the fetcher to interrupt any work
+ Fetcher fetcher = this.fetcher;
+ this.fetcher = null;
+ if (fetcher != null) {
+ try {
+ fetcher.close();
+ }
+ catch (IOException e) {
+ LOG.warn("Error while closing Kafka connector data fetcher", e);
+ }
+ }
+
+ OffsetHandler offsetHandler = this.offsetHandler;
+ this.offsetHandler = null;
+ if (offsetHandler != null) {
+ try {
+ offsetHandler.close();
+ }
+ catch (IOException e) {
+ LOG.warn("Error while closing Kafka connector offset handler", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ cancel();
+ super.close();
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return valueDeserializer.getProducedType();
+ }
+
+ // ------------------------------------------------------------------------
+ // Checkpoint and restore
+ // ------------------------------------------------------------------------
+
+ @Override
+ public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ if (lastOffsets == null) {
+ LOG.debug("snapshotState() requested on not yet opened source; returning null.");
+ return null;
+ }
+ if (!running) {
+ LOG.debug("snapshotState() called on closed source");
+ return null;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
+ Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+ }
+
+ long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
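+
+ // Illustration (example values, not from the source): with four partitions where only
+ // partitions 1 and 3 have emitted records so far, this snapshot might look like
+ // [ OFFSET_NOT_SET, 17, OFFSET_NOT_SET, 42 ].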
+
+ // the map cannot be asynchronously updated, because only one checkpoint call can happen
+ // on this function at a time: either snapshotState() or notifyCheckpointComplete()
+ pendingCheckpoints.put(checkpointId, currentOffsets);
+
+ while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+ pendingCheckpoints.remove(0);
+ }
+
+ return currentOffsets;
+ }
+
+ @Override
+ public void restoreState(long[] restoredOffsets) {
+ restoreToOffset = restoredOffsets;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ if (fetcher == null) {
+ LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+ return;
+ }
+ if (!running) {
+ LOG.debug("notifyCheckpointComplete() called on closed source");
+ return;
+ }
+
+ // only one commit operation must be in progress
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
+ }
+
+ try {
+ long[] checkpointOffsets;
+
+ // the map may be asynchronously updated while snapshotting state, so we synchronize
+ synchronized (pendingCheckpoints) {
+ final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+ return;
+ }
+
+ checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
+
+ // remove older checkpoints in map
+ for (int i = 0; i < posInMap; i++) {
+ pendingCheckpoints.remove(0);
+ }
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
+ }
+
+ // build the map of (topic,partition) -> committed offset
+ Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
+ for (TopicPartition tp : subscribedPartitions) {
+
+ int partition = tp.partition();
+ long offset = checkpointOffsets[partition];
+ long lastCommitted = committedOffsets[partition];
+
+ if (offset != OFFSET_NOT_SET) {
+ if (offset > lastCommitted) {
+ offsetsToCommit.put(tp, offset);
+ LOG.debug("Committing offset {} for partition {}", offset, partition);
+ }
+ else {
+ LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+ }
+ }
+ }
+
+ offsetHandler.commit(offsetsToCommit);
+ }
+ catch (Exception e) {
+ if (running) {
+ throw e;
+ }
+ // else ignore exception if we are no longer running
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Miscellaneous utilities
+ // ------------------------------------------------------------------------
+
+ protected static List<TopicPartition> assignPartitions(int[] partitions, String topicName,
+ int numConsumers, int consumerIndex) {
+ checkArgument(numConsumers > 0);
+ checkArgument(consumerIndex < numConsumers);
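+
+ // Example (illustrative): with partitions {0, 1, 2, 3, 4} and numConsumers = 2, consumer 0
+ // gets partitions {0, 2, 4} and consumer 1 gets {1, 3}, because the partition at array
+ // position i is assigned to the consumer with index (i % numConsumers).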
+
+ List<TopicPartition> partitionsToSub = new ArrayList<>();
+
+ for (int i = 0; i < partitions.length; i++) {
+ if (i % numConsumers == consumerIndex) {
+ partitionsToSub.add(new TopicPartition(topicName, partitions[i]));
+ }
+ }
+ return partitionsToSub;
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka / ZooKeeper communication utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sends a request to Kafka to get the partitions for a topic.
+ *
+ * @param topic The name of the topic.
+ * @param properties The properties for the Kafka consumer that is used to query the partitions for the topic.
+ * @return The partition info for the topic, as reported by the first broker that answered successfully;
+ * the list may be empty if no broker could be reached within the configured number of retries.
+ */
+ public static List<PartitionInfo> getPartitionsForTopic(final String topic, final Properties properties) {
+ String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ final int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(DEFAULT_GET_PARTITIONS_RETRIES)));
+
+ checkNotNull(seedBrokersConfString, "Configuration property " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " not set");
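+ // bootstrap.servers is a comma-separated list of "hostname:port" entries,
+ // e.g. "broker1:9092,broker2:9092" (example addresses, for illustration only)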
+ String[] seedBrokers = seedBrokersConfString.split(",");
+ List<PartitionInfo> partitions = new ArrayList<>();
+
+ Random rnd = new Random();
+ retryLoop: for (int retry = 0; retry < numRetries; retry++) {
+ // we pick a seed broker randomly to avoid overloading the first broker with all the requests when the
+ // parallel source instances start. Still, we try all available brokers.
+ int index = rnd.nextInt(seedBrokers.length);
+ brokersLoop: for (int arrIdx = 0; arrIdx < seedBrokers.length; arrIdx++) {
+ String seedBroker = seedBrokers[index];
+ LOG.info("Trying to get topic metadata from broker {} in try {}/{}", seedBroker, retry, numRetries);
+ if (++index == seedBrokers.length) {
+ index = 0;
+ }
+
+ URL brokerUrl = NetUtils.getCorrectHostnamePort(seedBroker);
+ SimpleConsumer consumer = null;
+ try {
+ final String clientId = "flink-kafka-consumer-partition-lookup";
+ final int soTimeout = Integer.valueOf(properties.getProperty("socket.timeout.ms", "30000"));
+ final int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536"));
+ consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, clientId);
+
+ List<String> topics = Collections.singletonList(topic);
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+ List<TopicMetadata> metaData = resp.topicsMetadata();
+
+ // clear in case we have an incomplete list from previous tries
+ partitions.clear();
+ for (TopicMetadata item : metaData) {
+ if (item.errorCode() != ErrorMapping.NoError()) {
+ if (item.errorCode() == ErrorMapping.InvalidTopicCode() || item.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
+ // fail hard if topic is unknown
+ throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor(item.errorCode()));
+ }
+ // warn and try more brokers
+ LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topic,
+ ErrorMapping.exceptionFor(item.errorCode()));
+ continue brokersLoop;
+ }
+ if (!item.topic().equals(topic)) {
+ LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
+ continue brokersLoop;
+ }
+ for (PartitionMetadata part : item.partitionsMetadata()) {
+ Node leader = brokerToNode(part.leader());
+ Node[] replicas = new Node[part.replicas().size()];
+ for (int i = 0; i < part.replicas().size(); i++) {
+ replicas[i] = brokerToNode(part.replicas().get(i));
+ }
+
+ Node[] ISRs = new Node[part.isr().size()];
+ for (int i = 0; i < part.isr().size(); i++) {
+ ISRs[i] = brokerToNode(part.isr().get(i));
+ }
+ PartitionInfo pInfo = new PartitionInfo(topic, part.partitionId(), leader, replicas, ISRs);
+ partitions.add(pInfo);
+ }
+ }
+ break retryLoop; // leave the loop through the brokers
+ } catch (Exception e) {
+ LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topic, e);
+ } finally {
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+ } // brokers loop
+ } // retries loop
+ return partitions;
+ }
+
+ private static Node brokerToNode(Broker broker) {
+ return new Node(broker.id(), broker.host(), broker.port());
+ }
+
+ protected static void validateZooKeeperConfig(Properties props) {
+ if (props.getProperty("zookeeper.connect") == null) {
+ throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
+ }
+ if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+ throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
+ + "' has not been set in the properties");
+ }
+
+ try {
+ //noinspection ResultOfMethodCallIgnored
+ Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
+ }
+
+ try {
+ //noinspection ResultOfMethodCallIgnored
+ Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
new file mode 100644
index 0000000..21f24e6
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Creates a Kafka consumer compatible with reading from Kafka 0.8.1.x brokers.
+ * The consumer will internally use the old low-level Kafka API, and manually commit
+ * partition offsets to ZooKeeper.
+ *
+ * <p>The following additional configuration values are available:</p>
+ * <ul>
+ * <li>socket.timeout.ms</li>
+ * <li>socket.receive.buffer.bytes</li>
+ * <li>fetch.message.max.bytes</li>
+ * <li>auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)</li>
+ * <li>fetch.wait.max.ms</li>
+ * </ul>
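+ *
+ * <p>A sketch of how these settings might be passed (all values are placeholders, not
+ * recommendations, and {@code SimpleStringSchema} stands in for any deserialization schema):</p>
+ * <pre>{@code
+ * Properties props = new Properties();
+ * props.setProperty("bootstrap.servers", "localhost:9092");  // used to discover partitions
+ * props.setProperty("zookeeper.connect", "localhost:2181");  // offsets are committed here
+ * props.setProperty("group.id", "my-group");
+ * props.setProperty("socket.timeout.ms", "30000");
+ * props.setProperty("auto.offset.reset", "earliest");
+ *
+ * DataStream<String> stream = env.addSource(
+ *     new FlinkKafkaConsumer081<String>("my-topic", new SimpleStringSchema(), props));
+ * }</pre>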
+ *
+ * @param <T> The type of elements produced by this consumer.
+ */
+public class FlinkKafkaConsumer081<T> extends FlinkKafkaConsumer<T> {
+
+ private static final long serialVersionUID = -5649906773771949146L;
+
+ /**
+ * Creates a new Kafka 0.8.1.x streaming source consumer.
+ *
+ * @param topic
+ * The name of the topic that should be consumed.
+ * @param valueDeserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ public FlinkKafkaConsumer081(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+ super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
new file mode 100644
index 0000000..77e41e5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.util.Properties;
+
+/**
+ * Creates a Kafka consumer compatible with reading from Kafka 0.8.2.x brokers.
+ * The consumer will internally use the old low-level Kafka API, and manually commit
+ * partition offsets to ZooKeeper.
+ *
+ * <p>Once Kafka releases the new consumer with Kafka 0.8.3, Flink might use the 0.8.3 consumer API
+ * also against Kafka 0.8.2 installations.</p>
+ *
+ * @param <T> The type of elements produced by this consumer.
+ */
+public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
+
+ private static final long serialVersionUID = -8450689820627198228L;
+
+ /**
+ * Creates a new Kafka 0.8.2.x streaming source consumer.
+ *
+ * @param topic
+ * The name of the topic that should be consumed.
+ * @param valueDeserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+ super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+ }
+}