You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/29 15:46:05 UTC

[1/6] flink git commit: [FLINK-2112] [streaming] Proper package for kafka tests

Repository: flink
Updated Branches:
  refs/heads/master 84fce54b6 -> 0930179f4


[FLINK-2112] [streaming] Proper package for kafka tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0fd60233
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0fd60233
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0fd60233

Branch: refs/heads/master
Commit: 0fd60233bbdf55139de8bc36c16ff8fa33bdc995
Parents: 145b2ba
Author: mbalassi <mb...@apache.org>
Authored: Fri May 29 14:17:27 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaITCase.java | 1023 ++++++++++++++++++
 .../kafka/util/KafkaLocalSystemTime.java        |   48 +
 .../flink/streaming/kafka/KafkaITCase.java      | 1023 ------------------
 .../kafka/util/KafkaLocalSystemTime.java        |   48 -
 4 files changed, 1071 insertions(+), 1071 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0fd60233/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
new file mode 100644
index 0000000..7b7bdcc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -0,0 +1,1023 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+import kafka.admin.AdminUtils;
+import kafka.api.PartitionMetadata;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.network.SocketServer;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
+import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
+import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import scala.collection.Seq;
+
+/**
+ * Code in this test is based on the following GitHub repository:
+ * (as per commit bc6b2b2d5f6424d5f377aa6c0871e82a956462ef)
+ * <p/>
+ * https://github.com/sakserv/hadoop-mini-clusters (ASL licensed)
+ */
+
+public class KafkaITCase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
+	private static final int NUMBER_OF_KAFKA_SERVERS = 3;
+
+	private static int zkPort;
+	private static String kafkaHost;
+
+	private static String zookeeperConnectionString;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+	public static File tmpZkDir;
+	public static List<File> tmpKafkaDirs;
+
+	private static TestingServer zookeeper;
+	private static List<KafkaServer> brokers;
+	private static String brokerConnectionStrings = "";
+
+	private static ConsumerConfig standardCC;
+
+	private static ZkClient zkClient;
+
+
+	@BeforeClass
+	public static void prepare() throws IOException {
+		LOG.info("Starting KafkaITCase.prepare()");
+		tmpZkDir = tempFolder.newFolder();
+
+		tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
+		for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+			tmpKafkaDirs.add(tempFolder.newFolder());
+		}
+
+		kafkaHost = InetAddress.getLocalHost().getHostName();
+		zkPort = NetUtils.getAvailablePort();
+		zookeeperConnectionString = "localhost:" + zkPort;
+
+		zookeeper = null;
+		brokers = null;
+
+		try {
+			LOG.info("Starting Zookeeper");
+			zookeeper = getZookeeper();
+			LOG.info("Starting KafkaServer");
+			brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
+			for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+				SocketServer socketServer = brokers.get(i).socketServer();
+				String host = "localhost";
+				if(socketServer.host() != null) {
+					host = socketServer.host();
+				}
+				brokerConnectionStrings += host+":"+socketServer.port()+",";
+			}
+
+			LOG.info("ZK and KafkaServer started.");
+		} catch (Throwable t) {
+			LOG.warn("Test failed with exception", t);
+			Assert.fail("Test failed with: " + t.getMessage());
+		}
+
+		Properties cProps = new Properties();
+		cProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		cProps.setProperty("group.id", "flink-tests");
+		cProps.setProperty("auto.commit.enable", "false");
+
+		cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
+
+		standardCC = new ConsumerConfig(cProps);
+
+		zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+	}
+
+	@AfterClass
+	public static void shutDownServices() {
+		LOG.info("Shutting down all services");
+		for (KafkaServer broker : brokers) {
+			if (broker != null) {
+				broker.shutdown();
+			}
+		}
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			} catch (IOException e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+		}
+		zkClient.close();
+	}
+
+
+	@Test
+	public void testOffsetManipulation() {
+		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+		final String topicName = "testOffsetManipulation";
+
+		// create topic
+		Properties topicConfig = new Properties();
+		LOG.info("Creating topic {}", topicName);
+		AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337);
+
+		Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+
+		zk.close();
+	}
+	/**
+	 * We want to use the High level java consumer API but manage the offset in Zookeeper manually.
+	 *
+	 */
+	@Test
+	public void testPersistentSourceWithOffsetUpdates() throws Exception {
+		LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
+
+		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
+
+		final String topicName = "testOffsetHacking";
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
+		env.getConfig().disableSysoutLogging();
+		env.enableCheckpointing(50);
+		env.setNumberOfExecutionRetries(0);
+
+		// create topic
+		Properties topicConfig = new Properties();
+		LOG.info("Creating topic {}", topicName);
+		AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
+
+		// write a sequence from 0 to 99 to each of the three partitions.
+		writeSequence(env, topicName, 0, 99);
+
+
+		readSequence(env, standardCC, topicName, 0, 100, 300);
+
+		// check offsets to be set at least higher than 50.
+		// correctly, we would expect them to be set to 99, but right now there is no way of stopping a topology once all pending
+		// checkpoints have been committed.
+		// To work around that limitation, the persistent kafka consumer is throtteled with a thread.sleep().
+		long o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0);
+		long o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1);
+		long o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2);
+		Assert.assertTrue("The offset seems incorrect, got "+o1, o1 > 50L);
+		Assert.assertTrue("The offset seems incorrect, got "+o2, o2 > 50L);
+		Assert.assertTrue("The offset seems incorrect, got "+o3, o3 > 50L);
+		/** Once we have proper shutdown of streaming jobs, enable these tests
+		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
+		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
+		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));*/
+
+
+		LOG.info("Manipulating offsets");
+		// set the offset to 25, 50, and 75 for the three partitions
+		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50);
+		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50);
+		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50);
+
+		// create new env
+		env = StreamExecutionEnvironment.createLocalEnvironment(3);
+		env.getConfig().disableSysoutLogging();
+		readSequence(env, standardCC, topicName, 50, 50, 150);
+
+		zk.close();
+
+		LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
+	}
+
+	private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
+		LOG.info("Reading sequence for verification until final count {}", finalCount);
+		DataStream<Tuple2<Integer, Integer>> source = env.addSource(
+				new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
+		)
+		//add a sleeper mapper. Since there is no good way of "shutting down" a running topology, we have
+		// to play this trick. The problem is that we have to wait until all checkpoints are confirmed
+		.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+			@Override
+			public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+				Thread.sleep(150);
+				return value;
+			}
+		}).setParallelism(3);
+
+		// verify data
+		DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
+			int[] values = new int[valuesCount];
+			int count = 0;
+
+			@Override
+			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
+				values[value.f1 - valuesStartFrom]++;
+				count++;
+
+				LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount);
+				// verify if we've seen everything
+
+				if (count == finalCount) {
+					LOG.info("Received all values");
+					for (int i = 0; i < values.length; i++) {
+						int v = values[i];
+						if (v != 3) {
+							LOG.warn("Test is going to fail");
+							printTopic(topicName, valuesCount, this.getRuntimeContext().getExecutionConfig());
+							throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
+						}
+					}
+					// test has passed
+					throw new SuccessException();
+				}
+			}
+
+		}).setParallelism(1);
+
+		tryExecute(env, "Read data from Kafka");
+
+		LOG.info("Successfully read sequence for verification");
+	}
+
+
+
+	private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
+		LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
+
+		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
+			private static final long serialVersionUID = 1L;
+			int cnt = from;
+			int partition;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				partition = getRuntimeContext().getIndexOfThisSubtask();
+
+			}
+
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return cnt > to;
+			}
+
+			@Override
+			public Tuple2<Integer, Integer> next() throws Exception {
+				LOG.info("Writing " + cnt + " to partition " + partition);
+				Tuple2<Integer, Integer> result = new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt);
+				cnt++;
+				return result;
+			}
+		}).setParallelism(3);
+
+		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
+				topicName,
+				new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()),
+				new T2Partitioner()
+		)).setParallelism(3);
+		env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
+		LOG.info("Finished writing sequence");
+	}
+
+	private static class T2Partitioner implements SerializableKafkaPartitioner {
+		@Override
+		public int partition(Object key, int numPartitions) {
+			if(numPartitions != 3) {
+				throw new IllegalArgumentException("Expected three partitions");
+			}
+			Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
+			return element.f0;
+		}
+	}
+
+
+	@Test
+	public void regularKafkaSourceTest() throws Exception {
+		LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
+
+		String topic = "regularKafkaSourceTestTopic";
+		createTestTopic(topic, 1, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		// add consuming topology:
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+				new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000));
+		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+			int elCnt = 0;
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				LOG.debug("Got value = " + value);
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+				elCnt++;
+				if (elCnt == 100) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
+			}
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+			private static final long serialVersionUID = 1L;
+			int cnt = 0;
+
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return false;
+			}
+
+			@Override
+			public Tuple2<Long, String> next() throws Exception {
+				Thread.sleep(100);
+				return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
+			}
+		});
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
+
+		tryExecute(env, "regular kafka source test");
+
+		LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
+	}
+
+	@Test
+	public void tupleTestTopology() throws Exception {
+		LOG.info("Starting KafkaITCase.tupleTestTopology()");
+
+		String topic = "tupleTestTopic";
+		createTestTopic(topic, 1, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+				new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+						new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+						standardCC
+				));
+		consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
+			int elCnt = 0;
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				LOG.info("Got value " + value);
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+				elCnt++;
+				if (elCnt == 100) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
+			}
+
+			@Override
+			public void close() throws Exception {
+				super.close();
+				Assert.assertTrue("No element received", elCnt > 0);
+			}
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+			private static final long serialVersionUID = 1L;
+			int cnt = 0;
+
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return false;
+			}
+
+			@Override
+			public Tuple2<Long, String> next() throws Exception {
+				Thread.sleep(100);
+				return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
+			}
+		});
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
+
+		tryExecute(env, "tupletesttopology");
+
+		LOG.info("Finished KafkaITCase.tupleTestTopology()");
+	}
+
+	/**
+	 * Test Flink's Kafka integration also with very big records (30MB)
+	 *
+	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void bigRecordTestTopology() throws Exception {
+
+		LOG.info("Starting KafkaITCase.bigRecordTestTopology()");
+
+		String topic = "bigRecordTestTopic";
+		createTestTopic(topic, 1, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig());
+		Properties consumerProps = new Properties();
+		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
+		consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		consumerProps.setProperty("group.id", "test");
+		consumerProps.setProperty("auto.commit.enable", "false");
+		consumerProps.setProperty("auto.offset.reset", "smallest");
+
+		ConsumerConfig cc = new ConsumerConfig(consumerProps);
+		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
+				new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc));
+
+		consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+			int elCnt = 0;
+
+			@Override
+			public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+				LOG.info("Received {}", value.f0);
+				elCnt++;
+				if(value.f0 == -1) {
+					// we should have seen 11 elements now.
+					if(elCnt == 11) {
+						throw new SuccessException();
+					} else {
+						throw new RuntimeException("There have been "+elCnt+" elements");
+					}
+				}
+				if(elCnt > 10) {
+					throw new RuntimeException("More than 10 elements seen: "+elCnt);
+				}
+			}
+		}).setParallelism(1);
+
+		// add producing topology
+		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+			long cnt;
+			transient Random rnd;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				cnt = 0;
+				rnd = new Random(1337);
+
+			}
+
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return cnt > 10;
+			}
+
+			@Override
+			public Tuple2<Long, byte[]> next() throws Exception {
+				Thread.sleep(100);
+
+				if (cnt < 10) {
+				byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
+					Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(cnt++, wl);
+					LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
+					return result;
+
+				} else if (cnt == 10) {
+					Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(-1L, new byte[]{1});
+					cnt++;
+					return result;
+				} else {
+					throw new RuntimeException("Source is exhausted.");
+				}
+			}
+		});
+
+		stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
+				new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
+		);
+
+		tryExecute(env, "big topology test");
+
+		LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
+	}
+
+
+	@Test
+	public void customPartitioningTestTopology() throws Exception {
+		LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
+
+		String topic = "customPartitioningTestTopic";
+
+		createTestTopic(topic, 3, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
+				new PersistentKafkaSource<Tuple2<Long, String>>(topic,
+						new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+						standardCC));
+		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			boolean gotPartition1 = false;
+			boolean gotPartition2 = false;
+			boolean gotPartition3 = false;
+
+			@Override
+			public void invoke(Tuple2<Long, String> value) throws Exception {
+				LOG.debug("Got " + value);
+				String[] sp = value.f1.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				assertEquals(value.f0 - 1000, (long) v);
+
+				switch (v) {
+					case 9:
+						gotPartition1 = true;
+						break;
+					case 19:
+						gotPartition2 = true;
+						break;
+					case 99:
+						gotPartition3 = true;
+						break;
+				}
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+
+				if (gotPartition1 && gotPartition2 && gotPartition3) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
+			}
+		});
+
+		// add producing topology
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
+			private static final long serialVersionUID = 1L;
+			int cnt = 0;
+
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return false;
+			}
+
+			@Override
+			public Tuple2<Long, String> next() throws Exception {
+				Thread.sleep(100);
+				return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
+			}
+		});
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), new CustomPartitioner()));
+
+		tryExecute(env, "custom partitioning test");
+
+		LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
+	}
+
+	/**
+	 * This is for a topic with 3 partitions and Tuple2<Long, String>
+	 */
+	private static class CustomPartitioner implements SerializableKafkaPartitioner {
+
+		@Override
+		public int partition(Object key, int numPartitions) {
+
+			@SuppressWarnings("unchecked")
+			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
+			if (tuple.f0 < 10) {
+				return 0;
+			} else if (tuple.f0 < 20) {
+				return 1;
+			} else {
+				return 2;
+			}
+		}
+	}
+
+
+	@Test
+	public void simpleTestTopology() throws Exception {
+		String topic = "simpleTestTopic";
+
+		createTestTopic(topic, 1, 1);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<String> consuming = env.addSource(
+				new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
+		consuming.addSink(new SinkFunction<String>() {
+			int elCnt = 0;
+			int start = -1;
+			BitSet validator = new BitSet(101);
+
+			@Override
+			public void invoke(String value) throws Exception {
+				LOG.debug("Got " + value);
+				String[] sp = value.split("-");
+				int v = Integer.parseInt(sp[1]);
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				validator.set(v - start);
+				elCnt++;
+				if (elCnt == 100) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != 100) {
+						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				}
+			}
+		});
+
+		// add producing topology
+		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+			int cnt = 0;
+
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return false;
+			}
+
+			@Override
+			public String next() throws Exception {
+				Thread.sleep(100);
+				return "kafka-" + cnt++;
+			}
+		});
+		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()));
+
+		tryExecute(env, "simpletest");
+	}
+
+	private static boolean leaderHasShutDown = false;
+	private static boolean shutdownKafkaBroker;
+
+	@Test(timeout=60000)
+	public void brokerFailureTest() throws Exception {
+		String topic = "brokerFailureTestTopic";
+
+		createTestTopic(topic, 2, 2);
+
+		// --------------------------- write data to topic ---------------------
+		LOG.info("Writing data to topic {}", topic);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+
+			private int cnt = 0;
+
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return cnt == 200;
+			}
+
+			@Override
+			public String next() throws Exception {
+				String msg = "kafka-" + cnt++;
+				LOG.info("sending message = "+msg);
+
+				if ((cnt - 1) % 20 == 0) {
+					LOG.debug("Sending message #{}", cnt - 1);
+				}
+
+				return msg;
+			}
+
+		});
+		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
+				.setParallelism(1);
+
+		tryExecute(env, "broker failure test - writer");
+
+		// --------------------------- read and let broker fail ---------------------
+
+		LOG.info("Reading data from topic {} and let a broker fail", topic);
+		PartitionMetadata firstPart = null;
+		do {
+			if(firstPart != null) {
+				LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+				// not the first try. Sleep a bit
+				Thread.sleep(150);
+			}
+			Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+			firstPart = partitionMetadata.head();
+		} while(firstPart.errorCode() != 0);
+
+		final String leaderToShutDown = firstPart.leader().get().connectionString();
+		LOG.info("Leader to shutdown {}", leaderToShutDown);
+
+		final Thread brokerShutdown = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				shutdownKafkaBroker = false;
+				while (!shutdownKafkaBroker) {
+					try {
+						Thread.sleep(10);
+					} catch (InterruptedException e) {
+						LOG.warn("Interruption", e);
+					}
+				}
+
+				for (KafkaServer kafkaServer : brokers) {
+					if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
+						LOG.info("Killing Kafka Server {}", leaderToShutDown);
+						kafkaServer.shutdown();
+						leaderHasShutDown = true;
+						break;
+					}
+				}
+			}
+		});
+		brokerShutdown.start();
+
+		// add consuming topology:
+		DataStreamSource<String> consuming = env.addSource(new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
+		consuming.setParallelism(1);
+
+		consuming.addSink(new SinkFunction<String>() {
+			int elCnt = 0;
+			int start = 0;
+			int numOfMessagesToBeCorrect = 100;
+			int stopAfterMessages = 150;
+
+			BitSet validator = new BitSet(numOfMessagesToBeCorrect + 1);
+
+			@Override
+			public void invoke(String value) throws Exception {
+				LOG.info("Got message = " + value + " leader has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + numOfMessagesToBeCorrect);
+				String[] sp = value.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				if (start == -1) {
+					start = v;
+				}
+				int offset = v - start;
+				Assert.assertFalse("Received tuple with value " + offset + " twice", validator.get(offset));
+				if (v - start < 0 && LOG.isWarnEnabled()) {
+					LOG.warn("Not in order: {}", value);
+				}
+
+				validator.set(offset);
+				elCnt++;
+				if (elCnt == 20) {
+					LOG.info("Asking leading broker to shut down");
+					// shut down a Kafka broker
+					shutdownKafkaBroker = true;
+				}
+				if (shutdownKafkaBroker) {
+					// we become a bit slower because the shutdown takes some time and we have
+					// only a fixed nubmer of elements to read
+					Thread.sleep(20);
+				}
+				if (leaderHasShutDown) { // it only makes sence to check once the shutdown is completed
+					if (elCnt >= stopAfterMessages) {
+						// check if everything in the bitset is set to true
+						int nc;
+						if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
+							throw new RuntimeException("The bitset was not set to 1 on all elements to be checked. Next clear:" + nc + " Set: " + validator);
+						}
+						throw new SuccessException();
+					}
+				}
+			}
+		});
+		tryExecute(env, "broker failure test - reader");
+
+	}
+
+	public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
+		try {
+			see.execute(name);
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				if(t == null) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
+				}
+
+				t = t.getCause();
+				if (limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
+				}
+			}
+		}
+	}
+
+	private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+		// create topic
+		Properties topicConfig = new Properties();
+		LOG.info("Creating topic {}", topic);
+		AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig);
+	}
+
+	private static TestingServer getZookeeper() throws Exception {
+		return new TestingServer(zkPort, tmpZkDir);
+	}
+
+	/**
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 */
+	private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws UnknownHostException {
+		Properties kafkaProperties = new Properties();
+
+		int kafkaPort = NetUtils.getAvailablePort();
+
+		// properties have to be Strings
+		kafkaProperties.put("advertised.host.name", kafkaHost);
+		kafkaProperties.put("port", Integer.toString(kafkaPort));
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", "" + (35 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", "" + (35 * 1024 * 1024));
+		KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+		KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
+		server.startup();
+		return server;
+	}
+
+	public static class SuccessException extends Exception {
+		private static final long serialVersionUID = 1L;
+	}
+
+
+	// ----------------------- Debugging utilities --------------------
+
+	/**
+	 * Read topic to list, only using Kafka code.
+	 * @return
+	 */
+	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
+		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
+		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
+		// will see each message only once.
+		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
+		if(streams.size() != 1) {
+			throw new RuntimeException("Expected only one message stream but got "+streams.size());
+		}
+		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+		if(kafkaStreams == null) {
+			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+		}
+		if(kafkaStreams.size() != 1) {
+			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
+		}
+		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
+		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
+
+		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
+		int read = 0;
+		while(iteratorToRead.hasNext()) {
+			read++;
+			result.add(iteratorToRead.next());
+			if(read == stopAfter) {
+				LOG.info("Read "+read+" elements");
+				return result;
+			}
+		}
+		return result;
+	}
+
+	private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema deserializationSchema, int stopAfter){
+		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
+		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
+		for(MessageAndMetadata<byte[], byte[]> message: contents) {
+			Object out = deserializationSchema.deserialize(message.message());
+			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
+		}
+	}
+
+	private static void printTopic(String topicName, int elements, ExecutionConfig ec) {
+		// write the sequence to log for debugging purposes
+		Properties stdProps = standardCC.props().props();
+		Properties newProps = new Properties(stdProps);
+		newProps.setProperty("group.id", "topic-printer"+UUID.randomUUID().toString());
+		newProps.setProperty("auto.offset.reset", "smallest");
+		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
+
+		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
+		DeserializationSchema deserializer = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), ec);
+		printTopic(topicName, printerConfig, deserializer, elements);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0fd60233/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..18fa46f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.utils.Time;
+
+public class KafkaLocalSystemTime implements Time {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+	@Override
+	public long milliseconds() {
+		return System.currentTimeMillis();
+	}
+
+	public long nanoseconds() {
+		return System.nanoTime();
+	}
+
+	@Override
+	public void sleep(long ms) {
+		try {
+			Thread.sleep(ms);
+		} catch (InterruptedException e) {
+			LOG.warn("Interruption", e);
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/0fd60233/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java
deleted file mode 100644
index 7b7bdcc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/KafkaITCase.java
+++ /dev/null
@@ -1,1023 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-
-import kafka.admin.AdminUtils;
-import kafka.api.PartitionMetadata;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.network.SocketServer;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.net.NetUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import scala.collection.Seq;
-
-/**
- * Code in this test is based on the following GitHub repository:
- * (as per commit bc6b2b2d5f6424d5f377aa6c0871e82a956462ef)
- * <p/>
- * https://github.com/sakserv/hadoop-mini-clusters (ASL licensed)
- */
-
-public class KafkaITCase {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
-	private static final int NUMBER_OF_KAFKA_SERVERS = 3;
-
-	private static int zkPort;
-	private static String kafkaHost;
-
-	private static String zookeeperConnectionString;
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-	public static File tmpZkDir;
-	public static List<File> tmpKafkaDirs;
-
-	private static TestingServer zookeeper;
-	private static List<KafkaServer> brokers;
-	private static String brokerConnectionStrings = "";
-
-	private static ConsumerConfig standardCC;
-
-	private static ZkClient zkClient;
-
-
-	@BeforeClass
-	public static void prepare() throws IOException {
-		LOG.info("Starting KafkaITCase.prepare()");
-		tmpZkDir = tempFolder.newFolder();
-
-		tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
-		for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-			tmpKafkaDirs.add(tempFolder.newFolder());
-		}
-
-		kafkaHost = InetAddress.getLocalHost().getHostName();
-		zkPort = NetUtils.getAvailablePort();
-		zookeeperConnectionString = "localhost:" + zkPort;
-
-		zookeeper = null;
-		brokers = null;
-
-		try {
-			LOG.info("Starting Zookeeper");
-			zookeeper = getZookeeper();
-			LOG.info("Starting KafkaServer");
-			brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
-			for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
-				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
-				SocketServer socketServer = brokers.get(i).socketServer();
-				String host = "localhost";
-				if(socketServer.host() != null) {
-					host = socketServer.host();
-				}
-				brokerConnectionStrings += host+":"+socketServer.port()+",";
-			}
-
-			LOG.info("ZK and KafkaServer started.");
-		} catch (Throwable t) {
-			LOG.warn("Test failed with exception", t);
-			Assert.fail("Test failed with: " + t.getMessage());
-		}
-
-		Properties cProps = new Properties();
-		cProps.setProperty("zookeeper.connect", zookeeperConnectionString);
-		cProps.setProperty("group.id", "flink-tests");
-		cProps.setProperty("auto.commit.enable", "false");
-
-		cProps.setProperty("auto.offset.reset", "smallest"); // read from the beginning.
-
-		standardCC = new ConsumerConfig(cProps);
-
-		zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
-	}
-
-	@AfterClass
-	public static void shutDownServices() {
-		LOG.info("Shutting down all services");
-		for (KafkaServer broker : brokers) {
-			if (broker != null) {
-				broker.shutdown();
-			}
-		}
-		if (zookeeper != null) {
-			try {
-				zookeeper.stop();
-			} catch (IOException e) {
-				LOG.warn("ZK.stop() failed", e);
-			}
-		}
-		zkClient.close();
-	}
-
-
-	@Test
-	public void testOffsetManipulation() {
-		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
-
-		final String topicName = "testOffsetManipulation";
-
-		// create topic
-		Properties topicConfig = new Properties();
-		LOG.info("Creating topic {}", topicName);
-		AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
-
-		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 1337);
-
-		Assert.assertEquals(1337L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
-
-		zk.close();
-	}
-	/**
-	 * We want to use the High level java consumer API but manage the offset in Zookeeper manually.
-	 *
-	 */
-	@Test
-	public void testPersistentSourceWithOffsetUpdates() throws Exception {
-		LOG.info("Starting testPersistentSourceWithOffsetUpdates()");
-
-		ZkClient zk = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(), standardCC.zkConnectionTimeoutMs(), new PersistentKafkaSource.KafkaZKStringSerializer());
-
-		final String topicName = "testOffsetHacking";
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
-		env.getConfig().disableSysoutLogging();
-		env.enableCheckpointing(50);
-		env.setNumberOfExecutionRetries(0);
-
-		// create topic
-		Properties topicConfig = new Properties();
-		LOG.info("Creating topic {}", topicName);
-		AdminUtils.createTopic(zk, topicName, 3, 2, topicConfig);
-
-		// write a sequence from 0 to 99 to each of the three partitions.
-		writeSequence(env, topicName, 0, 99);
-
-
-		readSequence(env, standardCC, topicName, 0, 100, 300);
-
-		// check offsets to be set at least higher than 50.
-		// correctly, we would expect them to be set to 99, but right now there is no way of stopping a topology once all pending
-		// checkpoints have been committed.
-		// To work around that limitation, the persistent kafka consumer is throtteled with a thread.sleep().
-		long o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0);
-		long o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1);
-		long o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2);
-		Assert.assertTrue("The offset seems incorrect, got "+o1, o1 > 50L);
-		Assert.assertTrue("The offset seems incorrect, got "+o2, o2 > 50L);
-		Assert.assertTrue("The offset seems incorrect, got "+o3, o3 > 50L);
-		/** Once we have proper shutdown of streaming jobs, enable these tests
-		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0));
-		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1));
-		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));*/
-
-
-		LOG.info("Manipulating offsets");
-		// set the offset to 25, 50, and 75 for the three partitions
-		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50);
-		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50);
-		PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50);
-
-		// create new env
-		env = StreamExecutionEnvironment.createLocalEnvironment(3);
-		env.getConfig().disableSysoutLogging();
-		readSequence(env, standardCC, topicName, 50, 50, 150);
-
-		zk.close();
-
-		LOG.info("Finished testPersistentSourceWithOffsetUpdates()");
-	}
-
-	private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
-		LOG.info("Reading sequence for verification until final count {}", finalCount);
-		DataStream<Tuple2<Integer, Integer>> source = env.addSource(
-				new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc)
-		)
-		//add a sleeper mapper. Since there is no good way of "shutting down" a running topology, we have
-		// to play this trick. The problem is that we have to wait until all checkpoints are confirmed
-		.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
-			@Override
-			public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
-				Thread.sleep(150);
-				return value;
-			}
-		}).setParallelism(3);
-
-		// verify data
-		DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() {
-			int[] values = new int[valuesCount];
-			int count = 0;
-
-			@Override
-			public void flatMap(Tuple2<Integer, Integer> value, Collector<Integer> out) throws Exception {
-				values[value.f1 - valuesStartFrom]++;
-				count++;
-
-				LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount);
-				// verify if we've seen everything
-
-				if (count == finalCount) {
-					LOG.info("Received all values");
-					for (int i = 0; i < values.length; i++) {
-						int v = values[i];
-						if (v != 3) {
-							LOG.warn("Test is going to fail");
-							printTopic(topicName, valuesCount, this.getRuntimeContext().getExecutionConfig());
-							throw new RuntimeException("Expected v to be 3, but was " + v + " on element " + i + " array=" + Arrays.toString(values));
-						}
-					}
-					// test has passed
-					throw new SuccessException();
-				}
-			}
-
-		}).setParallelism(1);
-
-		tryExecute(env, "Read data from Kafka");
-
-		LOG.info("Successfully read sequence for verification");
-	}
-
-
-
-	private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
-		LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
-
-		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-			private static final long serialVersionUID = 1L;
-			int cnt = from;
-			int partition;
-
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				super.open(parameters);
-				partition = getRuntimeContext().getIndexOfThisSubtask();
-
-			}
-
-			@Override
-			public boolean reachedEnd() throws Exception {
-				return cnt > to;
-			}
-
-			@Override
-			public Tuple2<Integer, Integer> next() throws Exception {
-				LOG.info("Writing " + cnt + " to partition " + partition);
-				Tuple2<Integer, Integer> result = new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), cnt);
-				cnt++;
-				return result;
-			}
-		}).setParallelism(3);
-
-		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
-				topicName,
-				new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()),
-				new T2Partitioner()
-		)).setParallelism(3);
-		env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
-		LOG.info("Finished writing sequence");
-	}
-
-	private static class T2Partitioner implements SerializableKafkaPartitioner {
-		@Override
-		public int partition(Object key, int numPartitions) {
-			if(numPartitions != 3) {
-				throw new IllegalArgumentException("Expected three partitions");
-			}
-			Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
-			return element.f0;
-		}
-	}
-
-
-	@Test
-	public void regularKafkaSourceTest() throws Exception {
-		LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
-
-		String topic = "regularKafkaSourceTestTopic";
-		createTestTopic(topic, 1, 1);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		// add consuming topology:
-		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-				new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000));
-		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
-			int elCnt = 0;
-			int start = -1;
-			BitSet validator = new BitSet(101);
-
-			@Override
-			public void invoke(Tuple2<Long, String> value) throws Exception {
-				LOG.debug("Got value = " + value);
-				String[] sp = value.f1.split("-");
-				int v = Integer.parseInt(sp[1]);
-
-				assertEquals(value.f0 - 1000, (long) v);
-
-				if (start == -1) {
-					start = v;
-				}
-				Assert.assertFalse("Received tuple twice", validator.get(v - start));
-				validator.set(v - start);
-				elCnt++;
-				if (elCnt == 100) {
-					// check if everything in the bitset is set to true
-					int nc;
-					if ((nc = validator.nextClearBit(0)) != 100) {
-						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
-					}
-					throw new SuccessException();
-				}
-			}
-		});
-
-		// add producing topology
-		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-			private static final long serialVersionUID = 1L;
-			int cnt = 0;
-
-			@Override
-			public boolean reachedEnd() throws Exception {
-				return false;
-			}
-
-			@Override
-			public Tuple2<Long, String> next() throws Exception {
-				Thread.sleep(100);
-				return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
-			}
-		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
-
-		tryExecute(env, "regular kafka source test");
-
-		LOG.info("Finished KafkaITCase.regularKafkaSourceTest()");
-	}
-
-	@Test
-	public void tupleTestTopology() throws Exception {
-		LOG.info("Starting KafkaITCase.tupleTestTopology()");
-
-		String topic = "tupleTestTopic";
-		createTestTopic(topic, 1, 1);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		// add consuming topology:
-		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-				new PersistentKafkaSource<Tuple2<Long, String>>(topic,
-						new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
-						standardCC
-				));
-		consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
-			int elCnt = 0;
-			int start = -1;
-			BitSet validator = new BitSet(101);
-
-			@Override
-			public void invoke(Tuple2<Long, String> value) throws Exception {
-				LOG.info("Got value " + value);
-				String[] sp = value.f1.split("-");
-				int v = Integer.parseInt(sp[1]);
-
-				assertEquals(value.f0 - 1000, (long) v);
-
-				if (start == -1) {
-					start = v;
-				}
-				Assert.assertFalse("Received tuple twice", validator.get(v - start));
-				validator.set(v - start);
-				elCnt++;
-				if (elCnt == 100) {
-					// check if everything in the bitset is set to true
-					int nc;
-					if ((nc = validator.nextClearBit(0)) != 100) {
-						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
-					}
-					throw new SuccessException();
-				}
-			}
-
-			@Override
-			public void close() throws Exception {
-				super.close();
-				Assert.assertTrue("No element received", elCnt > 0);
-			}
-		});
-
-		// add producing topology
-		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-			private static final long serialVersionUID = 1L;
-			int cnt = 0;
-
-			@Override
-			public boolean reachedEnd() throws Exception {
-				return false;
-			}
-
-			@Override
-			public Tuple2<Long, String> next() throws Exception {
-				Thread.sleep(100);
-				return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
-			}
-		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
-
-		tryExecute(env, "tupletesttopology");
-
-		LOG.info("Finished KafkaITCase.tupleTestTopology()");
-	}
-
-	/**
-	 * Test Flink's Kafka integration also with very big records (30MB)
-	 *
-	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
-	 *
-	 * @throws Exception
-	 */
-	@Test
-	public void bigRecordTestTopology() throws Exception {
-
-		LOG.info("Starting KafkaITCase.bigRecordTestTopology()");
-
-		String topic = "bigRecordTestTopic";
-		createTestTopic(topic, 1, 1);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		// add consuming topology:
-		Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig());
-		Properties consumerProps = new Properties();
-		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
-		consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
-		consumerProps.setProperty("group.id", "test");
-		consumerProps.setProperty("auto.commit.enable", "false");
-		consumerProps.setProperty("auto.offset.reset", "smallest");
-
-		ConsumerConfig cc = new ConsumerConfig(consumerProps);
-		DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
-				new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, cc));
-
-		consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
-			int elCnt = 0;
-
-			@Override
-			public void invoke(Tuple2<Long, byte[]> value) throws Exception {
-				LOG.info("Received {}", value.f0);
-				elCnt++;
-				if(value.f0 == -1) {
-					// we should have seen 11 elements now.
-					if(elCnt == 11) {
-						throw new SuccessException();
-					} else {
-						throw new RuntimeException("There have been "+elCnt+" elements");
-					}
-				}
-				if(elCnt > 10) {
-					throw new RuntimeException("More than 10 elements seen: "+elCnt);
-				}
-			}
-		}).setParallelism(1);
-
-		// add producing topology
-		DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
-			private static final long serialVersionUID = 1L;
-			boolean running = true;
-			long cnt;
-			transient Random rnd;
-
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				super.open(parameters);
-				cnt = 0;
-				rnd = new Random(1337);
-
-			}
-
-			@Override
-			public boolean reachedEnd() throws Exception {
-				return cnt > 10;
-			}
-
-			@Override
-			public Tuple2<Long, byte[]> next() throws Exception {
-				Thread.sleep(100);
-
-				if (cnt < 10) {
-				byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
-					Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(cnt++, wl);
-					LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
-					return result;
-
-				} else if (cnt == 10) {
-					Tuple2<Long, byte[]> result = new Tuple2<Long, byte[]>(-1L, new byte[]{1});
-					cnt++;
-					return result;
-				} else {
-					throw new RuntimeException("Source is exhausted.");
-				}
-			}
-		});
-
-		stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
-				new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
-		);
-
-		tryExecute(env, "big topology test");
-
-		LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
-	}
-
-
-	@Test
-	public void customPartitioningTestTopology() throws Exception {
-		LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
-
-		String topic = "customPartitioningTestTopic";
-
-		createTestTopic(topic, 3, 1);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		// add consuming topology:
-		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-				new PersistentKafkaSource<Tuple2<Long, String>>(topic,
-						new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
-						standardCC));
-		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
-			int start = -1;
-			BitSet validator = new BitSet(101);
-
-			boolean gotPartition1 = false;
-			boolean gotPartition2 = false;
-			boolean gotPartition3 = false;
-
-			@Override
-			public void invoke(Tuple2<Long, String> value) throws Exception {
-				LOG.debug("Got " + value);
-				String[] sp = value.f1.split("-");
-				int v = Integer.parseInt(sp[1]);
-
-				assertEquals(value.f0 - 1000, (long) v);
-
-				switch (v) {
-					case 9:
-						gotPartition1 = true;
-						break;
-					case 19:
-						gotPartition2 = true;
-						break;
-					case 99:
-						gotPartition3 = true;
-						break;
-				}
-
-				if (start == -1) {
-					start = v;
-				}
-				Assert.assertFalse("Received tuple twice", validator.get(v - start));
-				validator.set(v - start);
-
-				if (gotPartition1 && gotPartition2 && gotPartition3) {
-					// check if everything in the bitset is set to true
-					int nc;
-					if ((nc = validator.nextClearBit(0)) != 100) {
-						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
-					}
-					throw new SuccessException();
-				}
-			}
-		});
-
-		// add producing topology
-		DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
-			private static final long serialVersionUID = 1L;
-			int cnt = 0;
-
-			@Override
-			public boolean reachedEnd() throws Exception {
-				return false;
-			}
-
-			@Override
-			public Tuple2<Long, String> next() throws Exception {
-				Thread.sleep(100);
-				return new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++);
-			}
-		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), new CustomPartitioner()));
-
-		tryExecute(env, "custom partitioning test");
-
-		LOG.info("Finished KafkaITCase.customPartitioningTestTopology()");
-	}
-
-	/**
-	 * This is for a topic with 3 partitions and Tuple2<Long, String>
-	 */
-	private static class CustomPartitioner implements SerializableKafkaPartitioner {
-
-		@Override
-		public int partition(Object key, int numPartitions) {
-
-			@SuppressWarnings("unchecked")
-			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
-			if (tuple.f0 < 10) {
-				return 0;
-			} else if (tuple.f0 < 20) {
-				return 1;
-			} else {
-				return 2;
-			}
-		}
-	}
-
-
-	@Test
-	public void simpleTestTopology() throws Exception {
-		String topic = "simpleTestTopic";
-
-		createTestTopic(topic, 1, 1);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		// add consuming topology:
-		DataStreamSource<String> consuming = env.addSource(
-				new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
-		consuming.addSink(new SinkFunction<String>() {
-			int elCnt = 0;
-			int start = -1;
-			BitSet validator = new BitSet(101);
-
-			@Override
-			public void invoke(String value) throws Exception {
-				LOG.debug("Got " + value);
-				String[] sp = value.split("-");
-				int v = Integer.parseInt(sp[1]);
-				if (start == -1) {
-					start = v;
-				}
-				Assert.assertFalse("Received tuple twice", validator.get(v - start));
-				validator.set(v - start);
-				elCnt++;
-				if (elCnt == 100) {
-					// check if everything in the bitset is set to true
-					int nc;
-					if ((nc = validator.nextClearBit(0)) != 100) {
-						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
-					}
-					throw new SuccessException();
-				}
-			}
-		});
-
-		// add producing topology
-		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
-			private static final long serialVersionUID = 1L;
-			boolean running = true;
-			int cnt = 0;
-
-			@Override
-			public boolean reachedEnd() throws Exception {
-				return false;
-			}
-
-			@Override
-			public String next() throws Exception {
-				Thread.sleep(100);
-				return "kafka-" + cnt++;
-			}
-		});
-		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()));
-
-		tryExecute(env, "simpletest");
-	}
-
-	private static boolean leaderHasShutDown = false;
-	private static boolean shutdownKafkaBroker;
-
-	@Test(timeout=60000)
-	public void brokerFailureTest() throws Exception {
-		String topic = "brokerFailureTestTopic";
-
-		createTestTopic(topic, 2, 2);
-
-		// --------------------------- write data to topic ---------------------
-		LOG.info("Writing data to topic {}", topic);
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
-
-			private int cnt = 0;
-
-			@Override
-			public boolean reachedEnd() throws Exception {
-				return cnt == 200;
-			}
-
-			@Override
-			public String next() throws Exception {
-				String msg = "kafka-" + cnt++;
-				LOG.info("sending message = "+msg);
-
-				if ((cnt - 1) % 20 == 0) {
-					LOG.debug("Sending message #{}", cnt - 1);
-				}
-
-				return msg;
-			}
-
-		});
-		stream.addSink(new KafkaSink<String>(brokerConnectionStrings, topic, new JavaDefaultStringSchema()))
-				.setParallelism(1);
-
-		tryExecute(env, "broker failure test - writer");
-
-		// --------------------------- read and let broker fail ---------------------
-
-		LOG.info("Reading data from topic {} and let a broker fail", topic);
-		PartitionMetadata firstPart = null;
-		do {
-			if(firstPart != null) {
-				LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
-				// not the first try. Sleep a bit
-				Thread.sleep(150);
-			}
-			Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
-			firstPart = partitionMetadata.head();
-		} while(firstPart.errorCode() != 0);
-
-		final String leaderToShutDown = firstPart.leader().get().connectionString();
-		LOG.info("Leader to shutdown {}", leaderToShutDown);
-
-		final Thread brokerShutdown = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				shutdownKafkaBroker = false;
-				while (!shutdownKafkaBroker) {
-					try {
-						Thread.sleep(10);
-					} catch (InterruptedException e) {
-						LOG.warn("Interruption", e);
-					}
-				}
-
-				for (KafkaServer kafkaServer : brokers) {
-					if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
-						LOG.info("Killing Kafka Server {}", leaderToShutDown);
-						kafkaServer.shutdown();
-						leaderHasShutDown = true;
-						break;
-					}
-				}
-			}
-		});
-		brokerShutdown.start();
-
-		// add consuming topology:
-		DataStreamSource<String> consuming = env.addSource(new PersistentKafkaSource<String>(topic, new JavaDefaultStringSchema(), standardCC));
-		consuming.setParallelism(1);
-
-		consuming.addSink(new SinkFunction<String>() {
-			int elCnt = 0;
-			int start = 0;
-			int numOfMessagesToBeCorrect = 100;
-			int stopAfterMessages = 150;
-
-			BitSet validator = new BitSet(numOfMessagesToBeCorrect + 1);
-
-			@Override
-			public void invoke(String value) throws Exception {
-				LOG.info("Got message = " + value + " leader has shut down " + leaderHasShutDown + " el cnt = " + elCnt + " to rec" + numOfMessagesToBeCorrect);
-				String[] sp = value.split("-");
-				int v = Integer.parseInt(sp[1]);
-
-				if (start == -1) {
-					start = v;
-				}
-				int offset = v - start;
-				Assert.assertFalse("Received tuple with value " + offset + " twice", validator.get(offset));
-				if (v - start < 0 && LOG.isWarnEnabled()) {
-					LOG.warn("Not in order: {}", value);
-				}
-
-				validator.set(offset);
-				elCnt++;
-				if (elCnt == 20) {
-					LOG.info("Asking leading broker to shut down");
-					// shut down a Kafka broker
-					shutdownKafkaBroker = true;
-				}
-				if (shutdownKafkaBroker) {
-					// we become a bit slower because the shutdown takes some time and we have
-					// only a fixed nubmer of elements to read
-					Thread.sleep(20);
-				}
-				if (leaderHasShutDown) { // it only makes sence to check once the shutdown is completed
-					if (elCnt >= stopAfterMessages) {
-						// check if everything in the bitset is set to true
-						int nc;
-						if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
-							throw new RuntimeException("The bitset was not set to 1 on all elements to be checked. Next clear:" + nc + " Set: " + validator);
-						}
-						throw new SuccessException();
-					}
-				}
-			}
-		});
-		tryExecute(env, "broker failure test - reader");
-
-	}
-
-	public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
-		try {
-			see.execute(name);
-		} catch (JobExecutionException good) {
-			Throwable t = good.getCause();
-			int limit = 0;
-			while (!(t instanceof SuccessException)) {
-				if(t == null) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
-				}
-
-				t = t.getCause();
-				if (limit++ == 20) {
-					LOG.warn("Test failed with exception", good);
-					Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
-				}
-			}
-		}
-	}
-
-	private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
-		// create topic
-		Properties topicConfig = new Properties();
-		LOG.info("Creating topic {}", topic);
-		AdminUtils.createTopic(zkClient, topic, numberOfPartitions, replicationFactor, topicConfig);
-	}
-
-	private static TestingServer getZookeeper() throws Exception {
-		return new TestingServer(zkPort, tmpZkDir);
-	}
-
-	/**
-	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
-	 */
-	private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws UnknownHostException {
-		Properties kafkaProperties = new Properties();
-
-		int kafkaPort = NetUtils.getAvailablePort();
-
-		// properties have to be Strings
-		kafkaProperties.put("advertised.host.name", kafkaHost);
-		kafkaProperties.put("port", Integer.toString(kafkaPort));
-		kafkaProperties.put("broker.id", Integer.toString(brokerId));
-		kafkaProperties.put("log.dir", tmpFolder.toString());
-		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
-		kafkaProperties.put("message.max.bytes", "" + (35 * 1024 * 1024));
-		kafkaProperties.put("replica.fetch.max.bytes", "" + (35 * 1024 * 1024));
-		KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
-
-		KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
-		server.startup();
-		return server;
-	}
-
-	public static class SuccessException extends Exception {
-		private static final long serialVersionUID = 1L;
-	}
-
-
-	// ----------------------- Debugging utilities --------------------
-
-	/**
-	 * Read topic to list, only using Kafka code.
-	 * @return
-	 */
-	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
-		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
-		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
-		// will see each message only once.
-		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
-		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
-		if(streams.size() != 1) {
-			throw new RuntimeException("Expected only one message stream but got "+streams.size());
-		}
-		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
-		if(kafkaStreams == null) {
-			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
-		}
-		if(kafkaStreams.size() != 1) {
-			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
-		}
-		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
-		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
-
-		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
-		int read = 0;
-		while(iteratorToRead.hasNext()) {
-			read++;
-			result.add(iteratorToRead.next());
-			if(read == stopAfter) {
-				LOG.info("Read "+read+" elements");
-				return result;
-			}
-		}
-		return result;
-	}
-
-	private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema deserializationSchema, int stopAfter){
-		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
-		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
-		for(MessageAndMetadata<byte[], byte[]> message: contents) {
-			Object out = deserializationSchema.deserialize(message.message());
-			LOG.info("Message: partition: {} offset: {} msg: {}", message.partition(), message.offset(), out.toString());
-		}
-	}
-
-	private static void printTopic(String topicName, int elements, ExecutionConfig ec) {
-		// write the sequence to log for debugging purposes
-		Properties stdProps = standardCC.props().props();
-		Properties newProps = new Properties(stdProps);
-		newProps.setProperty("group.id", "topic-printer"+UUID.randomUUID().toString());
-		newProps.setProperty("auto.offset.reset", "smallest");
-		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
-
-		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
-		DeserializationSchema deserializer = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), ec);
-		printTopic(topicName, printerConfig, deserializer, elements);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0fd60233/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java
deleted file mode 100644
index 18fa46f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/kafka/util/KafkaLocalSystemTime.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import kafka.utils.Time;
-
-public class KafkaLocalSystemTime implements Time {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
-
-	@Override
-	public long milliseconds() {
-		return System.currentTimeMillis();
-	}
-
-	public long nanoseconds() {
-		return System.nanoTime();
-	}
-
-	@Override
-	public void sleep(long ms) {
-		try {
-			Thread.sleep(ms);
-		} catch (InterruptedException e) {
-			LOG.warn("Interruption", e);
-		}
-	}
-
-}
-


