You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:05 UTC

[49/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
deleted file mode 100644
index 7d127ff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.connectors.fs;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Random;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.junit.Assert.assertTrue;
-
-/**
-* Tests for {@link RollingSink}.
-*
-* <p>
-* This test only verifies the exactly once behaviour of the sink. Another test tests the
-* rolling behaviour.
-*
-* <p>
-* This differs from RollingSinkFaultToleranceITCase in that the checkpoint interval is extremely
-* high. This provokes the case that the sink restarts without any checkpoint having been performed.
-* This tests the initial cleanup of pending/in-progress files.
-*/
-public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBase {
-
-	final long NUM_STRINGS = 16_000;
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	private static MiniDFSCluster hdfsCluster;
-	private static org.apache.hadoop.fs.FileSystem dfs;
-
-	private static String outPath;
-
-
-
-	@BeforeClass
-	public static void createHDFS() throws IOException {
-		Configuration conf = new Configuration();
-
-		File dataDir = tempFolder.newFolder();
-
-		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
-		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-		hdfsCluster = builder.build();
-
-		dfs = hdfsCluster.getFileSystem();
-
-		outPath = "hdfs://"
-				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(),  hdfsCluster.getNameNodePort())
-				+ "/string-non-rolling-out-no-checkpoint";
-	}
-
-	@AfterClass
-	public static void destroyHDFS() {
-		if (hdfsCluster != null) {
-			hdfsCluster.shutdown();
-		}
-	}
-
-
-	@Override
-	public void testProgram(StreamExecutionEnvironment env) {
-		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-		int PARALLELISM = 6;
-
-		env.enableCheckpointing(Long.MAX_VALUE);
-		env.setParallelism(PARALLELISM);
-		env.disableOperatorChaining();
-
-		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
-
-		DataStream<String> mapped = stream
-				.map(new OnceFailingIdentityMapper(NUM_STRINGS));
-
-		RollingSink<String> sink = new RollingSink<String>(outPath)
-				.setBucketer(new NonRollingBucketer())
-				.setBatchSize(5000)
-				.setValidLengthPrefix("")
-				.setPendingPrefix("");
-
-		mapped.addSink(sink);
-
-	}
-
-	@Override
-	public void postSubmit() throws Exception {
-		// We read the files and verify that we have read all the strings. If a valid-length
-		// file exists we only read the file to that point. (This test should work with
-		// FileSystems that support truncate() and with others as well.)
-
-		Pattern messageRegex = Pattern.compile("message (\\d*)");
-
-		// Keep a set of the message IDs that we read. The size must equal the read count and
-		// the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
-		// elements twice.
-		Set<Integer> readNumbers = Sets.newHashSet();
-		int numRead = 0;
-
-		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
-				outPath), true);
-
-		while (files.hasNext()) {
-			LocatedFileStatus file = files.next();
-
-			if (!file.getPath().toString().endsWith(".valid-length")) {
-				int validLength = (int) file.getLen();
-				if (dfs.exists(file.getPath().suffix(".valid-length"))) {
-					FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
-					String validLengthString = inStream.readUTF();
-					validLength = Integer.parseInt(validLengthString);
-					System.out.println("VALID LENGTH: " + validLength);
-				}
-				FSDataInputStream inStream = dfs.open(file.getPath());
-				byte[] buffer = new byte[validLength];
-				inStream.readFully(0, buffer, 0, validLength);
-				inStream.close();
-
-				ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-
-				InputStreamReader inStreamReader = new InputStreamReader(bais);
-				BufferedReader br = new BufferedReader(inStreamReader);
-
-				String line = br.readLine();
-				while (line != null) {
-					Matcher matcher = messageRegex.matcher(line);
-					if (matcher.matches()) {
-						numRead++;
-						int messageId = Integer.parseInt(matcher.group(1));
-						readNumbers.add(messageId);
-					} else {
-						Assert.fail("Read line does not match expected pattern.");
-					}
-					line = br.readLine();
-				}
-				br.close();
-				inStreamReader.close();
-				bais.close();
-			}
-		}
-
-		// Verify that we read all strings (at-least-once)
-		Assert.assertEquals(NUM_STRINGS, readNumbers.size());
-
-		// Verify that we don't have duplicates (boom!, exactly-once)
-		Assert.assertEquals(NUM_STRINGS, numRead);
-	}
-
-	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		private static volatile boolean hasFailed = false;
-
-		private final long numElements;
-
-		private long failurePos;
-		private long count;
-
-
-		OnceFailingIdentityMapper(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
-			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
-			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-			count = 0;
-		}
-
-		@Override
-		public String map(String value) throws Exception {
-			count++;
-			if (!hasFailed && count >= failurePos) {
-				hasFailed = true;
-				throw new Exception("Test Failure");
-			}
-
-			return value;
-		}
-	}
-
-	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-			implements CheckpointedAsynchronously<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final long numElements;
-
-		private int index;
-
-		private volatile boolean isRunning = true;
-
-
-		StringGeneratingSourceFunction(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
-			final Object lockingObject = ctx.getCheckpointLock();
-
-			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-
-			if (index == 0) {
-				index = getRuntimeContext().getIndexOfThisSubtask();
-			}
-
-			while (isRunning && index < numElements) {
-
-				Thread.sleep(1);
-				synchronized (lockingObject) {
-					ctx.collect("message " + index);
-					index += step;
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
-		private static String randomString(StringBuilder bld, Random rnd) {
-			final int len = rnd.nextInt(10) + 5;
-
-			for (int i = 0; i < len; i++) {
-				char next = (char) (rnd.nextInt(20000) + 33);
-				bld.append(next);
-			}
-
-			return bld.toString();
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			index = state;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
deleted file mode 100644
index 65904d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Random;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
- *
- * <p>
- * This test only verifies the exactly once behaviour of the sink. Another test tests the
- * rolling behaviour.
- */
-public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
-
-	final long NUM_STRINGS = 16_000;
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	private static MiniDFSCluster hdfsCluster;
-	private static org.apache.hadoop.fs.FileSystem dfs;
-
-	private static String outPath;
-
-
-
-	@BeforeClass
-	public static void createHDFS() throws IOException {
-		Configuration conf = new Configuration();
-
-		File dataDir = tempFolder.newFolder();
-
-		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
-		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-		hdfsCluster = builder.build();
-
-		dfs = hdfsCluster.getFileSystem();
-
-		outPath = "hdfs://"
-				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
-				+ "/string-non-rolling-out";
-	}
-
-	@AfterClass
-	public static void destroyHDFS() {
-		if (hdfsCluster != null) {
-			hdfsCluster.shutdown();
-		}
-	}
-
-
-	@Override
-	public void testProgram(StreamExecutionEnvironment env) {
-		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-		int PARALLELISM = 6;
-
-		env.enableCheckpointing(200);
-		env.setParallelism(PARALLELISM);
-		env.disableOperatorChaining();
-
-		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
-
-		DataStream<String> mapped = stream
-				.map(new OnceFailingIdentityMapper(NUM_STRINGS));
-
-		RollingSink<String> sink = new RollingSink<String>(outPath)
-				.setBucketer(new NonRollingBucketer())
-				.setBatchSize(10000)
-				.setValidLengthPrefix("")
-				.setPendingPrefix("");
-
-		mapped.addSink(sink);
-
-	}
-
-	@Override
-	public void postSubmit() throws Exception {
-		// We read the files and verify that we have read all the strings. If a valid-length
-		// file exists we only read the file to that point. (This test should work with
-		// FileSystems that support truncate() and with others as well.)
-
-		Pattern messageRegex = Pattern.compile("message (\\d*)");
-
-		// Keep a set of the message IDs that we read. The size must equal the read count and
-		// the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
-		// elements twice.
-		Set<Integer> readNumbers = Sets.newHashSet();
-		int numRead = 0;
-
-		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
-				outPath), true);
-
-		while (files.hasNext()) {
-			LocatedFileStatus file = files.next();
-
-			if (!file.getPath().toString().endsWith(".valid-length")) {
-				int validLength = (int) file.getLen();
-				if (dfs.exists(file.getPath().suffix(".valid-length"))) {
-					FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
-					String validLengthString = inStream.readUTF();
-					validLength = Integer.parseInt(validLengthString);
-					System.out.println("VALID LENGTH: " + validLength);
-				}
-				FSDataInputStream inStream = dfs.open(file.getPath());
-				byte[] buffer = new byte[validLength];
-				inStream.readFully(0, buffer, 0, validLength);
-				inStream.close();
-
-				ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-
-				InputStreamReader inStreamReader = new InputStreamReader(bais);
-				BufferedReader br = new BufferedReader(inStreamReader);
-
-				String line = br.readLine();
-				while (line != null) {
-					Matcher matcher = messageRegex.matcher(line);
-					if (matcher.matches()) {
-						numRead++;
-						int messageId = Integer.parseInt(matcher.group(1));
-						readNumbers.add(messageId);
-					} else {
-						Assert.fail("Read line does not match expected pattern.");
-					}
-					line = br.readLine();
-				}
-				br.close();
-				inStreamReader.close();
-				bais.close();
-			}
-		}
-
-		// Verify that we read all strings (at-least-once)
-		Assert.assertEquals(NUM_STRINGS, readNumbers.size());
-
-		// Verify that we don't have duplicates (boom!, exactly-once)
-		Assert.assertEquals(NUM_STRINGS, numRead);
-	}
-
-	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		private static volatile boolean hasFailed = false;
-
-		private final long numElements;
-
-		private long failurePos;
-		private long count;
-
-
-		OnceFailingIdentityMapper(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
-			long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-			long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
-			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-			count = 0;
-		}
-
-		@Override
-		public String map(String value) throws Exception {
-			count++;
-			if (!hasFailed && count >= failurePos) {
-				hasFailed = true;
-				throw new Exception("Test Failure");
-			}
-
-			return value;
-		}
-	}
-
-	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-			implements CheckpointedAsynchronously<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final long numElements;
-
-		private int index;
-
-		private volatile boolean isRunning = true;
-
-
-		StringGeneratingSourceFunction(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
-			final Object lockingObject = ctx.getCheckpointLock();
-
-			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-
-			if (index == 0) {
-				index = getRuntimeContext().getIndexOfThisSubtask();
-			}
-
-			while (isRunning && index < numElements) {
-
-				Thread.sleep(1);
-				synchronized (lockingObject) {
-					ctx.collect("message " + index);
-					index += step;
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
-		private static String randomString(StringBuilder bld, Random rnd) {
-			final int len = rnd.nextInt(10) + 5;
-
-			for (int i = 0; i < len; i++) {
-				char next = (char) (rnd.nextInt(20000) + 33);
-				bld.append(next);
-			}
-
-			return bld.toString();
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			index = state;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
deleted file mode 100644
index 9770f41..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.taskmanager.MultiShotLatch;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- * Tests for {@link RollingSink}. These
- * tests test the different output methods as well as the rolling feature using a manual clock
- * that increases time in lockstep with element computation using latches.
- *
- * <p>
- * This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies
- * exactly once behaviour.
- */
-public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	private static MiniDFSCluster hdfsCluster;
-	private static org.apache.hadoop.fs.FileSystem dfs;
-	private static String hdfsURI;
-
-
-	@BeforeClass
-	public static void createHDFS() throws IOException {
-		Configuration conf = new Configuration();
-
-		File dataDir = tempFolder.newFolder();
-
-		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
-		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-		hdfsCluster = builder.build();
-
-		dfs = hdfsCluster.getFileSystem();
-
-		hdfsURI = "hdfs://"
-				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
-				+ "/";
-	}
-
-	@AfterClass
-	public static void destroyHDFS() {
-		hdfsCluster.shutdown();
-	}
-
-	/**
-	 * This tests {@link StringWriter} with
-	 * non-rolling output.
-	 */
-	@Test
-	public void testNonRollingStringWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
-		final String outPath = hdfsURI + "/string-non-rolling-out";
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
-				.broadcast()
-				.filter(new OddEvenFilter());
-
-		RollingSink<String> sink = new RollingSink<String>(outPath)
-				.setBucketer(new NonRollingBucketer())
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		source
-				.map(new MapFunction<Tuple2<Integer,String>, String>() {
-					private static final long serialVersionUID = 1L;
-					@Override
-					public String map(Tuple2<Integer, String> value) throws Exception {
-						return value.f1;
-					}
-				})
-				.addSink(sink);
-
-		env.execute("RollingSink String Write Test");
-
-		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
-		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
-			String line = br.readLine();
-			Assert.assertEquals("message #" + i, line);
-		}
-
-		inStream.close();
-
-		inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-		br = new BufferedReader(new InputStreamReader(inStream));
-
-		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
-			String line = br.readLine();
-			Assert.assertEquals("message #" + i, line);
-		}
-
-		inStream.close();
-	}
-
-	/**
-	 * This tests {@link SequenceFileWriter}
-	 * with non-rolling output and without compression.
-	 */
-	@Test
-	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
-		final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
-				.broadcast()
-				.filter(new OddEvenFilter());
-
-		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
-				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
-			}
-		});
-
-
-		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
-				.setWriter(new SequenceFileWriter<IntWritable, Text>())
-				.setBucketer(new NonRollingBucketer())
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		mapped.addSink(sink);
-
-		env.execute("RollingSink String Write Test");
-
-		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
-		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
-				1000,
-				0,
-				100000,
-				new Configuration());
-
-		IntWritable intWritable = new IntWritable();
-		Text txt = new Text();
-
-		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
-			reader.next(intWritable, txt);
-			Assert.assertEquals(i, intWritable.get());
-			Assert.assertEquals("message #" + i, txt.toString());
-		}
-
-		reader.close();
-		inStream.close();
-
-		inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-		reader = new SequenceFile.Reader(inStream,
-				1000,
-				0,
-				100000,
-				new Configuration());
-
-		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
-			reader.next(intWritable, txt);
-			Assert.assertEquals(i, intWritable.get());
-			Assert.assertEquals("message #" + i, txt.toString());
-		}
-
-		reader.close();
-		inStream.close();
-	}
-
-	/**
-	 * This tests {@link SequenceFileWriter}
-	 * with non-rolling output but with compression.
-	 */
-	@Test
-	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
-		final String outPath = hdfsURI + "/seq-non-rolling-out";
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(NUM_ELEMENTS))
-				.broadcast()
-				.filter(new OddEvenFilter());
-
-		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer,String>, Tuple2<IntWritable, Text>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
-				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
-			}
-		});
-
-
-		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
-				.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
-				.setBucketer(new NonRollingBucketer())
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		mapped.addSink(sink);
-
-		env.execute("RollingSink String Write Test");
-
-		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
-		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
-				1000,
-				0,
-				100000,
-				new Configuration());
-
-		IntWritable intWritable = new IntWritable();
-		Text txt = new Text();
-
-		for (int i = 0; i < NUM_ELEMENTS; i += 2) {
-			reader.next(intWritable, txt);
-			Assert.assertEquals(i, intWritable.get());
-			Assert.assertEquals("message #" + i, txt.toString());
-		}
-
-		reader.close();
-		inStream.close();
-
-		inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-		reader = new SequenceFile.Reader(inStream,
-				1000,
-				0,
-				100000,
-				new Configuration());
-
-		for (int i = 1; i < NUM_ELEMENTS; i += 2) {
-			reader.next(intWritable, txt);
-			Assert.assertEquals(i, intWritable.get());
-			Assert.assertEquals("message #" + i, txt.toString());
-		}
-
-		reader.close();
-		inStream.close();
-	}
-
-	// we use this to synchronize the clock changes to elements being processed
-	final static MultiShotLatch latch1 = new MultiShotLatch();
-	final static MultiShotLatch latch2 = new MultiShotLatch();
-
-	/**
-	 * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
-	 * produce rolling files. The clock of DateTimeBucketer is set to
-	 * {@link ModifyableClock} to keep the time in lockstep with the processing of elements using
-	 * latches.
-	 */
-	@Test
-	public void testDateTimeRollingStringWriter() throws Exception {
-		final int NUM_ELEMENTS = 20;
-		final int PARALLELISM = 2;
-		final String outPath = hdfsURI + "/rolling-out";
-		DateTimeBucketer.setClock(new ModifyableClock());
-		ModifyableClock.setCurrentTime(0);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(PARALLELISM);
-
-
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction(
-				NUM_ELEMENTS))
-				.broadcast();
-
-		// the parallel flatMap is chained to the sink, so when it has seen 5 elements it can
-		// fire the latch
-		DataStream<String> mapped = source
-				.flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, String>() {
-					private static final long serialVersionUID = 1L;
-
-					int count = 0;
-					@Override
-					public void flatMap(Tuple2<Integer, String> value,
-							Collector<String> out) throws Exception {
-						out.collect(value.f1);
-						count++;
-						if (count >= 5) {
-							if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-								latch1.trigger();
-							} else {
-								latch2.trigger();
-							}
-							count = 0;
-						}
-					}
-
-				});
-
-		RollingSink<String> sink = new RollingSink<String>(outPath)
-				.setBucketer(new DateTimeBucketer("ss"))
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		mapped.addSink(sink);
-
-		env.execute("RollingSink String Write Test");
-
-		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
-
-		// we should have 8 rolling files, 4 time intervals and parallelism of 2
-		int numFiles = 0;
-		while (files.hasNext()) {
-			LocatedFileStatus file = files.next();
-			numFiles++;
-			if (file.getPath().toString().contains("rolling-out/00")) {
-				FSDataInputStream inStream = dfs.open(file.getPath());
-
-				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-				for (int i = 0; i < 5; i++) {
-					String line = br.readLine();
-					Assert.assertEquals("message #" + i, line);
-				}
-
-				inStream.close();
-			} else if (file.getPath().toString().contains("rolling-out/05")) {
-				FSDataInputStream inStream = dfs.open(file.getPath());
-
-				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-				for (int i = 5; i < 10; i++) {
-					String line = br.readLine();
-					Assert.assertEquals("message #" + i, line);
-				}
-
-				inStream.close();
-			} else if (file.getPath().toString().contains("rolling-out/10")) {
-				FSDataInputStream inStream = dfs.open(file.getPath());
-
-				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-				for (int i = 10; i < 15; i++) {
-					String line = br.readLine();
-					Assert.assertEquals("message #" + i, line);
-				}
-
-				inStream.close();
-			} else if (file.getPath().toString().contains("rolling-out/15")) {
-				FSDataInputStream inStream = dfs.open(file.getPath());
-
-				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-				for (int i = 15; i < 20; i++) {
-					String line = br.readLine();
-					Assert.assertEquals("message #" + i, line);
-				}
-
-				inStream.close();
-			} else {
-				Assert.fail("File " + file + " does not match any expected roll pattern.");
-			}
-		}
-
-		Assert.assertEquals(8, numFiles);
-	}
-
-
-	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		private final int numElements;
-
-		public TestSourceFunction(int numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
-			for (int i = 0; i < numElements && running; i++) {
-				ctx.collect(Tuple2.of(i, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-
-	/**
-	 * This waits on the two multi-shot latches. The latches are triggered in a parallel
-	 * flatMap inside the test topology.
-	 */
-	private static class WaitingTestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		private final int numElements;
-
-		public WaitingTestSourceFunction(int numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
-			for (int i = 0; i < numElements && running; i++) {
-				if (i % 5 == 0 && i > 0) {
-					// update the clock after "five seconds", so we get 20 seconds in total
-					// with 5 elements in each time window
-					latch1.await();
-					latch2.await();
-					ModifyableClock.setCurrentTime(i * 1000);
-				}
-				ctx.collect(Tuple2.of(i, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-
-	public static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Tuple2<Integer, String> value) throws Exception {
-			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-				return value.f0 % 2 == 0;
-			} else {
-				return value.f0 % 2 == 1;
-			}
-		}
-	}
-
-	/**
-	 * {@link Clock} whose time is a single static value that only changes when
-	 * {@link #setCurrentTime(long)} is called. All instances share the same time.
-	 */
-	public static class ModifyableClock implements Clock {
-
-		// static and volatile: written via setCurrentTime, read by every instance
-		private static volatile long currentTime = 0L;
-
-		/** Returns the value most recently passed to {@link #setCurrentTime(long)} (initially 0). */
-		@Override
-		public long currentTimeMillis() {
-			return currentTime;
-		}
-
-		/** Sets the time reported by all instances of this clock. */
-		public static void setCurrentTime(long currentTime) {
-			ModifyableClock.currentTime = currentTime;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
deleted file mode 100644
index fe60d94..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
deleted file mode 100644
index b4f5ad2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
+++ /dev/null
@@ -1,174 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-flume</artifactId>
-	<name>flink-connector-flume</name>
-
-	<packaging>jar</packaging>
-
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<flume-ng.version>1.5.0</flume-ng.version>
-	</properties>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flume</groupId>
-			<artifactId>flume-ng-core</artifactId>
-			<version>${flume-ng.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-io</groupId>
-					<artifactId>commons-io</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-codec</groupId>
-					<artifactId>commons-codec</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-cli</groupId>
-					<artifactId>commons-cli</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-lang</groupId>
-					<artifactId>commons-lang</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.avro</groupId>
-					<artifactId>avro</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.codehaus.jackson</groupId>
-					<artifactId>jackson-core-asl</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.codehaus.jackson</groupId>
-					<artifactId>jackson-mapper-asl</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.thoughtworks.paranamer</groupId>
-					<artifactId>paranamer</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.xerial.snappy</groupId>
-					<artifactId>snappy-java</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.tukaani</groupId>
-					<artifactId>xz</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.velocity</groupId>
-					<artifactId>velocity</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-collections</groupId>
-					<artifactId>commons-collections</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>servlet-api</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.google.code.gson</groupId>
-					<artifactId>gson</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.thrift</groupId>
-					<artifactId>libthrift</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-
-			<plugin>
-				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<configuration>
-							<artifactSet>
-								<includes combine.children="append">
-									<!-- We include all dependencies that transitively depend on guava -->
-									<include>org.apache.flume:*</include>
-								</includes>
-							</artifactSet>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-							</transformers>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
deleted file mode 100644
index 50f5770..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.flume;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FlumeSink<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
-
-	private transient FlinkRpcClientFacade client;
-	boolean initDone = false;
-	String host;
-	int port;
-	SerializationSchema<IN, byte[]> schema;
-
-	public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
-		this.host = host;
-		this.port = port;
-		this.schema = schema;
-	}
-
-	/**
-	 * Receives tuples from the Apache Flink {@link DataStream} and forwards
-	 * them to Apache Flume.
-	 * 
-	 * @param value
-	 *            The tuple arriving from the datastream
-	 */
-	@Override
-	public void invoke(IN value) {
-
-		byte[] data = schema.serialize(value);
-		client.sendDataToFlume(data);
-
-	}
-
-	private class FlinkRpcClientFacade {
-		private RpcClient client;
-		private String hostname;
-		private int port;
-
-		/**
-		 * Initializes the connection to Apache Flume.
-		 * 
-		 * @param hostname
-		 *            The host
-		 * @param port
-		 *            The port.
-		 */
-		public void init(String hostname, int port) {
-			// Setup the RPC connection
-			this.hostname = hostname;
-			this.port = port;
-			int initCounter = 0;
-			while (true) {
-				if (initCounter >= 90) {
-					throw new RuntimeException("Cannot establish connection with" + port + " at "
-							+ host);
-				}
-				try {
-					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
-				} catch (FlumeException e) {
-					// Wait one second if the connection failed before the next
-					// try
-					try {
-						Thread.sleep(1000);
-					} catch (InterruptedException e1) {
-						if (LOG.isErrorEnabled()) {
-							LOG.error("Interrupted while trying to connect {} at {}", port, host);
-						}
-					}
-				}
-				if (client != null) {
-					break;
-				}
-				initCounter++;
-			}
-			initDone = true;
-		}
-
-		/**
-		 * Sends byte arrays as {@link Event} series to Apache Flume.
-		 * 
-		 * @param data
-		 *            The byte array to send to Apache FLume
-		 */
-		public void sendDataToFlume(byte[] data) {
-			Event event = EventBuilder.withBody(data);
-
-			try {
-				client.append(event);
-
-			} catch (EventDeliveryException e) {
-				// clean up and recreate the client
-				client.close();
-				client = null;
-				client = RpcClientFactory.getDefaultInstance(hostname, port);
-			}
-		}
-
-	}
-
-	@Override
-	public void close() {
-		client.client.close();
-	}
-
-	@Override
-	public void open(Configuration config) {
-		client = new FlinkRpcClientFacade();
-		client.init(host, port);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
deleted file mode 100644
index 8fecd0f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ /dev/null
@@ -1,149 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *    http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.connectors.flume;
-//
-//import java.util.List;
-//
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.functions.source.ConnectorSource;
-//import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-//import org.apache.flink.util.Collector;
-//import org.apache.flume.Context;
-//import org.apache.flume.channel.ChannelProcessor;
-//import org.apache.flume.source.AvroSource;
-//import org.apache.flume.source.avro.AvroFlumeEvent;
-//import org.apache.flume.source.avro.Status;
-//
-//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
-//	private static final long serialVersionUID = 1L;
-//
-//	String host;
-//	String port;
-//	volatile boolean finished = false;
-//
-//	private volatile boolean isRunning = false;
-//
-//	FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
-//		super(deserializationSchema);
-//		this.host = host;
-//		this.port = Integer.toString(port);
-//	}
-//
-//	public class MyAvroSource extends AvroSource {
-//		Collector<OUT> output;
-//
-//		/**
-//		 * Sends the AvroFlumeEvent from its argument list to the Apache Flink
-//		 * {@link DataStream}.
-//		 *
-//		 * @param avroEvent
-//		 *            The event that should be sent to the dataStream
-//		 * @return A {@link Status}.OK message if sending the event was
-//		 *         successful.
-//		 */
-//		@Override
-//		public Status append(AvroFlumeEvent avroEvent) {
-//			collect(avroEvent);
-//			return Status.OK;
-//		}
-//
-//		/**
-//		 * Sends the AvroFlumeEvents from its argument list to the Apache Flink
-//		 * {@link DataStream}.
-//		 *
-//		 * @param events
-//		 *            The events that are sent to the dataStream
-//		 * @return A Status.OK message if sending the events was successful.
-//		 */
-//		@Override
-//		public Status appendBatch(List<AvroFlumeEvent> events) {
-//			for (AvroFlumeEvent avroEvent : events) {
-//				collect(avroEvent);
-//			}
-//
-//			return Status.OK;
-//		}
-//
-//		/**
-//		 * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
-//		 * {@link DataStream}.
-//		 *
-//		 * @param avroEvent
-//		 *            The event that is sent to the dataStream
-//		 */
-//		private void collect(AvroFlumeEvent avroEvent) {
-//			byte[] b = avroEvent.getBody().array();
-//			OUT out = FlumeSource.this.schema.deserialize(b);
-//
-//			if (schema.isEndOfStream(out)) {
-//				FlumeSource.this.finished = true;
-//				this.stop();
-//				FlumeSource.this.notifyAll();
-//			} else {
-//				output.collect(out);
-//			}
-//
-//		}
-//
-//	}
-//
-//	MyAvroSource avroSource;
-//
-//	/**
-//	 * Configures the AvroSource. Also sets the output so the application can
-//	 * use it from outside of the invoke function.
-//	 *
-//	 * @param output
-//	 *            The output used in the invoke function
-//	 */
-//	public void configureAvroSource(Collector<OUT> output) {
-//
-//		avroSource = new MyAvroSource();
-//		avroSource.output = output;
-//		Context context = new Context();
-//		context.put("port", port);
-//		context.put("bind", host);
-//		avroSource.configure(context);
-//		// An instance of a ChannelProcessor is required for configuring the
-//		// avroSource although it will not be used in this case.
-//		ChannelProcessor cp = new ChannelProcessor(null);
-//		avroSource.setChannelProcessor(cp);
-//	}
-//
-//	/**
-//	 * Configures the AvroSource and runs until the user calls a close function.
-//	 *
-//	 * @param output
-//	 *            The Collector for sending data to the datastream
-//	 */
-//	@Override
-//	public void run(Collector<OUT> output) throws Exception {
-//		isRunning = true;
-//		configureAvroSource(output);
-//		avroSource.start();
-//		while (!finished && isRunning) {
-//			this.wait();
-//		}
-//	}
-//
-//	@Override
-//	public void cancel() {
-//		isRunning = false;
-//	}
-//
-//}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
deleted file mode 100644
index 45da6eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ /dev/null
@@ -1,49 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *    http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.connectors.flume;
-//
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//import org.apache.flink.streaming.util.serialization.SerializationSchema;
-//import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-//
-//public class FlumeTopology {
-//
-//	public static void main(String[] args) throws Exception {
-//
-//		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-//
-//		@SuppressWarnings("unused")
-//		DataStream<String> inputStream1 = env.addSource(
-//				new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
-//				new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
-//
-//		env.execute();
-//	}
-//
-//	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
-//
-//		private static final long serialVersionUID = 1L;
-//
-//		@Override
-//		public byte[] serialize(String element) {
-//			return element.getBytes();
-//		}
-//	}
-//
-//}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
deleted file mode 100644
index f098d9c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
+++ /dev/null
@@ -1,130 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-kafka</artifactId>
-	<name>flink-connector-kafka</name>
-
-	<packaging>jar</packaging>
-
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<kafka.version>0.8.2.0</kafka.version>
-	</properties>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_${scala.binary.version}</artifactId>
-			<version>${kafka.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>com.sun.jmx</groupId>
-					<artifactId>jmxri</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.sun.jdmk</groupId>
-					<artifactId>jmxtools</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-simple</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>net.sf.jopt-simple</groupId>
-					<artifactId>jopt-simple</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.scala-lang</groupId>
-					<artifactId>scala-reflect</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.scala-lang</groupId>
-					<artifactId>scala-compiler</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.yammer.metrics</groupId>
-					<artifactId>metrics-annotation</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.xerial.snappy</groupId>
-					<artifactId>snappy-java</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-test</artifactId>
-			<version>${curator.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-failsafe-plugin</artifactId>
-				<configuration>
-					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
-					<forkCount>1</forkCount>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-	
-</project>