[5/6] flink git commit: [docs] Removed unnecessary Serializable from ExecutionEnvironment JavaDocs

Posted by mb...@apache.org.
[docs] Removed unnecessary Serializable from ExecutionEnvironment JavaDocs

Also did for StreamExecutionEnvironment


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2315c7c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2315c7c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2315c7c5

Branch: refs/heads/master
Commit: 2315c7c5935a703eee10eb9eabb75aa8490d1457
Parents: 120bd0f
Author: mbalassi <mb...@apache.org>
Authored: Sun May 24 17:51:21 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/ExecutionEnvironment.java    | 30 +--------
 .../environment/StreamExecutionEnvironment.java | 64 ++++++--------------
 2 files changed, 21 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2315c7c5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 9c76409..3a5b04f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.java;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -600,9 +599,7 @@ public abstract class ExecutionEnvironment {
 	
 	/**
 	 * Creates a DataSet from the given non-empty collection. The type of the data set is that
-	 * of the elements in the collection. The elements need to be serializable (as defined by
-	 * {@link java.io.Serializable}), because the framework may move the elements into the cluster
-	 * if needed.
+	 * of the elements in the collection.
 	 * <p>
 	 * The framework will try and determine the exact type from the collection elements.
 	 * In case of generic elements, it may be necessary to manually supply the type information
@@ -632,13 +629,8 @@ public abstract class ExecutionEnvironment {
 	}
 	
 	/**
-	 * Creates a DataSet from the given non-empty collection. The type of the data set is that
-	 * of the elements in the collection. The elements need to be serializable (as defined by
-	 * {@link java.io.Serializable}), because the framework may move the elements into the cluster
-	 * if needed.
-	 * <p>
-	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
-	 * a parallelism of one.
+	 * Creates a DataSet from the given non-empty collection. Note that this operation will result
+	 * in a non-parallel data source, i.e. a data source with a parallelism of one.
 	 * <p>
 	 * The returned DataSet is typed to the given TypeInformation.
 	 *  
@@ -663,9 +655,6 @@ public abstract class ExecutionEnvironment {
 	 * explicitly in the form of the type class (this is due to the fact that the Java compiler
 	 * erases the generic type information).
 	 * <p>
-	 * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the
-	 * framework may move it to a remote environment, if needed.
-	 * <p>
 	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
 	 * a parallelism of one.
 	 * 
@@ -686,9 +675,6 @@ public abstract class ExecutionEnvironment {
 	 * is generic. In that case, the type class (as given in {@link #fromCollection(Iterator, Class)}
 	 * does not supply all type information.
 	 * <p>
-	 * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the
-	 * framework may move it to a remote environment, if needed.
-	 * <p>
 	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
 	 * a parallelism of one.
 	 * 
@@ -699,10 +685,6 @@ public abstract class ExecutionEnvironment {
 	 * @see #fromCollection(Iterator, Class)
 	 */
 	public <X> DataSource<X> fromCollection(Iterator<X> data, TypeInformation<X> type) {
-		if (!(data instanceof Serializable)) {
-			throw new IllegalArgumentException("The iterator must be serializable.");
-		}
-		
 		return new DataSource<X>(this, new IteratorInputFormat<X>(data), type, Utils.getCallLocationName());
 	}
 	
@@ -710,8 +692,6 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a new data set that contains the given elements. The elements must all be of the same type,
 	 * for example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty.
-	 * Furthermore, the elements must be serializable (as defined in {@link java.io.Serializable}, because the
-	 * execution environment may ship the elements into the cluster.
 	 * <p>
 	 * The framework will try and determine the exact type from the collection elements.
 	 * In case of generic elements, it may be necessary to manually supply the type information
@@ -738,8 +718,6 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
 	 * framework to create a parallel data source that returns the elements in the iterator.
-	 * The iterator must be serializable (as defined in {@link java.io.Serializable}, because the
-	 * execution environment may ship the elements into the cluster.
 	 * <p>
 	 * Because the iterator will remain unmodified until the actual execution happens, the type of data
 	 * returned by the iterator must be given explicitly in the form of the type class (this is due to the
@@ -758,8 +736,6 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
 	 * framework to create a parallel data source that returns the elements in the iterator.
-	 * The iterator must be serializable (as defined in {@link java.io.Serializable}, because the
-	 * execution environment may ship the elements into the cluster.
 	 * <p>
 	 * Because the iterator will remain unmodified until the actual execution happens, the type of data
 	 * returned by the iterator must be given explicitly in the form of the type information.

http://git-wip-us.apache.org/repos/asf/flink/blob/2315c7c5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 09a8788..8dd15bb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.mapreduce.Job;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -438,15 +437,12 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Creates a new data stream that contains the given elements. The elements must all be of the same type, for
-	 * example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty. Furthermore,
-	 * the elements must be serializable (as defined in {@link java.io.Serializable}), because the execution
-	 * environment
-	 * may ship the elements into the cluster.
-	 * <p/>
+	 * example, all of the {@link String} or {@link Integer}.
+	 * <p>
 	 * The framework will try and determine the exact type from the elements. In case of generic elements, it may be
 	 * necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
 	 * org.apache.flink.api.common.typeinfo.TypeInformation)}.
-	 * <p/>
+	 * <p>
 	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
 	 * degree of parallelism one.
 	 *
@@ -471,15 +467,14 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Creates a data stream from the given non-empty collection. The type of the data stream is that of the
-	 * elements in
-	 * the collection. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
-	 * framework may move the elements into the cluster if needed.
-	 * <p/>
+	 * elements in the collection.
+	 *
+	 * <p>
 	 * The framework will try and determine the exact type from the collection elements. In case of generic
-	 * elements, it
-	 * may be necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
-	 * org.apache.flink.api.common.typeinfo.TypeInformation)}.
-	 * <p/>
+	 * elements, it may be necessary to manually supply the type information via
+	 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
+	 * <p>
+	 *
 	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
 	 * degree of parallelism one.
 	 *
@@ -503,12 +498,8 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream from the given non-empty collection. The type of the data stream is the type given by
-	 * typeInfo. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
-	 * framework may move the elements into the cluster if needed.
-	 * <p/>
-	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
-	 * degree of parallelism one.
+	 * Creates a data stream from the given non-empty collection.Note that this operation will result in
+	 * a non-parallel data stream source, i.e. a data stream source with a degree of parallelism one.
 	 *
 	 * @param data
 	 * 		The collection of elements to create the data stream from
@@ -535,11 +526,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
 	 * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
 	 * class (this is due to the fact that the Java compiler erases the generic type information).
-	 * <p/>
-	 * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
-	 * move it
-	 * to a remote environment, if needed.
-	 * <p/>
+	 * <p>
 	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
 	 * degree of parallelism of one.
 	 *
@@ -562,11 +549,7 @@ public abstract class StreamExecutionEnvironment {
 	 * information. This method is useful for cases where the type is generic. In that case, the type class (as
 	 * given in
 	 * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
-	 * <p/>
-	 * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
-	 * move it
-	 * to a remote environment, if needed.
-	 * <p/>
+	 * <p>
 	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
 	 * degree of parallelism one.
 	 *
@@ -581,9 +564,6 @@ public abstract class StreamExecutionEnvironment {
 	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
 			typeInfo) {
 		Preconditions.checkNotNull(data, "The iterator must not be null");
-		if (!(data instanceof Serializable)) {
-			throw new IllegalArgumentException("The iterator must be serializable.");
-		}
 
 		SourceFunction<OUT> function = new FromIteratorFunction<OUT>(data);
 		return addSource(function, "Collection Source").returns(typeInfo);
@@ -597,12 +577,8 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
-	 * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
-	 * must be
-	 * serializable (as defined in {@link java.io.Serializable}, because the execution environment may ship the
-	 * elements
-	 * into the cluster.
-	 * <p/>
+	 * framework to create a parallel data stream source that returns the elements in the iterator.
+	 * <p>
 	 * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
 	 * iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
 	 * erases the generic type information).
@@ -621,12 +597,8 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
-	 * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
-	 * must be
-	 * serializable (as defined in {@link java.io.Serializable}, because the execution environment may ship the
-	 * elements
-	 * into the cluster.
-	 * <p/>
+	 * framework to create a parallel data stream source that returns the elements in the iterator.
+	 * <p>
 	 * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
 	 * iterator must be given explicitly in the form of the type information. This method is useful for cases where the
 	 * type is generic. In that case, the type class (as given in {@link #fromParallelCollection(org.apache.flink.util.SplittableIterator,


[3/6] flink git commit: [FLINK-1687] [streaming] [api-extending] Synchronizing streaming source API to batch source API

Posted by mb...@apache.org.
[FLINK-1687] [streaming] [api-extending] Synchronizing streaming source API to batch source API

Closes #521


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c3b67ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c3b67ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c3b67ed

Branch: refs/heads/master
Commit: 1c3b67edd007d6d3727f1786eb0a99a0ee3a64c9
Parents: 84fce54
Author: szape <ne...@gmail.com>
Authored: Thu Apr 16 14:21:16 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 896 ++++++++++++++-----
 .../source/FileMonitoringFunction.java          |   2 -
 .../functions/source/FileSourceFunction.java    |  33 +-
 .../functions/source/FromIteratorFunction.java  |  44 +
 .../flink/streaming/api/graph/StreamGraph.java  |   2 +-
 .../flink/streaming/api/graph/StreamNode.java   |   6 +-
 6 files changed, 720 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 02c8dad..a1fe60d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -17,24 +17,29 @@
 
 package org.apache.flink.streaming.api.environment;
 
-import java.io.File;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-
+import com.esotericsoftware.kryo.Serializer;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.flink.api.java.io.PrimitiveInputFormat;
 import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.io.TextValueInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
@@ -44,25 +49,35 @@ import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
+import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.NumberSequenceIterator;
+import org.apache.flink.util.SplittableIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
 
-import com.esotericsoftware.kryo.Serializer;
-import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 
 /**
- * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
+ * {@link org.apache.flink.api.java.ExecutionEnvironment} for streaming jobs. An instance of it is
  * necessary to construct streaming topologies.
- * 
  */
 public abstract class StreamExecutionEnvironment {
 
@@ -100,9 +115,9 @@ public abstract class StreamExecutionEnvironment {
 	 * Gets the parallelism with which operation are executed by default.
 	 * Operations can individually override this value to use a specific
 	 * parallelism.
-	 * 
+	 *
 	 * @return The parallelism used by operations, unless they override that
-	 *         value.
+	 * value.
 	 * @deprecated Please use {@link #getParallelism}
 	 */
 	@Deprecated
@@ -114,9 +129,9 @@ public abstract class StreamExecutionEnvironment {
 	 * Gets the parallelism with which operation are executed by default.
 	 * Operations can individually override this value to use a specific
 	 * parallelism.
-	 * 
+	 *
 	 * @return The parallelism used by operations, unless they override that
-	 *         value.
+	 * value.
 	 */
 	public int getParallelism() {
 		return config.getParallelism();
@@ -131,9 +146,9 @@ public abstract class StreamExecutionEnvironment {
 	 * number of hardware contexts (CPU cores / threads). When executing the
 	 * program via the command line client from a JAR file, the default degree
 	 * of parallelism is the one configured for that setup.
-	 * 
+	 *
 	 * @param parallelism
-	 *            The parallelism
+	 * 		The parallelism
 	 * @deprecated Please use {@link #setParallelism}
 	 */
 	@Deprecated
@@ -150,9 +165,9 @@ public abstract class StreamExecutionEnvironment {
 	 * number of hardware contexts (CPU cores / threads). When executing the
 	 * program via the command line client from a JAR file, the default degree
 	 * of parallelism is the one configured for that setup.
-	 * 
+	 *
 	 * @param parallelism
-	 *            The parallelism
+	 * 		The parallelism
 	 */
 	public StreamExecutionEnvironment setParallelism(int parallelism) {
 		if (parallelism < 1) {
@@ -167,7 +182,7 @@ public abstract class StreamExecutionEnvironment {
 	 * output buffers. By default the output buffers flush frequently to provide
 	 * low latency and to aid smooth developer experience. Setting the parameter
 	 * can result in three logical modes:
-	 * 
+	 * <p/>
 	 * <ul>
 	 * <li>
 	 * A positive integer triggers flushing periodically by that integer</li>
@@ -177,9 +192,9 @@ public abstract class StreamExecutionEnvironment {
 	 * -1 triggers flushing only when the output buffer is full thus maximizing
 	 * throughput</li>
 	 * </ul>
-	 * 
+	 *
 	 * @param timeoutMillis
-	 *            The maximum time between two output flushes.
+	 * 		The maximum time between two output flushes.
 	 */
 	public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
 		if (timeoutMillis < -1) {
@@ -194,7 +209,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Disables operator chaining for streaming operators. Operator chaining
 	 * allows non-shuffle operations to be co-located in the same thread fully
 	 * avoiding serialization and de-serialization.
-	 * 
+	 *
 	 * @return StreamExecutionEnvironment with chaining disabled.
 	 */
 	public StreamExecutionEnvironment disableOperatorChaining() {
@@ -205,16 +220,16 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Method for enabling fault-tolerance. Activates monitoring and backup of
 	 * streaming operator states.
-	 * 
-	 * <p>
+	 * <p/>
+	 * <p/>
 	 * Setting this option assumes that the job is used in production and thus
 	 * if not stated explicitly otherwise with calling with the
 	 * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
 	 * in case of failure the job will be resubmitted to the cluster
 	 * indefinitely.
-	 * 
+	 *
 	 * @param interval
-	 *            Time interval between state checkpoints in millis
+	 * 		Time interval between state checkpoints in millis
 	 */
 	public StreamExecutionEnvironment enableCheckpointing(long interval) {
 		streamGraph.setCheckpointingEnabled(true);
@@ -225,8 +240,8 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Method for enabling fault-tolerance. Activates monitoring and backup of
 	 * streaming operator states.
-	 * 
-	 * <p>
+	 * <p/>
+	 * <p/>
 	 * Setting this option assumes that the job is used in production and thus
 	 * if not stated explicitly otherwise with calling with the
 	 * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
@@ -256,10 +271,10 @@ public abstract class StreamExecutionEnvironment {
 	 * zero effectively disables fault tolerance. A value of {@code -1}
 	 * indicates that the system default value (as defined in the configuration)
 	 * should be used.
-	 * 
+	 *
 	 * @param numberOfExecutionRetries
-	 *            The number of times the system will try to re-execute failed
-	 *            tasks.
+	 * 		The number of times the system will try to re-execute failed
+	 * 		tasks.
 	 */
 	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
 		config.setNumberOfExecutionRetries(numberOfExecutionRetries);
@@ -269,9 +284,9 @@ public abstract class StreamExecutionEnvironment {
 	 * Gets the number of times the system will try to re-execute failed tasks.
 	 * A value of {@code -1} indicates that the system default value (as defined
 	 * in the configuration) should be used.
-	 * 
+	 *
 	 * @return The number of times the system will try to re-execute failed
-	 *         tasks.
+	 * tasks.
 	 */
 	public int getNumberOfExecutionRetries() {
 		return config.getNumberOfExecutionRetries();
@@ -281,7 +296,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Sets the maximum time frequency (milliseconds) for the flushing of the
 	 * output buffers. For clarification on the extremal values see
 	 * {@link #setBufferTimeout(long)}.
-	 * 
+	 *
 	 * @return The timeout of the buffer.
 	 */
 	public long getBufferTimeout() {
@@ -291,9 +306,9 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Sets the default parallelism that will be used for the local execution
 	 * environment created by {@link #createLocalEnvironment()}.
-	 * 
+	 *
 	 * @param parallelism
-	 *            The parallelism to use as the default local parallelism.
+	 * 		The parallelism to use as the default local parallelism.
 	 */
 	public static void setDefaultLocalParallelism(int parallelism) {
 		defaultLocalParallelism = parallelism;
@@ -305,15 +320,15 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Adds a new Kryo default serializer to the Runtime.
-	 * 
+	 * <p/>
 	 * Note that the serializer instance must be serializable (as defined by
 	 * java.io.Serializable), because it may be distributed to the worker nodes
 	 * by java serialization.
-	 * 
+	 *
 	 * @param type
-	 *            The class of the types serialized with the given serializer.
+	 * 		The class of the types serialized with the given serializer.
 	 * @param serializer
-	 *            The serializer to use.
+	 * 		The serializer to use.
 	 */
 	public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) {
 		config.addDefaultKryoSerializer(type, serializer);
@@ -321,11 +336,11 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Adds a new Kryo default serializer to the Runtime.
-	 * 
+	 *
 	 * @param type
-	 *            The class of the types serialized with the given serializer.
+	 * 		The class of the types serialized with the given serializer.
 	 * @param serializerClass
-	 *            The class of the serializer to use.
+	 * 		The class of the serializer to use.
 	 */
 	public void addDefaultKryoSerializer(Class<?> type,
 			Class<? extends Serializer<?>> serializerClass) {
@@ -334,15 +349,15 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Registers the given type with a Kryo Serializer.
-	 * 
+	 * <p/>
 	 * Note that the serializer instance must be serializable (as defined by
 	 * java.io.Serializable), because it may be distributed to the worker nodes
 	 * by java serialization.
-	 * 
+	 *
 	 * @param type
-	 *            The class of the types serialized with the given serializer.
+	 * 		The class of the types serialized with the given serializer.
 	 * @param serializer
-	 *            The serializer to use.
+	 * 		The serializer to use.
 	 */
 	public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) {
 		config.registerTypeWithKryoSerializer(type, serializer);
@@ -351,11 +366,11 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Registers the given Serializer via its class as a serializer for the
 	 * given type at the KryoSerializer
-	 * 
+	 *
 	 * @param type
-	 *            The class of the types serialized with the given serializer.
+	 * 		The class of the types serialized with the given serializer.
 	 * @param serializerClass
-	 *            The class of the serializer to use.
+	 * 		The class of the serializer to use.
 	 */
 	public void registerTypeWithKryoSerializer(Class<?> type,
 			Class<? extends Serializer<?>> serializerClass) {
@@ -367,9 +382,9 @@ public abstract class StreamExecutionEnvironment {
 	 * eventually serialized as a POJO, then the type is registered with the
 	 * POJO serializer. If the type ends up being serialized with Kryo, then it
 	 * will be registered at Kryo to make sure that only tags are written.
-	 * 
+	 *
 	 * @param type
-	 *            The class of the type to register.
+	 * 		The class of the type to register.
 	 */
 	public void registerType(Class<?> type) {
 		if (type == null) {
@@ -390,31 +405,271 @@ public abstract class StreamExecutionEnvironment {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Creates a DataStream that represents the Strings produced by reading the
-	 * given file line wise. The file will be read with the system's default
-	 * character set.
-	 * 
+	 * Creates a new data stream that contains a sequence of numbers. The data stream will be created with parallelism
+	 * one, so the order of the elements is guaranteed.
+	 *
+	 * @param from
+	 * 		The number to start at (inclusive)
+	 * @param to
+	 * 		The number to stop at (inclusive)
+	 * @return A data stream, containing all number in the [from, to] interval
+	 */
+	public DataStreamSource<Long> generateSequence(long from, long to) {
+		if (from > to) {
+			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
+		}
+		return addSource(new GenSequenceFunction(from, to), "Sequence Source");
+	}
+
+	/**
+	 * Creates a new data stream that contains a sequence of numbers. The data stream will be created in parallel, so
+	 * there is no guarantee about the oder of the elements.
+	 *
+	 * @param from
+	 * 		The number to start at (inclusive)
+	 * @param to
+	 * 		The number to stop at (inclusive)
+	 * @return A data stream, containing all number in the [from, to] interval
+	 */
+	public DataStreamSource<Long> generateParallelSequence(long from, long to) {
+		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parellel " +
+				"Sequence source");
+	}
+
+	/**
+	 * Creates a new data stream that contains the given elements. The elements must all be of the same type, for
+	 * example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty. Furthermore,
+	 * the elements must be serializable (as defined in {@link java.io.Serializable}), because the execution
+	 * environment
+	 * may ship the elements into the cluster.
+	 * <p/>
+	 * The framework will try and determine the exact type from the elements. In case of generic elements, it may be
+	 * necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
+	 * org.apache.flink.api.common.typeinfo.TypeInformation)}.
+	 * <p/>
+	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+	 * degree of parallelism one.
+	 *
+	 * @param data
+	 * 		The array of elements to create the data stream from.
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream representing the given array of elements
+	 */
+	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
+		if (data.length == 0) {
+			throw new IllegalArgumentException(
+					"fromElements needs at least one element as argument");
+		}
+
+		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]);
+
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+
+		return addSource(function, "Elements source").returns(typeInfo);
+	}
+
+	/**
+	 * Creates a data stream from the given non-empty collection. The type of the data stream is that of the
+	 * elements in
+	 * the collection. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
+	 * framework may move the elements into the cluster if needed.
+	 * <p/>
+	 * The framework will try and determine the exact type from the collection elements. In case of generic
+	 * elements, it
+	 * may be necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
+	 * org.apache.flink.api.common.typeinfo.TypeInformation)}.
+	 * <p/>
+	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+	 * degree of parallelism one.
+	 *
+	 * @param data
+	 * 		The collection of elements to create the data stream from
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream representing the given collection
+	 */
+	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
+		Preconditions.checkNotNull(data, "Collection must not be null");
+		if (data.isEmpty()) {
+			throw new IllegalArgumentException("Collection must not be empty");
+		}
+
+		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		checkCollection(data, typeInfo.getTypeClass());
+
+		return addSource(function, "Collection Source").returns(typeInfo);
+	}
+
+	/**
+	 * Creates a data stream from the given non-empty collection. The type of the data stream is the type given by
+	 * typeInfo. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
+	 * framework may move the elements into the cluster if needed.
+	 * <p/>
+	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+	 * degree of parallelism one.
+	 *
+	 * @param data
+	 * 		The collection of elements to create the data stream from
+	 * @param typeInfo
+	 * 		The TypeInformation for the produced data stream
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream representing the given collection
+	 */
+	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
+			typeInfo) {
+		Preconditions.checkNotNull(data, "Collection must not be null");
+		if (data.isEmpty()) {
+			throw new IllegalArgumentException("Collection must not be empty");
+		}
+
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		checkCollection(data, typeInfo.getTypeClass());
+
+		return addSource(function, "Collection Source").returns(typeInfo);
+	}
+
+	/**
+	 * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
+	 * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
+	 * class (this is due to the fact that the Java compiler erases the generic type information).
+	 * <p/>
+	 * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
+	 * move it
+	 * to a remote environment, if needed.
+	 * <p/>
+	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+	 * degree of parallelism of one.
+	 *
+	 * @param data
+	 * 		The iterator of elements to create the data stream from
+	 * @param type
+	 * 		The class of the data produced by the iterator. Must not be a generic class.
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream representing the elements in the iterator
+	 * @see #fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
+	 */
+	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
+		return fromCollection(data, TypeExtractor.getForClass(type));
+	}
+
+	/**
+	 * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
+	 * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
+	 * information. This method is useful for cases where the type is generic. In that case, the type class (as
+	 * given in
+	 * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
+	 * <p/>
+	 * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
+	 * move it
+	 * to a remote environment, if needed.
+	 * <p/>
+	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+	 * degree of parallelism one.
+	 *
+	 * @param data
+	 * 		The iterator of elements to create the data stream from
+	 * @param typeInfo
+	 * 		The TypeInformation for the produced data stream
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream representing the elements in the iterator
+	 */
+	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
+			typeInfo) {
+		Preconditions.checkNotNull(data, "The iterator must not be null");
+		if (!(data instanceof Serializable)) {
+			throw new IllegalArgumentException("The iterator must be serializable.");
+		}
+
+		SourceFunction<OUT> function = new FromIteratorFunction<OUT>(data);
+		return addSource(function, "Collection Source").returns(typeInfo);
+	}
+
+	/**
+	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
+	 * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
+	 * must be
+	 * serializable (as defined in {@link java.io.Serializable}, because the execution environment may ship the
+	 * elements
+	 * into the cluster.
+	 * <p/>
+	 * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
+	 * iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
+	 * erases the generic type information).
+	 *
+	 * @param iterator
+	 * 		The iterator that produces the elements of the data stream
+	 * @param type
+	 * 		The class of the data produced by the iterator. Must not be a generic class.
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return A data stream representing the elements in the iterator
+	 */
+	public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type) {
+		return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
+	}
+
+	/**
+	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
+	 * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
+	 * must be
+	 * serializable (as defined in {@link java.io.Serializable}, because the execution environment may ship the
+	 * elements
+	 * into the cluster.
+	 * <p/>
+	 * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
+	 * iterator must be given explicitly in the form of the type information. This method is useful for cases where the
+	 * type is generic. In that case, the type class (as given in {@link #fromParallelCollection(org.apache.flink.util.SplittableIterator,
+	 * Class)} does not supply all type information.
+	 *
+	 * @param iterator
+	 * 		The iterator that produces the elements of the data stream
+	 * @param typeInfo
+	 * 		The TypeInformation for the produced data stream.
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return A data stream representing the elements in the iterator
+	 */
+	public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
+			typeInfo) {
+		return fromParallelCollection(iterator, typeInfo, "Parallel Collection source");
+	}
+
+	// private helper for passing different names
+	private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
+			typeInfo, String operatorName) {
+		return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+	}
+
+	/**
+	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The file will be
+	 * read with the system's default character set.
+	 *
 	 * @param filePath
-	 *            The path of the file, as a URI (e.g.,
-	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
-	 * @return The DataStream representing the text file.
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @return The data stream that represents the data read from the given file as text lines
 	 */
 	public DataStreamSource<String> readTextFile(String filePath) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
 		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
 
-		return addFileSource(format, typeInfo);
+		return createInput(format, typeInfo, "Read Text File Source");
 	}
 
 	/**
-	 * Creates a DataStream that represents the Strings produced by reading the
-	 * given file line wise. The file will be read with the given character set.
-	 * 
+	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The {@link
+	 * java.nio.charset.Charset} with the given name will be used to read the files.
+	 *
 	 * @param filePath
-	 *            The path of the file, as a URI (e.g.,
-	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
-	 * @return The DataStream representing the text file.
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param charsetName
+	 * 		The name of the character set used to read the file
+	 * @return The data stream that represents the data read from the given file as text lines
 	 */
 	public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
@@ -422,201 +677,352 @@ public abstract class StreamExecutionEnvironment {
 		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
 		format.setCharsetName(charsetName);
 
-		return addFileSource(format, typeInfo);
+		return createInput(format, typeInfo, "Read Text File Source");
 	}
 
 	/**
-	 * Creates a DataStream that contains the contents of file created while
-	 * system watches the given path. The file will be read with the system's
-	 * default character set.
-	 * 
+	 * Creates a data stream that contains the contents of file created while system watches the given path. The file
+	 * will be read with the system's default character set.
+	 *
 	 * @param filePath
-	 *            The path of the file, as a URI (e.g.,
-	 *            "file:///some/local/file" or "hdfs://host:port/file/path/").
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
 	 * @param intervalMillis
-	 *            The interval of file watching in milliseconds.
+	 * 		The interval of file watching in milliseconds
 	 * @param watchType
-	 *            The watch type of file stream. When watchType is
-	 *            {@link WatchType#ONLY_NEW_FILES}, the system processes only
-	 *            new files. {@link WatchType#REPROCESS_WITH_APPENDED} means
-	 *            that the system re-processes all contents of appended file.
-	 *            {@link WatchType#PROCESS_ONLY_APPENDED} means that the system
-	 *            processes only appended contents of files.
-	 * 
+	 * 		The watch type of file stream. When watchType is {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES}, the system processes
+	 * 		only
+	 * 		new files. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED} means that the system re-processes all contents of
+	 * 		appended file. {@link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED} means that the system processes only appended
+	 * 		contents
+	 * 		of files.
 	 * @return The DataStream containing the given directory.
 	 */
 	public DataStream<String> readFileStream(String filePath, long intervalMillis,
 			WatchType watchType) {
-//		DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
-//				filePath, intervalMillis, watchType), "File Stream");
-//		return source.flatMap(new FileReadFunction());
-		return null;
+		DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
+				filePath, intervalMillis, watchType), "Read File Stream source");
+
+		return source.flatMap(new FileReadFunction());
 	}
 
 	/**
-	 * Creates a new DataStream that contains the given elements. The elements
-	 * must all be of the same type, for example, all of the String or Integer.
-	 * The sequence of elements must not be empty. Furthermore, the elements
-	 * must be serializable (as defined in java.io.Serializable), because the
-	 * execution environment may ship the elements into the cluster.
-	 * 
-	 * @param data
-	 *            The collection of elements to create the DataStream from.
-	 * @param <OUT>
-	 *            type of the returned stream
-	 * @return The DataStream representing the elements.
+	 * Creates a data stream that represents the strings produced by reading the given file line wise. This method is
+	 * similar to {@link #readTextFile(String)}, but it produces a data stream with mutable {@link org.apache.flink.types.StringValue}
+	 * objects,
+	 * rather than Java Strings. StringValues can be used to tune implementations to be less object and garbage
+	 * collection heavy.
+	 * <p/>
+	 * The file will be read with the system's default character set.
+	 *
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @return A data stream that represents the data read from the given file as text lines
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
-		if (data.length == 0) {
-			throw new IllegalArgumentException(
-					"fromElements needs at least one element as argument");
-		}
+	public DataStreamSource<StringValue> readTextFileWithValue(String filePath) {
+		Preconditions.checkNotNull(filePath, "The file path may not be null.");
+		TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
+		TypeInformation<StringValue> typeInfo = new ValueTypeInfo<StringValue>(StringValue.class);
 
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data[0]);
+		return createInput(format, typeInfo, "Read Text File with Value " +
+				"source");
+	}
 
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+	/**
+	 * Creates a data stream that represents the Strings produced by reading the given file line wise. This method is
+	 * similar to {@link #readTextFile(String, String)}, but it produces a data stream with mutable {@link org.apache.flink.types.StringValue}
+	 * objects, rather than Java Strings. StringValues can be used to tune implementations to be less object and
+	 * garbage
+	 * collection heavy.
+	 * <p/>
+	 * The {@link java.nio.charset.Charset} with the given name will be used to read the files.
+	 *
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param charsetName
+	 * 		The name of the character set used to read the file
+	 * @param skipInvalidLines
+	 * 		A flag to indicate whether to skip lines that cannot be read with the given character set
+	 * @return A data stream that represents the data read from the given file as text lines
+	 */
+	public DataStreamSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean
+			skipInvalidLines) {
+		Preconditions.checkNotNull(filePath, "The file path may not be null.");
 
-		return addSource(function, "Elements source").returns(outTypeInfo);
+		TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
+		TypeInformation<StringValue> typeInfo = new ValueTypeInfo<StringValue>(StringValue.class);
+		format.setCharsetName(charsetName);
+		format.setSkipInvalidLines(skipInvalidLines);
+		return createInput(format, typeInfo, "Read Text File with Value " +
+				"source");
 	}
 
 	/**
-	 * Creates a DataStream from the given non-empty collection. The type of the
-	 * DataStream is that of the elements in the collection. The elements need
-	 * to be serializable (as defined by java.io.Serializable), because the
-	 * framework may move the elements into the cluster if needed.
-	 * 
-	 * @param data
-	 *            The collection of elements to create the DataStream from.
+	 * Reads the given file with the given imput format.
+	 *
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param inputFormat
+	 * 		The input format used to create the data stream
 	 * @param <OUT>
-	 *            type of the returned stream
-	 * @return The DataStream representing the elements.
+	 * 		The type of the returned data stream
+	 * @return The data stream that represents the data read from the given file
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
-		if (data == null) {
-			throw new NullPointerException("Collection must not be null");
+	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath) {
+		Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
+		Preconditions.checkNotNull(filePath, "The file path must not be null.");
+
+		inputFormat.setFilePath(new Path(filePath));
+		try {
+			return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Read File source");
+		} catch (Exception e) {
+			throw new InvalidProgramException("The type returned by the input format could not be automatically " +
+					"determined. " +
+					"Please specify the TypeInformation of the produced type explicitly by using the " +
+					"'createInput(InputFormat, TypeInformation)' method instead.");
 		}
+	}
 
-		if (data.isEmpty()) {
-			throw new IllegalArgumentException("Collection must not be empty");
-		}
+	/**
+	 * Creates a data stream that represents the primitive type produced by reading the given file line wise.
+	 *
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param typeClass
+	 * 		The primitive type class to be read
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return A data stream that represents the data read from the given file as primitive type
+	 */
+	public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, Class<OUT> typeClass) {
+		Preconditions.checkNotNull(filePath, "The file path may not be null.");
+		PrimitiveInputFormat<OUT> inputFormat = new PrimitiveInputFormat<OUT>(new Path(filePath), typeClass);
+		TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(typeClass);
 
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		return createInput(inputFormat, typeInfo, "Read File of Primitives source");
+	}
+
+	/**
+	 * Creates a data stream that represents the primitive type produced by reading the given file in delimited way.
+	 *
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param delimiter
+	 * 		The delimiter of the given file
+	 * @param typeClass
+	 * 		The primitive type class to be read
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return A data stream that represents the data read from the given file as primitive type.
+	 */
+	public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, String delimiter, Class<OUT> typeClass) {
+		Preconditions.checkNotNull(filePath, "The file path may not be null.");
+		PrimitiveInputFormat<OUT> inputFormat = new PrimitiveInputFormat<OUT>(new Path(filePath), delimiter,
+				typeClass);
+		TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(typeClass);
 
-		return addSource(function, "Collection Source").returns(outTypeInfo);
+		return createInput(inputFormat, typeInfo, "Read File of Primitives source");
 	}
 
 	/**
-	 * Creates a new DataStream that contains the strings received infinitely
-	 * from socket. Received strings are decoded by the system's default
-	 * character set. On the termination of the socket server connection retries
-	 * can be initiated.
-	 * 
-	 * <p>
-	 * Let us note that the socket itself does not report on abort and as a
-	 * consequence retries are only initiated when the socket was gracefully
-	 * terminated.
-	 * </p>
-	 * 
+	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A {@link
+	 * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+	 */
+	public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V>
+			mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+		DataStreamSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+
+		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+
+		return result;
+	}
+
+	/**
+	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A {@link
+	 * org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+	 */
+	public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V>
+			mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
+	}
+
+	/**
+	 * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A {@link
+	 * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+	 */
+	public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input
+			.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws
+			IOException {
+		DataStreamSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+
+		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
+				.hadoop.fs.Path(inputPath));
+
+		return result;
+	}
+
+	/**
+	 * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
+	 */
+	public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input
+			.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws
+			IOException {
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance());
+	}
+
+	/**
+	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+	 * decoded by the system's default character set. On the termination of the socket server connection retries can be
+	 * initiated.
+	 * <p/>
+	 * Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
+	 * the socket was gracefully terminated.
+	 *
 	 * @param hostname
-	 *            The host name which a server socket bind.
+	 * 		The host name which a server socket binds
 	 * @param port
-	 *            The port number which a server socket bind. A port number of 0
-	 *            means that the port number is automatically allocated.
+	 * 		The port number which a server socket binds. A port number of 0 means that the port number is automatically
+	 * 		allocated.
 	 * @param delimiter
-	 *            A character which split received strings into records.
+	 * 		A character which splits received strings into records
 	 * @param maxRetry
-	 *            The maximal retry interval in seconds while the program waits
-	 *            for a socket that is temporarily down. Reconnection is
-	 *            initiated every second. A number of 0 means that the reader is
-	 *            immediately terminated, while a negative value ensures
-	 *            retrying forever.
-	 * @return A DataStream, containing the strings received from socket.
-	 * 
+	 * 		The maximal retry interval in seconds while the program waits for a socket that is temporarily down.
+	 * 		Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated,
+	 * 		while
+	 * 		a	negative value ensures retrying forever.
+	 * @return A data stream containing the strings received from the socket
 	 */
-	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter,
-			long maxRetry) {
+	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
 		return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
 				"Socket Stream");
 	}
 
 	/**
-	 * Creates a new DataStream that contains the strings received infinitely
-	 * from socket. Received strings are decoded by the system's default
-	 * character set. The reader is terminated immediately when socket is down.
-	 * 
+	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+	 * decoded by the system's default character set. The reader is terminated immediately when the socket is down.
+	 *
 	 * @param hostname
-	 *            The host name which a server socket bind.
+	 * 		The host name which a server socket binds
 	 * @param port
-	 *            The port number which a server socket bind. A port number of 0
-	 *            means that the port number is automatically allocated.
+	 * 		The port number which a server socket binds. A port number of 0 means that the port number is automatically
+	 * 		allocated.
 	 * @param delimiter
-	 *            A character which split received strings into records.
-	 * @return A DataStream, containing the strings received from socket.
+	 * 		A character which splits received strings into records
+	 * @return A data stream containing the strings received from the socket
 	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
 		return socketTextStream(hostname, port, delimiter, 0);
 	}
 
 	/**
-	 * Creates a new DataStream that contains the strings received infinitely
-	 * from socket. Received strings are decoded by the system's default
-	 * character set, uses '\n' as delimiter. The reader is terminated
-	 * immediately when socket is down.
-	 * 
+	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
+	 * decoded by the system's default character set, using'\n' as delimiter. The reader is terminated immediately when
+	 * the socket is down.
+	 *
 	 * @param hostname
-	 *            The host name which a server socket bind.
+	 * 		The host name which a server socket binds
 	 * @param port
-	 *            The port number which a server socket bind. A port number of 0
-	 *            means that the port number is automatically allocated.
-	 * @return A DataStream, containing the strings received from socket.
+	 * 		The port number which a server socket binds. A port number of 0 means that the port number is automatically
+	 * 		allocated.
+	 * @return A data stream containing the strings received from the socket
 	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port) {
 		return socketTextStream(hostname, port, '\n');
 	}
 
 	/**
-	 * Creates a new DataStream that contains a sequence of numbers.
-	 * 
-	 * @param from
-	 *            The number to start at (inclusive).
-	 * @param to
-	 *            The number to stop at (inclusive)
-	 * @return A DataStrean, containing all number in the [from, to] interval.
+	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.InputFormat}.
 	 */
-	public DataStreamSource<Long> generateSequence(long from, long to) {
-		if (from > to) {
-			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
-		}
-		return addSource(new GenSequenceFunction(from, to), "Sequence Source");
+	public <K, V> DataStreamSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V>
+			mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+
+		return createInput(hadoopInputFormat, TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop " +
+				"Input source");
+	}
+
+	/**
+	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.InputFormat}.
+	 */
+	public <K, V> DataStreamSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K, V>
+			mapredInputFormat, Class<K> key, Class<V> value, Job job) {
+		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink
+				.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+
+		return createInput(hadoopInputFormat, TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop Input " +
+				"source");
+	}
+
+	/**
+	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
+	 * created - instead, this method returns a data stream that will be lazily created from the input format once the
+	 * program is executed.
+	 * <p/>
+	 * Since all data streams need specific information about their types, this method needs to determine the type of
+	 * the data produced by the input format. It will attempt to determine the data type by reflection, unless the
+	 * input
+	 * format implements the {@link org.apache.flink.api.java.typeutils .ResultTypeQueryable} interface. In the latter
+	 * case, this method will invoke the {@link org.apache.flink.api.java.typeutils
+	 * .ResultTypeQueryable#getProducedType()} method to determine data type produced by the input format.
+	 *
+	 * @param inputFormat
+	 * 		The input format used to create the data stream
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream that represents the data created by the input format
+	 */
+	public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
+		return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Custom File source");
+	}
+
+	/**
+	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
+	 * created - instead, this method returns a data stream that will be lazily created from the input format once the
+	 * program is executed.
+	 * <p/>
+	 * The data stream is typed to the given TypeInformation. This method is intended for input formats where the
+	 * return
+	 * type cannot be determined by reflection analysis, and that do not implement the {@link
+	 * org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+	 *
+	 * @param inputFormat
+	 * 		The input format used to create the data stream
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream that represents the data created by the input format
+	 */
+	public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
+		return createInput(inputFormat, typeInfo, "Custom File source");
 	}
 
-	private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
-			TypeInformation<String> typeInfo) {
-		FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
-		DataStreamSource<String> returnStream = addSource(function, "File Source");
+	// private helper for passing different names
+	private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
+			TypeInformation<OUT> typeInfo, String sourceName) {
+		FileSourceFunction<OUT> function = new FileSourceFunction<OUT>(inputFormat, typeInfo);
+		DataStreamSource<OUT> returnStream = addSource(function, sourceName).returns(typeInfo);
 		streamGraph.setInputFormat(returnStream.getId(), inputFormat);
 		return returnStream;
 	}
 
 	/**
-	 * Create a DataStream using a user defined source function for arbitrary
-	 * source functionality.</p> By default sources have a parallelism of 1. To
-	 * enable parallel execution, the user defined source should implement
-	 * {@link ParallelSourceFunction} or extend
-	 * {@link RichParallelSourceFunction}. In these cases the resulting source
-	 * will have the parallelism of the environment. To change this afterwards
-	 * call {@link DataStreamSource#setParallelism(int)}
-	 * 
-	 * 
+	 * Ads a data source with a custom type information thus opening a {@link org.apache.flink.streaming.api
+	 * .datastream.DataStream}. Only in very special cases does the user need to support type information. Otherwise
+	 * use
+	 * {@link #addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
+	 * <p/>
+	 * By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
+	 * implement {@link org.apache.flink.streaming.api.function.source.ParallelSourceFunction} or extend {@link
+	 * org.apache.flink.streaming.api.function.source.RichParallelSourceFunction}. In these cases the resulting source
+	 * will have the parallelism of the environment. To change this afterwards call {@link
+	 * org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
+	 *
 	 * @param function
-	 *            the user defined function
+	 * 		the user defined function
 	 * @param <OUT>
-	 *            type of the returned stream
+	 * 		type of the returned stream
 	 * @return the data stream constructed
 	 */
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
-		return addSource(function, "Custom source");
+		return addSource(function, "Custom Source");
 	}
 
 	/**
@@ -626,27 +1032,27 @@ public abstract class StreamExecutionEnvironment {
 	 * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
 	 *
 	 * @param function
-	 *            the user defined function
+	 * 		the user defined function
 	 * @param sourceName
-	 *            Name of the data source
+	 * 		Name of the data source
 	 * @param <OUT>
-	 *            type of the returned stream
+	 * 		type of the returned stream
 	 * @return the data stream constructed
 	 */
 	@SuppressWarnings("unchecked")
-	private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
+	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
 
-		TypeInformation<OUT> outTypeInfo;
+		TypeInformation<OUT> typeInfo;
 
 		if (function instanceof ResultTypeQueryable) {
-			outTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
+			typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
 		} else {
 			try {
-				outTypeInfo = TypeExtractor.createTypeInfo(
+				typeInfo = TypeExtractor.createTypeInfo(
 						SourceFunction.class,
 						function.getClass(), 0, null, null);
 			} catch (InvalidTypesException e) {
-				outTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
+				typeInfo = (TypeInformation<OUT>) new MissingTypeInfo("Custom source", e);
 			}
 		}
 
@@ -655,8 +1061,8 @@ public abstract class StreamExecutionEnvironment {
 		ClosureCleaner.clean(function, true);
 		StreamOperator<OUT> sourceOperator = new StreamSource<OUT>(function);
 
-		return new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceOperator, isParallel,
-				sourceName);
+		return new DataStreamSource<OUT>(this, sourceName, typeInfo, sourceOperator,
+				isParallel, sourceName);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -668,9 +1074,9 @@ public abstract class StreamExecutionEnvironment {
 	 * program is currently executed. If the program is invoked standalone, this
 	 * method returns a local execution environment, as returned by
 	 * {@link #createLocalEnvironment()}.
-	 * 
+	 *
 	 * @return The execution environment of the context in which the program is
-	 *         executed.
+	 * executed.
 	 */
 	public static StreamExecutionEnvironment getExecutionEnvironment() {
 		if (currentEnvironment != null) {
@@ -700,7 +1106,7 @@ public abstract class StreamExecutionEnvironment {
 	 * environment was created in. The default parallelism of the local
 	 * environment is the number of hardware contexts (CPU cores / threads),
 	 * unless it was specified differently by {@link #setParallelism(int)}.
-	 * 
+	 *
 	 * @return A local execution environment.
 	 */
 	public static LocalStreamEnvironment createLocalEnvironment() {
@@ -712,9 +1118,9 @@ public abstract class StreamExecutionEnvironment {
 	 * will run the program in a multi-threaded fashion in the same JVM as the
 	 * environment was created in. It will use the parallelism specified in the
 	 * parameter.
-	 * 
+	 *
 	 * @param parallelism
-	 *            The parallelism for the local environment.
+	 * 		The parallelism for the local environment.
 	 * @return A local execution environment with the specified parallelism.
 	 */
 	public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
@@ -724,24 +1130,25 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	// TODO:fix cluster default parallelism
+
 	/**
 	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
 	 * (parts of) the program to a cluster for execution. Note that all file
 	 * paths used in the program must be accessible from the cluster. The
 	 * execution will use no parallelism, unless the parallelism is set
 	 * explicitly via {@link #setParallelism}.
-	 * 
+	 *
 	 * @param host
-	 *            The host name or address of the master (JobManager), where the
-	 *            program should be executed.
+	 * 		The host name or address of the master (JobManager), where the
+	 * 		program should be executed.
 	 * @param port
-	 *            The port of the master (JobManager), where the program should
-	 *            be executed.
+	 * 		The port of the master (JobManager), where the program should
+	 * 		be executed.
 	 * @param jarFiles
-	 *            The JAR files with code that needs to be shipped to the
-	 *            cluster. If the program uses user-defined functions,
-	 *            user-defined input formats, or any libraries, those must be
-	 *            provided in the JAR files.
+	 * 		The JAR files with code that needs to be shipped to the
+	 * 		cluster. If the program uses user-defined functions,
+	 * 		user-defined input formats, or any libraries, those must be
+	 * 		provided in the JAR files.
 	 * @return A remote environment that executes the program on a cluster.
 	 */
 	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
@@ -755,20 +1162,20 @@ public abstract class StreamExecutionEnvironment {
 	 * (parts of) the program to a cluster for execution. Note that all file
 	 * paths used in the program must be accessible from the cluster. The
 	 * execution will use the specified parallelism.
-	 * 
+	 *
 	 * @param host
-	 *            The host name or address of the master (JobManager), where the
-	 *            program should be executed.
+	 * 		The host name or address of the master (JobManager), where the
+	 * 		program should be executed.
 	 * @param port
-	 *            The port of the master (JobManager), where the program should
-	 *            be executed.
+	 * 		The port of the master (JobManager), where the program should
+	 * 		be executed.
 	 * @param parallelism
-	 *            The parallelism to use during the execution.
+	 * 		The parallelism to use during the execution.
 	 * @param jarFiles
-	 *            The JAR files with code that needs to be shipped to the
-	 *            cluster. If the program uses user-defined functions,
-	 *            user-defined input formats, or any libraries, those must be
-	 *            provided in the JAR files.
+	 * 		The JAR files with code that needs to be shipped to the
+	 * 		cluster. If the program uses user-defined functions,
+	 * 		user-defined input formats, or any libraries, those must be
+	 * 		provided in the JAR files.
 	 * @return A remote environment that executes the program on a cluster.
 	 */
 	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
@@ -782,34 +1189,34 @@ public abstract class StreamExecutionEnvironment {
 	 * Triggers the program execution. The environment will execute all parts of
 	 * the program that have resulted in a "sink" operation. Sink operations are
 	 * for example printing results or forwarding them to a message queue.
-	 * <p>
+	 * <p/>
 	 * The program execution will be logged and displayed with a generated
 	 * default name.
-	 * 
+	 *
 	 * @return The result of the job execution, containing elapsed time and
-	 *         accumulators.
+	 * accumulators.
 	 * @throws Exception
-	 **/
+	 */
 	public abstract JobExecutionResult execute() throws Exception;
 
 	/**
 	 * Triggers the program execution. The environment will execute all parts of
 	 * the program that have resulted in a "sink" operation. Sink operations are
 	 * for example printing results or forwarding them to a message queue.
-	 * <p>
+	 * <p/>
 	 * The program execution will be logged and displayed with the provided name
-	 * 
+	 *
 	 * @param jobName
-	 *            Desired name of the job
+	 * 		Desired name of the job
 	 * @return The result of the job execution, containing elapsed time and
-	 *         accumulators.
+	 * accumulators.
 	 * @throws Exception
-	 **/
+	 */
 	public abstract JobExecutionResult execute(String jobName) throws Exception;
 
 	/**
-	 * Getter of the {@link StreamGraph} of the streaming job.
-	 * 
+	 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
+	 *
 	 * @return The streamgraph representing the transformations
 	 */
 	public StreamGraph getStreamGraph() {
@@ -821,7 +1228,7 @@ public abstract class StreamExecutionEnvironment {
 	 * returns it as a String using a JSON representation of the execution data
 	 * flow graph. Note that this needs to be called, before the plan is
 	 * executed.
-	 * 
+	 *
 	 * @return The execution plan of the program, as a JSON String.
 	 */
 	public String getExecutionPlan() {
@@ -832,4 +1239,17 @@ public abstract class StreamExecutionEnvironment {
 		currentEnvironment = eef.createExecutionEnvironment();
 	}
 
+	private static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT> viewedAs) {
+		Preconditions.checkNotNull(viewedAs);
+
+		for (OUT elem : elements) {
+			Preconditions.checkNotNull(elem, "The collection must not contain null elements.");
+
+			if (!viewedAs.isAssignableFrom(elem.getClass())) {
+				throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
+						viewedAs.getCanonicalName());
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index 1a177c6..f74d81c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -55,8 +55,6 @@ public class FileMonitoringFunction extends RichSourceFunction<Tuple3<String, Lo
 	private Map<String, Long> offsetOfFiles;
 	private Map<String, Long> modificationTimes;
 
-	private volatile boolean isRunning = false;
-
 	private Queue<Tuple3<String, Long, Long>> pendingFiles;
 
 	public FileMonitoringFunction(String path, long interval, WatchType watchType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index f1dcd2b..af00d4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -28,24 +25,23 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
-public class FileSourceFunction extends RichParallelSourceFunction<String> {
-	private static final long serialVersionUID = 1L;
-
-	private InputSplitProvider provider;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
-	private InputFormat<String, ?> inputFormat;
+public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
+	private static final long serialVersionUID = 1L;
 
-	private TypeInformation<String> typeInfo;
-	private transient TypeSerializer<String> serializer;
+	private TypeInformation<OUT> typeInfo;
+	private transient TypeSerializer<OUT> serializer;
 
-	private InputFormat<String, InputSplit> format;
+	private InputSplitProvider provider;
+	private InputFormat<OUT, InputSplit> format;
 
 	private Iterator<InputSplit> splitIterator;
+	private transient OUT nextElement;
 
-	private transient String nextElement;
-
-	public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
-		this.inputFormat = format;
+	public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
+		this.format = (InputFormat<OUT, InputSplit>) format;
 		this.typeInfo = typeInfo;
 	}
 
@@ -54,10 +50,9 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
 	public void open(Configuration parameters) throws Exception {
 		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
 		this.provider = context.getInputSplitProvider();
-		inputFormat.configure(context.getTaskStubParameters());
+		format.configure(context.getTaskStubParameters());
 		serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
 
-		format = (InputFormat<String, InputSplit>) this.inputFormat;
 		splitIterator = getInputSplits();
 		if (splitIterator.hasNext()) {
 			format.open(splitIterator.next());
@@ -135,12 +130,12 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
 	}
 
 	@Override
-	public String next() throws Exception {
+	public OUT next() throws Exception {
 		if (reachedEnd()) {
 			throw new RuntimeException("End of FileSource reached.");
 		}
 
-		String result = nextElement;
+		OUT result = nextElement;
 		nextElement = null;
 		return result;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
new file mode 100644
index 0000000..d46b1f2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+public class FromIteratorFunction<T> implements SourceFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	Iterator<T> iterator;
+
+	public FromIteratorFunction(Iterator<T> iterator) {
+		this.iterator = iterator;
+	}
+
+	@Override
+	public boolean reachedEnd() throws Exception {
+		return !iterator.hasNext();
+	}
+
+	@Override
+	public T next() throws Exception {
+		return iterator.next();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 009366c..aa0f164 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -304,7 +304,7 @@ public class StreamGraph extends StreamingPlan {
 		getStreamNode(vertexID).setOperator(operatorObject);
 	}
 
-	public void setInputFormat(Integer vertexID, InputFormat<String, ?> inputFormat) {
+	public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
 		getStreamNode(vertexID).setInputFormat(inputFormat);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c3b67ed/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index adb07a8..ddc71cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -60,7 +60,7 @@ public class StreamNode implements Serializable {
 
 	private Class<? extends AbstractInvokable> jobVertexClass;
 
-	private InputFormat<String, ?> inputFormat;
+	private InputFormat<?, ?> inputFormat;
 
 	public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamOperator<?> operator,
 			String operatorName, List<OutputSelector<?>> outputSelector,
@@ -194,11 +194,11 @@ public class StreamNode implements Serializable {
 		return jobVertexClass;
 	}
 
-	public InputFormat<String, ?> getInputFormat() {
+	public InputFormat<?, ?> getInputFormat() {
 		return inputFormat;
 	}
 
-	public void setInputFormat(InputFormat<String, ?> inputFormat) {
+	public void setInputFormat(InputFormat<?, ?> inputFormat) {
 		this.inputFormat = inputFormat;
 	}
 


[2/6] flink git commit: [streaming] [scala] [api-breaking] StreamExecutionEnvironment API update

Posted by mb...@apache.org.
[streaming] [scala] [api-breaking] StreamExecutionEnvironment API update

Closes #738


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/145b2ba7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/145b2ba7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/145b2ba7

Branch: refs/heads/master
Commit: 145b2ba7f8ba72ed56992e74711290d18d32caf2
Parents: 2315c7c
Author: mbalassi <mb...@apache.org>
Authored: Wed May 27 12:40:33 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200

----------------------------------------------------------------------
 .../flink/api/scala/ExecutionEnvironment.scala  |  12 +-
 .../environment/StreamExecutionEnvironment.java |  74 -----------
 .../api/StreamExecutionEnvironmentTest.java     |   5 +
 .../api/scala/StreamExecutionEnvironment.scala  | 130 ++++++++++++++++---
 4 files changed, 123 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/145b2ba7/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index e01ff3b..28e8458 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -455,8 +455,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
-   * Creates a DataSet from the given non-empty [[Seq]]. The elements need to be serializable
-   * because the framework may move the elements into the cluster if needed.
+   * Creates a DataSet from the given non-empty [[Seq]].
    *
    * Note that this operation will result in a non-parallel data source, i.e. a data source with
    * a parallelism of one.
@@ -476,8 +475,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
-   * Creates a DataSet from the given [[Iterator]]. The iterator must be serializable because the
-   * framework might move into the cluster if needed.
+   * Creates a DataSet from the given [[Iterator]].
    *
    * Note that this operation will result in a non-parallel data source, i.e. a data source with
    * a parallelism of one.
@@ -496,8 +494,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
-   * Creates a new data set that contains the given elements. The elements must all be of the
-   * same type and must be serializable.
+   * Creates a new data set that contains the given elements.
    *
    * * Note that this operation will result in a non-parallel data source, i.e. a data source with
    * a parallelism of one.
@@ -511,8 +508,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a new data set that contains elements in the iterator. The iterator is splittable,
    * allowing the framework to create a parallel data source that returns the elements in the
-   * iterator. The iterator must be serializable because the execution environment may ship the
-   * elements into the cluster.
+   * iterator.
    */
   def fromParallelCollection[T: ClassTag : TypeInformation](
       iterator: SplittableIterator[T]): DataSet[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/145b2ba7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 8dd15bb..651da63 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -29,11 +29,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.PrimitiveInputFormat;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.io.TextValueInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
@@ -65,11 +63,8 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
 import org.apache.flink.util.SplittableIterator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -802,52 +797,6 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A {@link
-	 * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
-	 */
-	public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V>
-			mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
-		DataStreamSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
-
-		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
-
-		return result;
-	}
-
-	/**
-	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A {@link
-	 * org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
-	 */
-	public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V>
-			mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
-		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
-	}
-
-	/**
-	 * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A {@link
-	 * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
-	 */
-	public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input
-			.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws
-			IOException {
-		DataStreamSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
-
-		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
-				.hadoop.fs.Path(inputPath));
-
-		return result;
-	}
-
-	/**
-	 * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
-	 */
-	public <K, V> DataStreamSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input
-			.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws
-			IOException {
-		return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance());
-	}
-
-	/**
 	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
 	 * decoded by the system's default character set. On the termination of the socket server connection retries can be
 	 * initiated.
@@ -908,29 +857,6 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.InputFormat}.
-	 */
-	public <K, V> DataStreamSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V>
-			mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
-
-		return createInput(hadoopInputFormat, TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop " +
-				"Input source");
-	}
-
-	/**
-	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.InputFormat}.
-	 */
-	public <K, V> DataStreamSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K, V>
-			mapredInputFormat, Class<K> key, Class<V> value, Job job) {
-		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink
-				.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
-
-		return createInput(hadoopInputFormat, TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop Input " +
-				"source");
-	}
-
-	/**
 	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
 	 * created - instead, this method returns a data stream that will be lazily created from the input format once the
 	 * program is executed.

http://git-wip-us.apache.org/repos/asf/flink/blob/145b2ba7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index cbbd409..c70c1df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -109,5 +109,10 @@ public class StreamExecutionEnvironmentTest {
 		public Object next() {
 			return null;
 		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/145b2ba7/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 5999625..865b484 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,18 +18,20 @@
 
 package org.apache.flink.streaming.api.scala
 
-import scala.reflect.ClassTag
 import com.esotericsoftware.kryo.Serializer
-import org.apache.commons.lang.Validate
-import org.joda.time.Instant
+import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.runtime.state.StateHandleProvider
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
+import org.apache.flink.types.StringValue
+import org.apache.flink.util.SplittableIterator
+
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
-import org.apache.flink.runtime.state.StateHandleProvider
 
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
@@ -217,6 +219,61 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.readTextFile(filePath)
 
   /**
+   * Creates a data stream that represents the Strings produced by reading the given file
+   * line wise. The character set with the given name will be used to read the files.
+   */
+  def readTextFile(filePath: String, charsetName: String): DataStream[String] =
+    javaEnv.readTextFile(filePath, charsetName)
+
+  /**
+   * Creates a data stream that represents the strings produced by reading the given file
+   * line wise. This method is similar to the standard text file reader, but it produces
+   * a data stream with mutable StringValue objects, rather than Java Strings.
+   * StringValues can be used to tune implementations to be less object and garbage
+   * collection heavy. The file will be read with the system's default character set.
+   */
+  def readTextFileWithValue(filePath: String): DataStream[StringValue] =
+      javaEnv.readTextFileWithValue(filePath)
+
+  /**
+   * Creates a data stream that represents the strings produced by reading the given file
+   * line wise. This method is similar to the standard text file reader, but it produces
+   * a data stream with mutable StringValue objects, rather than Java Strings.
+   * StringValues can be used to tune implementations to be less object and garbage
+   * collection heavy. The boolean flag indicates whether to skip lines that cannot
+   * be read with the given character set.
+   */
+  def readTextFileWithValue(filePath: String, charsetName : String, skipInvalidLines : Boolean):
+    DataStream[StringValue] =
+    javaEnv.readTextFileWithValue(filePath, charsetName, skipInvalidLines)
+
+  /**
+   * Reads the given file with the given input format. The file path should be passed
+   * as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+   */
+  def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
+    DataStream[T] =
+    javaEnv.readFile(inputFormat, filePath)
+
+  /**
+   * Creates a data stream that represents the primitive type produced by reading the given file
+   * line wise. The file path should be passed as a URI (e.g., "file:///some/local/file" or
+   * "hdfs://host:port/file/path").
+   */
+  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]):
+    DataStream[T] =
+    javaEnv.readFileOfPrimitives(filePath, typeClass)
+
+  /**
+   * Creates a data stream that represents the primitive type produced by reading the given file
+   * line wise. The file path should be passed as a URI (e.g., "file:///some/local/file" or
+   * "hdfs://host:port/file/path").
+   */
+  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String,
+    typeClass: Class[T]): DataStream[T] =
+    javaEnv.readFileOfPrimitives(filePath, delimiter, typeClass)
+
+  /**
    * Creates a DataStream that contains the contents of file created while
    * system watches the given path. The file will be read with the system's
    * default character set. The user can check the monitoring interval in milliseconds,
@@ -231,24 +288,37 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Creates a new DataStream that contains the strings received infinitely
    * from socket. Received strings are decoded by the system's default
-   * character set.
-   *
+   * character set. The maximum retry interval is specified in seconds, in case
+   * of temporary service outage reconnection is initiated every second.
    */
-  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
-    javaEnv.socketTextStream(hostname, port, delimiter)
+  def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0):
+    DataStream[String] =
+    javaEnv.socketTextStream(hostname, port)
 
   /**
-   * Creates a new DataStream that contains the strings received infinitely
-   * from socket. Received strings are decoded by the system's default
-   * character set, uses '\n' as delimiter.
-   *
+   * Generic method to create an input data stream with a specific input format.
+   * Since all data streams need specific information about their types, this method needs to
+   * determine the type of the data produced by the input format. It will attempt to determine the
+   * data type by reflection, unless the input format implements the ResultTypeQueryable interface.
    */
-  def socketTextStream(hostname: String, port: Int): DataStream[String] =
-    javaEnv.socketTextStream(hostname, port)
+  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
+    javaEnv.createInput(inputFormat)
+
+  /**
+   * Generic method to create an input data stream with a specific input format.
+   * Since all data streams need specific information about their types, this method needs to
+   * determine the type of the data produced by the input format. It will attempt to determine the
+   * data type by reflection, unless the input format implements the ResultTypeQueryable interface.
+   */
+  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _],
+    typeInfo: TypeInformation[T]): DataStream[T] =
+    javaEnv.createInput(inputFormat, typeInfo)
 
   /**
    * Creates a new DataStream that contains a sequence of numbers.
    *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a parallelism of one.
    */
   def generateSequence(from: Long, to: Long): DataStream[Long] = {
     new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
@@ -256,10 +326,18 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Creates a new DataStream that contains a sequence of numbers in a parallel fashion.
+   */
+  def generateParallelSequence(from: Long, to: Long): DataStream[Long] = {
+    new DataStream[java.lang.Long](javaEnv.generateParallelSequence(from, to)).
+      asInstanceOf[DataStream[Long]]
+  }
+
+  /**
    * Creates a DataStream that contains the given elements. The elements must all be of the
-   * same type and must be serializable.
+   * same type.
    *
-   * * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
    * a parallelism of one.
    */
   def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
@@ -286,6 +364,26 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Creates a DataStream from the given [[Iterator]].
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a parallelism of one.
+   */
+  def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    javaEnv.fromCollection(data.asJava, typeInfo)
+  }
+
+  /**
+   * Creates a DataStream from the given [[SplittableIterator]].
+   */
+  def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
+    DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    javaEnv.fromParallelCollection(data, typeInfo)
+  }
+
+  /**
    * Create a DataStream using a user defined source function for arbitrary
    * source functionality. By default sources have a parallelism of 1. 
    * To enable parallel execution, the user defined source should implement 


[4/6] flink git commit: [FLINK-1687] [streaming] [api-breaking] fromCollection and generateSequence rework

Posted by mb...@apache.org.
[FLINK-1687] [streaming] [api-breaking] fromCollection and generateSequence rework


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/120bd0f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/120bd0f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/120bd0f4

Branch: refs/heads/master
Commit: 120bd0f428fd4ca4051d052ee9a79728d55e72d7
Parents: 1c3b67e
Author: mbalassi <mb...@apache.org>
Authored: Sat May 23 22:34:06 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:32 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java |  28 +++--
 .../functions/source/FromIteratorFunction.java  |   3 -
 .../source/FromSplittableIteratorFunction.java  |  52 +++++++++
 .../functions/source/GenSequenceFunction.java   |  55 ---------
 .../api/StreamExecutionEnvironmentTest.java     | 113 +++++++++++++++++++
 .../api/collector/DirectedOutputTest.java       |   2 +-
 .../api/complex/ComplexIntegrationTest.java     |   6 +-
 .../streaming/api/operators/ProjectTest.java    |   2 +-
 .../api/streamtask/StreamVertexTest.java        |   2 +-
 9 files changed, 188 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index a1fe60d..09a8788 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
-import org.apache.flink.streaming.api.functions.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -418,7 +418,7 @@ public abstract class StreamExecutionEnvironment {
 		if (from > to) {
 			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
 		}
-		return addSource(new GenSequenceFunction(from, to), "Sequence Source");
+		return fromCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
 	}
 
 	/**
@@ -432,8 +432,8 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A data stream, containing all number in the [from, to] interval
 	 */
 	public DataStreamSource<Long> generateParallelSequence(long from, long to) {
-		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parellel " +
-				"Sequence source");
+		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Parallel " +
+				"Sequence Source");
 	}
 
 	/**
@@ -456,7 +456,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the given array of elements
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
+	public <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
 		if (data.length == 0) {
 			throw new IllegalArgumentException(
 					"fromElements needs at least one element as argument");
@@ -489,7 +489,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the given collection
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
+	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
 		Preconditions.checkNotNull(data, "Collection must not be null");
 		if (data.isEmpty()) {
 			throw new IllegalArgumentException("Collection must not be empty");
@@ -518,7 +518,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the given collection
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
+	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
 			typeInfo) {
 		Preconditions.checkNotNull(data, "Collection must not be null");
 		if (data.isEmpty()) {
@@ -552,7 +552,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The data stream representing the elements in the iterator
 	 * @see #fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
+	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
 		return fromCollection(data, TypeExtractor.getForClass(type));
 	}
 
@@ -578,7 +578,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the elements in the iterator
 	 */
-	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
+	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
 			typeInfo) {
 		Preconditions.checkNotNull(data, "The iterator must not be null");
 		if (!(data instanceof Serializable)) {
@@ -589,6 +589,12 @@ public abstract class StreamExecutionEnvironment {
 		return addSource(function, "Collection Source").returns(typeInfo);
 	}
 
+	// private helper for passing different names
+	private <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> iterator, TypeInformation<OUT>
+			typeInfo, String operatorName) {
+		return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+	}
+
 	/**
 	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
 	 * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
@@ -636,13 +642,13 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
 			typeInfo) {
-		return fromParallelCollection(iterator, typeInfo, "Parallel Collection source");
+		return fromParallelCollection(iterator, typeInfo, "Parallel Collection Source");
 	}
 
 	// private helper for passing different names
 	private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT>
 			typeInfo, String operatorName) {
-		return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+		return addSource(new FromSplittableIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
index d46b1f2..125b88b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
 import java.util.Iterator;
 
 public class FromIteratorFunction<T> implements SourceFunction<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
new file mode 100644
index 0000000..fd86858
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.SplittableIterator;
+
+import java.util.Iterator;
+
+public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	SplittableIterator<T> fullIterator;
+	Iterator<T> iterator;
+
+	public FromSplittableIteratorFunction(SplittableIterator<T> iterator) {
+		this.fullIterator = iterator;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		int numberOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+		int indexofThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
+		iterator = fullIterator.split(numberOfSubTasks)[indexofThisSubTask];
+	}
+
+	@Override
+	public boolean reachedEnd() throws Exception {
+		return !iterator.hasNext();
+	}
+
+	@Override
+	public T next() throws Exception {
+		return iterator.next();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
deleted file mode 100644
index 7d302d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.NumberSequenceIterator;
-
-/**
- * Source Function used to generate the number sequence
- * 
- */
-public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
-
-	private static final long serialVersionUID = 1L;
-
-	private NumberSequenceIterator fullIterator;
-	private NumberSequenceIterator splitIterator;
-
-	public GenSequenceFunction(long from, long to) {
-		fullIterator = new NumberSequenceIterator(from, to);
-	}
-
-	@Override
-	public void open(Configuration config) {
-		int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
-		int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
-		splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
-	}
-
-	@Override
-	public boolean reachedEnd() throws Exception {
-		return !splitIterator.hasNext();
-	}
-
-	@Override
-	public Long next() throws Exception {
-		return splitIterator.next();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..cbbd409
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertTrue;
+
+public class StreamExecutionEnvironmentTest {
+
+	private static final long MEMORYSIZE = 32;
+	private static int PARALLELISM = 4;
+
+	@Test
+	public void testFromCollectionParallelism() {
+		TypeInformation<Object> typeInfo = TypeExtractor.getForClass(Object.class);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		boolean seenExpectedException = false;
+
+		try {
+			DataStream<Object> dataStream1 = env.fromCollection(new DummySplittableIterator(), typeInfo)
+					.setParallelism(4);
+		} catch (IllegalArgumentException e) {
+			seenExpectedException = true;
+		}
+
+		DataStream<Object> dataStream2 = env.fromParallelCollection(new DummySplittableIterator(), typeInfo)
+				.setParallelism(4);
+
+		String plan = env.getExecutionPlan();
+
+		assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
+		assertTrue("Parallelism for dataStream1 is not right.",
+				plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
+		assertTrue("Parallelism for dataStream2 is not right.",
+				plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
+	}
+
+	@Test
+	public void testGenerateSequenceParallelism() throws Exception {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+		boolean seenExpectedException = false;
+
+		try {
+			DataStream<Long> dataStream1 = env.generateSequence(0,0).setParallelism(4);
+		} catch (IllegalArgumentException e) {
+			seenExpectedException = true;
+		}
+
+		DataStream<Long> dataStream2 = env.generateParallelSequence(0,0).setParallelism(4);
+
+		String plan = env.getExecutionPlan();
+
+		assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
+		assertTrue("Parallelism for dataStream1 is not right.",
+				plan.contains("\"contents\":\"Sequence Source\",\"parallelism\":1"));
+		assertTrue("Parallelism for dataStream2 is not right.",
+				plan.contains("\"contents\":\"Parallel Sequence Source\",\"parallelism\":4"));
+	}
+
+	public static class DummySplittableIterator extends SplittableIterator {
+		private static final long serialVersionUID = 1312752876092210499L;
+
+		@Override
+		public Iterator[] split(int numPartitions) {
+			return new Iterator[0];
+		}
+
+		@Override
+		public int getMaximumNumberOfSplits() {
+			return 0;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return false;
+		}
+
+		@Override
+		public Object next() {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index fc3e36f..56b6ae8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -101,7 +101,7 @@ public class DirectedOutputTest {
 		TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
 		TestListResultSink<Long> allSink = new TestListResultSink<Long>();
 
-		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
+		SplitDataStream<Long> source = env.generateParallelSequence(1, 11).split(new MyOutputSelector());
 		source.select(EVEN).addSink(evenSink);
 		source.select(ODD, TEN).addSink(oddAndTenSink);
 		source.select(EVEN, ODD).addSink(evenAndOddSink);

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 908671d..d321851 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -222,8 +222,8 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		
 		env.setBufferTimeout(0);
 
-		DataStream<Long> sourceStream31 = env.generateSequence(1, 10000);
-		DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
+		DataStream<Long> sourceStream31 = env.generateParallelSequence(1, 10000);
+		DataStream<Long> sourceStream32 = env.generateParallelSequence(10001, 20000);
 
 		sourceStream31.filter(new PrimeFilterFunction())
 				.window(Count.of(100))
@@ -308,7 +308,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		
 		env.setBufferTimeout(0);
 
-		DataStream<Long> dataStream51 = env.generateSequence(1, 5)
+		DataStream<Long> dataStream51 = env.generateParallelSequence(1, 5)
 				.map(new MapFunction<Long, Long>() {
 
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
index d9cc607..c09afee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -92,7 +92,7 @@ public class ProjectTest implements Serializable {
 
 		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
 
-		env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
+		env.generateParallelSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
 				@Override
 				public Tuple3<Long, Character, Double> map(Long value) throws Exception {
 					return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/120bd0f4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index 0bb1848..90cb7b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -157,7 +157,7 @@ public class StreamVertexTest {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
 
 		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
-		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
+		DataStream<Long> generatedSequence = env.generateParallelSequence(0, 3);
 
 		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
 


[6/6] flink git commit: [streaming] Build warnings eliminated from streaming-core

Posted by mb...@apache.org.
[streaming] Build warnings eliminated from streaming-core


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0930179f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0930179f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0930179f

Branch: refs/heads/master
Commit: 0930179f41dec179cb60882f699ee4ce8ba34d61
Parents: 0fd6023
Author: mbalassi <mb...@apache.org>
Authored: Thu May 28 16:32:37 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri May 29 15:08:33 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     | 15 ++------
 .../streaming/api/datastream/DataStream.java    | 37 ++++++++++----------
 .../api/datastream/DiscretizedStream.java       |  7 ++--
 .../api/datastream/GroupedDataStream.java       | 11 ++----
 .../api/datastream/IterativeDataStream.java     |  6 ++--
 .../api/datastream/WindowedDataStream.java      |  4 +--
 .../datastream/temporal/TemporalOperator.java   |  4 +--
 .../environment/StreamExecutionEnvironment.java | 35 ++++++++----------
 .../functions/source/FileSourceFunction.java    |  1 +
 .../api/operators/windowing/WindowMerger.java   |  1 +
 .../ExtractionAwareDeltaFunction.java           |  6 ++--
 .../streaming/api/windowing/helper/Delta.java   |  1 +
 .../streaming/api/windowing/helper/Time.java    |  2 +-
 .../policy/CloneableMultiEvictionPolicy.java    |  4 +--
 .../windowing/policy/CountEvictionPolicy.java   |  2 +-
 .../api/windowing/policy/DeltaPolicy.java       |  1 +
 .../api/windowing/policy/TimeTriggerPolicy.java | 14 +++-----
 .../streaming/runtime/tasks/OutputHandler.java  |  2 +-
 .../api/StreamExecutionEnvironmentTest.java     |  1 +
 .../SlidingTimeGroupedPreReducerTest.java       |  1 +
 .../windowbuffer/SlidingTimePreReducerTest.java |  1 +
 21 files changed, 69 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 7362802..02db538 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.Utils;
@@ -28,8 +27,6 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
 import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoReduceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
@@ -232,9 +229,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * the output to a common type. The transformation calls a
 	 * {@link CoMapFunction#map1} for each element of the first input and
 	 * {@link CoMapFunction#map2} for each element of the second input. Each
-	 * CoMapFunction call returns exactly one element. The user can also extend
-	 * {@link RichCoMapFunction} to gain access to other features provided by
-	 * the {@link RichFuntion} interface.
+	 * CoMapFunction call returns exactly one element.
 	 * 
 	 * @param coMapper
 	 *            The CoMapFunction used to jointly transform the two input
@@ -258,9 +253,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * {@link CoFlatMapFunction#flatMap1} for each element of the first input
 	 * and {@link CoFlatMapFunction#flatMap2} for each element of the second
 	 * input. Each CoFlatMapFunction call returns any number of elements
-	 * including none. The user can also extend {@link RichFlatMapFunction} to
-	 * gain access to other features provided by the {@link RichFuntion}
-	 * interface.
+	 * including none.
 	 * 
 	 * @param coFlatMapper
 	 *            The CoFlatMapFunction used to jointly transform the two input
@@ -285,9 +278,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * sliding batch/window of the data stream. If the connected data stream is
 	 * grouped then the reducer is applied on every group of elements sharing
 	 * the same key. This type of reduce is much faster than reduceGroup since
-	 * the reduce function can be applied incrementally. The user can also
-	 * extend the {@link RichCoReduceFunction} to gain access to other features
-	 * provided by the {@link RichFuntion} interface.
+	 * the reduce function can be applied incrementally.
 	 * 
 	 * @param coReducer
 	 *            The {@link CoReduceFunction} that will be called for every

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5165ec7..7edbec5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -378,7 +378,7 @@ public class DataStream<OUT> {
 	 * instances of the next processing operator.
 	 * 
 	 * @param keySelector
-	 * @return
+	 * @return The partitioned DataStream
 	 */
 	protected DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
 		return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
@@ -607,17 +607,13 @@ public class DataStream<OUT> {
 	 * Initiates a Project transformation on a {@link Tuple} {@link DataStream}.<br/>
 	 * <b>Note: Only Tuple DataStreams can be projected.</b></br> The
 	 * transformation projects each Tuple of the DataSet onto a (sub)set of
-	 * fields.</br> This method returns a {@link StreamProjection} on which
-	 * {@link StreamProjection#types(Class)} needs to be called to completed the
-	 * transformation.
+	 * fields.
 	 * 
 	 * @param fieldIndexes
 	 *            The field indexes of the input tuples that are retained. The
 	 *            order of fields in the output tuple corresponds to the order
 	 *            of field indexes.
-	 * @return A StreamProjection that needs to be converted into a DataStream
-	 *         to complete the project transformation by calling
-	 *         {@link StreamProjection#types(Class)}.
+	 * @return The projected DataStream
 	 * 
 	 * @see Tuple
 	 * @see DataStream
@@ -639,8 +635,8 @@ public class DataStream<OUT> {
 	 * {@link StreamCrossOperator#onWindow} should be called to define the
 	 * window.
 	 * <p>
-	 * Call {@link StreamCrossOperator.CrossWindow#with(crossFunction)} to
-	 * define a custom cross function.
+	 * Call {@link StreamCrossOperator.CrossWindow#with(org.apache.flink.api.common.functions.CrossFunction)}
+	 * to define a custom cross function.
 	 * 
 	 * @param dataStreamToCross
 	 *            The other DataStream with which this DataStream is crossed.
@@ -658,14 +654,17 @@ public class DataStream<OUT> {
 	 * {@link DataStream}s on key equality over a specified time window.</br>
 	 * 
 	 * This method returns a {@link StreamJoinOperator} on which the
-	 * {@link StreamJoinOperator#onWindow} should be called to define the
-	 * window, and then the {@link StreamJoinOperator.JoinWindow#where} and
-	 * {@link StreamJoinOperator.JoinPredicate#equalTo} can be used to define
-	 * the join keys.</p> The user can also use the
-	 * {@link StreamJoinOperator.JoinedStream#with(joinFunction)} to apply
-	 * custom join function.
-	 * 
-	 * @param other
+	 * {@link StreamJoinOperator#onWindow(long, java.util.concurrent.TimeUnit)}
+	 * should be called to define the window, and then the
+	 * {@link StreamJoinOperator.JoinWindow#where(int...)} and
+	 * {@link StreamJoinOperator.JoinPredicate#equalTo(int...)} can be used to define
+	 * the join keys.
+	 * <p>
+	 * The user can also use the
+	 * {@link StreamJoinOperator.JoinedStream#with(org.apache.flink.api.common.functions.JoinFunction)}
+	 * to apply a custom join function.
+	 * 
+	 * @param dataStreamToJoin
 	 *            The other DataStream with which this DataStream is joined.
 	 * @return A {@link StreamJoinOperator} to continue the definition of the
 	 *         Join transformation.
@@ -923,11 +922,11 @@ public class DataStream<OUT> {
 	 * to a grouped data stream, the windows (evictions) and slide sizes
 	 * (triggers) will be computed on a per group basis. </br></br> For more
 	 * advanced control over the trigger and eviction policies please refer to
-	 * {@link #window(trigger, eviction)} </br> </br> For example to create a
+	 * {@link #window(TriggerPolicy, EvictionPolicy)} </br> </br> For example to create a
 	 * sum every 5 seconds in a tumbling fashion:</br>
 	 * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)} </br></br> To
 	 * create sliding windows use the
-	 * {@link WindowedDataStream#every(WindowingHelper...)} </br></br> The same
+	 * {@link WindowedDataStream#every(WindowingHelper)} </br></br> The same
 	 * example with 3 second slides:</br>
 	 * 
 	 * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(3,

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index d70c4b8..4083bb8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -53,9 +53,8 @@ import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation
 /**
  * A {@link DiscretizedStream} represents a data stream that has been divided
  * into windows (predefined chunks). User defined function such as
- * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()},
- * {@link #foldWindow(FoldFunction, initialValue)} or aggregations can be
- * applied to the windows.
+ * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)},
+ * or aggregations can be applied to the windows.
  * 
  * @param <OUT>
  *            The output type of the {@link DiscretizedStream}
@@ -127,7 +126,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	 * 
 	 * @param reduceFunction
 	 *            The reduce function to be applied on the windows
-	 * @return
+	 * @return The reduced DataStream
 	 */
 	protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index aed02dc..62e7781 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -32,7 +31,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 /**
  * A GroupedDataStream represents a {@link DataStream} which has been
  * partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
- * {@link #batchReduce} etc. can be applied on the {@link GroupedDataStream} to
+ * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
  * get additional functionality by the grouping.
  * 
  * @param <OUT>
@@ -67,9 +66,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 * Applies a reduce transformation on the grouped data stream grouped on by
 	 * the given key position. The {@link ReduceFunction} will receive input
 	 * values based on the key value. Only input values with the same key will
-	 * go to the same reducer.The user can also extend
-	 * {@link RichReduceFunction} to gain access to other features provided by
-	 * the {@link RichFuntion} interface.
+	 * go to the same reducer.
 	 * 
 	 * @param reducer
 	 *            The {@link ReduceFunction} that will be called for every
@@ -86,9 +83,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 * Applies a fold transformation on the grouped data stream grouped on by
 	 * the given key position. The {@link FoldFunction} will receive input
 	 * values based on the key value. Only input values with the same key will
-	 * go to the same folder.The user can also extend {@link RichFoldFunction}
-	 * to gain access to other features provided by the {@link RichFuntion}
-	 * interface.
+	 * go to the same folder.
 	 * 
 	 * @param folder
 	 *            The {@link FoldFunction} that will be called for every element

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 178f2eb..581087d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -42,11 +42,11 @@ public class IterativeDataStream<IN> extends
 	 * program part that will be fed back to the start of the iteration. </br>
 	 * </br>A common usage pattern for streaming iterations is to use output
 	 * splitting to send a part of the closing data stream to the head. Refer to
-	 * {@link SingleOutputStreamOperator#split(OutputSelector)} for more
-	 * information.
+	 * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
+	 * for more information.
 	 * 
 	 * 
-	 * @param iterationResult
+	 * @param iterationTail
 	 *            The data stream that is fed back to the next iteration head.
 	 * @return Returns the stream that was fed back to the iteration. In most
 	 *         cases no further transformation are applied on this stream.

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index a10c79e..5d769ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -76,7 +76,7 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 /**
  * A {@link WindowedDataStream} represents a data stream that has been
  * discretised into windows. User defined function such as
- * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()} or aggregations
+ * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)} or aggregations
  * can be applied to the windows. The results of these transformations are also
  * WindowedDataStreams of the same discretisation unit.
  * 
@@ -610,7 +610,7 @@ public class WindowedDataStream<OUT> {
 	 * stream's underlying type. A dot can be used to drill down into objects,
 	 * as in {@code "field1.getInnerField2()" }.
 	 * 
-	 * @param positionToSum
+	 * @param field
 	 *            The field to sum
 	 * @return The transformed DataStream.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
index 75332d1..3fe7eb7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
@@ -69,7 +69,7 @@ public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
 	 * transformed.To define sliding windows call {@link TemporalWindow#every}
 	 * on the resulting operator.
 	 * 
-	 * @param windowSize
+	 * @param length
 	 *            The size of the window in milliseconds.
 	 * @param timeStamp1
 	 *            The timestamp used to extract time from the elements of the
@@ -89,7 +89,7 @@ public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
 	 * transformed.To define sliding windows call {@link TemporalWindow#every}
 	 * on the resulting operator.
 	 * 
-	 * @param windowSize
+	 * @param length
 	 *            The size of the window in milliseconds.
 	 * @param timeStamp1
 	 *            The timestamp used to extract time from the elements of the

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 651da63..da1ba73 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -251,7 +251,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Sets the {@link StateHandleProvider} used for storing operator state
 	 * checkpoints when checkpointing is enabled.
 	 * <p>
-	 * An example would be using a {@link FileStateHandle#createProvider(Path)}
+	 * An example would be using a {@link FileStateHandle#createProvider(String)}
 	 * to use any Flink supported file system as a state backend
 	 * 
 	 */
@@ -857,16 +857,14 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
-	 * created - instead, this method returns a data stream that will be lazily created from the input format once the
-	 * program is executed.
+	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
 	 * <p/>
 	 * Since all data streams need specific information about their types, this method needs to determine the type of
 	 * the data produced by the input format. It will attempt to determine the data type by reflection, unless the
 	 * input
-	 * format implements the {@link org.apache.flink.api.java.typeutils .ResultTypeQueryable} interface. In the latter
-	 * case, this method will invoke the {@link org.apache.flink.api.java.typeutils
-	 * .ResultTypeQueryable#getProducedType()} method to determine data type produced by the input format.
+	 * format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. In the latter
+	 * case, this method will invoke the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()}
+	 * method to determine data type produced by the input format.
 	 *
 	 * @param inputFormat
 	 * 		The input format used to create the data stream
@@ -879,14 +877,12 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
-	 * created - instead, this method returns a data stream that will be lazily created from the input format once the
-	 * program is executed.
-	 * <p/>
+	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
+	 * <p>
 	 * The data stream is typed to the given TypeInformation. This method is intended for input formats where the
 	 * return
-	 * type cannot be determined by reflection analysis, and that do not implement the {@link
-	 * org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+	 * type cannot be determined by reflection analysis, and that do not implement the
+	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
 	 *
 	 * @param inputFormat
 	 * 		The input format used to create the data stream
@@ -908,14 +904,13 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Ads a data source with a custom type information thus opening a {@link org.apache.flink.streaming.api
-	 * .datastream.DataStream}. Only in very special cases does the user need to support type information. Otherwise
-	 * use
-	 * {@link #addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
-	 * <p/>
+	 * Adds a data source with a custom type information thus opening a
+	 * {@link org.apache.flink.streaming.api.datastream.DataStream}. Only in very special cases does the user need
+	 * to support type information. Otherwise use {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
+	 * <p>
 	 * By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
-	 * implement {@link org.apache.flink.streaming.api.function.source.ParallelSourceFunction} or extend {@link
-	 * org.apache.flink.streaming.api.function.source.RichParallelSourceFunction}. In these cases the resulting source
+	 * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link
+	 * org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}. In these cases the resulting source
 	 * will have the parallelism of the environment. To change this afterwards call {@link
 	 * org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index af00d4d..c6f4421 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -40,6 +40,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
 	private Iterator<InputSplit> splitIterator;
 	private transient OUT nextElement;
 
+	@SuppressWarnings("unchecked")
 	public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
 		this.format = (InputFormat<OUT, InputSplit>) format;
 		this.typeInfo = typeInfo;

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index 9b33474..bb8cfaa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -43,6 +43,7 @@ public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
 	}
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public void processElement(StreamWindow<T> nextWindow) throws Exception {
 
 		StreamWindow<T> current = windows.get(nextWindow.windowID);

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
index 81d01a4..3e9f2ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
@@ -47,10 +47,10 @@ public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFun
 
 	/**
 	 * This method takes the two data point and runs the set extractor on it.
-	 * The delta function implemented at {@link getNestedDelta} is then called
+	 * The delta function implemented at {@link #getNestedDelta} is then called
 	 * with the extracted data. In case no extractor is set the input data gets
-	 * passes to {@link getNestedDelta} as-is. The return value is just
-	 * forwarded from {@link getNestedDelta}.
+	 * passes to {@link #getNestedDelta} as-is. The return value is just
+	 * forwarded from {@link #getNestedDelta}.
 	 * 
 	 * @param oldDataPoint
 	 *            the older data point as raw data (before extraction).

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index bcb548f..255049d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -94,6 +94,7 @@ public class Delta<DATA> extends WindowingHelper<DATA> {
 		return new Delta<DATA>(deltaFunction, initVal, threshold);
 	}
 
+	@SuppressWarnings("unchecked")
 	private void instantiateTypeSerializer(){
 		if (executionConfig == null){
 			throw new UnsupportedOperationException("ExecutionConfig has to be set to instantiate TypeSerializer.");

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 0089d26..022f975 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 /**
  * This helper represents a time based count or eviction policy. By default the
  * time is measured with {@link System#currentTimeMillis()} in
- * {@link DefaultTimeStamp}.
+ * {@link SystemTimestamp}.
  * 
  * @param <DATA>
  *            The data type which is handled by the time stamp used in the

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
index d86b174..5adddc4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
@@ -44,7 +44,7 @@ public class CloneableMultiEvictionPolicy<DATA> extends MultiEvictionPolicy<DATA
 	 * 
 	 * When using this constructor the MAX strategy is used by default. You can
 	 * select other strategies using
-	 * {@link CloneableMultiEvictionPolicy#CloneableMultiEvictionPolicy(org.apache.flink.streaming.api.windowing.policy.MultiEvictionPolicy.EvictionStrategy, CloneableEvictionPolicy...)}
+	 * {@link CloneableMultiEvictionPolicy#CloneableMultiEvictionPolicy(EvictionStrategy, CloneableEvictionPolicy...)}
 	 * .
 	 * 
 	 * @param evictionPolicies
@@ -60,7 +60,7 @@ public class CloneableMultiEvictionPolicy<DATA> extends MultiEvictionPolicy<DATA
 	 * constructor.
 	 * 
 	 * @param strategy
-	 *            the strategy to be used. See {@link EvictionStrategy} for a
+	 *            the strategy to be used. See {@link MultiEvictionPolicy.EvictionStrategy} for a
 	 *            list of possible options.
 	 * @param evictionPolicies
 	 *            some cloneable policies to be tied together.

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index 5e0c862..9be25d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -89,7 +89,7 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
 	 *            will be adjusted respectively but never below zero.
 	 * @param startValue
 	 *            A custom start value for the counter of arriving elements.
-	 * @see CountEvictionPolicy#NextGenCountEvictionPolicy(int, int)
+	 * @see CountEvictionPolicy#CountEvictionPolicy(int, int)
 	 */
 	public CountEvictionPolicy(int maxElements, int deleteOnEviction, int startValue) {
 		this.counter = startValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
index 69dd66f..0583176 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
@@ -86,6 +86,7 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
 	 * 				TypeSerializer to properly forward the initial value to
 	 * 				the cluster
 	 */
+	@SuppressWarnings("unchecked")
 	public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold, TypeSerializer typeSerializer) {
 		this.deltaFuntion = deltaFuntion;
 		this.triggerDataPoint = init;

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index ef701cf..03984a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 import java.util.LinkedList;
 
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
@@ -46,7 +47,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 
 	/**
 	 * This is mostly the same as
-	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, Timestamp)}. In addition
+	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimestampWrapper)}. In addition
 	 * to granularity and timestamp a delay can be specified for the first
 	 * trigger. If the start time given by the timestamp is x, the delay is y,
 	 * and the granularity is z, the first trigger will happen at x+y+z.
@@ -57,11 +58,6 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 * @param timestampWrapper
 	 *            The {@link TimestampWrapper} to measure the time with. This
 	 *            can be either user defined of provided by the API.
-	 * @param timeWrapper
-	 *            This policy creates fake elements to not miss windows in case
-	 *            no element arrived within the duration of the window. This
-	 *            extractor should wrap a long into such an element of type
-	 *            DATA.
 	 */
 	public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
 		this.startTime = timestampWrapper.getStartTime();
@@ -86,9 +82,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	}
 
 	/**
-	 * In case {@link DefaultTimeStamp} is used, a runnable is returned which
+	 * In case {@link SystemTimestamp} is used, a runnable is returned which
 	 * triggers based on the current system time. If any other time measure is
-	 * used the method return null.
+	 * used the method returns null.
 	 * 
 	 * @param callback
 	 *            The object which is takes the callbacks for adding fake
@@ -107,7 +103,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 
 	/**
 	 * This method is only called in case the runnable triggers a window end
-	 * according to the {@link DefaultTimeStamp}.
+	 * according to the {@link SystemTimestamp}.
 	 * 
 	 * @param callback
 	 *            The callback object.

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 38f1231..20aeb89 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -226,8 +226,8 @@ public class OutputHandler<OUT> {
 		}
 
 		@Override
+		@SuppressWarnings("unchecked")
 		public void collect(T record) {
-
 			try {
 				operator.processElement(record);
 			} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index c70c1df..5ca4883 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -41,6 +41,7 @@ public class StreamExecutionEnvironmentTest {
 	private static int PARALLELISM = 4;
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testFromCollectionParallelism() {
 		TypeInformation<Object> typeInfo = TypeExtractor.getForClass(Object.class);
 		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
index 18a4748..3f1cba1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -52,6 +52,7 @@ public class SlidingTimeGroupedPreReducerTest {
 	KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testPreReduce1() throws Exception {
 		// This ensures that the buffer is properly cleared after a burst of elements by
 		// replaying the same sequence of elements with a later timestamp and expecting the same

http://git-wip-us.apache.org/repos/asf/flink/blob/0930179f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
index a48bc0c..0519da7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -44,6 +44,7 @@ public class SlidingTimePreReducerTest {
 	ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testPreReduce1() throws Exception {
 		// This ensures that the buffer is properly cleared after a burst of elements by
 		// replaying the same sequence of elements with a later timestamp and expecting the same