You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:57 UTC

[16/19] flink git commit: [streaming] Major internal renaming and restructure

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
deleted file mode 100644
index f9aa79d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.util.Collector;
-
-public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private KeySelector<IN1, ?> keySelector1;
-	private KeySelector<IN2, ?> keySelector2;
-	private JoinFunction<IN1, IN2, OUT> joinFunction;
-
-	public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
-			JoinFunction<IN1, IN2, OUT> joinFunction) {
-		this.keySelector1 = keySelector1;
-		this.keySelector2 = keySelector2;
-		this.joinFunction = joinFunction;
-	}
-
-	@Override
-	public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
-
-		Map<Object, List<IN1>> map = build(first);
-
-		for (IN2 record : second) {
-			Object key = keySelector2.getKey(record);
-			List<IN1> match = map.get(key);
-			if (match != null) {
-				for (IN1 matching : match) {
-					out.collect(joinFunction.join(matching, record));
-				}
-			}
-		}
-
-	}
-
-	private Map<Object, List<IN1>> build(List<IN1> records) throws Exception {
-
-		Map<Object, List<IN1>> map = new HashMap<Object, List<IN1>>();
-
-		for (IN1 record : records) {
-			Object key = keySelector1.getKey(record);
-			List<IN1> current = map.get(key);
-			if (current == null) {
-				current = new LinkedList<IN1>();
-				map.put(key, current);
-			}
-			current.add(record);
-		}
-
-		return map;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
deleted file mode 100644
index 2458f1b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoFlatMapFunction represents a FlatMap transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
-		CoFlatMapFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
deleted file mode 100644
index 20d520c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoMapFunction represents a Map transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
-		CoMapFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
deleted file mode 100644
index 655923f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoReduceFunction represents a Reduce transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public abstract class RichCoReduceFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
-		CoReduceFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
deleted file mode 100644
index 2709203..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.co;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
-
-public abstract class RichCoWindowFunction<IN1, IN2, O> extends AbstractRichFunction implements
-		CoWindowFunction<IN1, IN2, O> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public abstract void coWindow(List<IN1> first, List<IN2> second, Collector<O> out)
-			throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
deleted file mode 100644
index 24beba1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple implementation of the SinkFunction writing tuples in the specified
- * OutputFormat format. Tuples are collected to a list and written to the file
- * periodically. The target path and the overwrite mode are pre-packaged in
- * format.
- * 
- * @param <IN>
- *            Input type
- */
-public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
-	protected ArrayList<IN> tupleList = new ArrayList<IN>();
-	protected volatile OutputFormat<IN> format;
-	protected volatile boolean cleanupCalled = false;
-	protected int indexInSubtaskGroup;
-	protected int currentNumberOfSubtasks;
-
-	public FileSinkFunction(OutputFormat<IN> format) {
-		this.format = format;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		format.configure(context.getTaskStubParameters());
-		indexInSubtaskGroup = context.getIndexOfThisSubtask();
-		currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
-		format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
-	}
-
-	@Override
-	public void invoke(IN record) throws Exception {
-		tupleList.add(record);
-		if (updateCondition()) {
-			flush();
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		if (!tupleList.isEmpty()) {
-			flush();
-		}
-		try {
-			format.close();
-		} catch (Exception ex) {
-			try {
-				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
-					cleanupCalled = true;
-					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
-				}
-			} catch (Throwable t) {
-				LOG.error("Cleanup on error failed.", t);
-			}
-		}
-	}
-
-	protected void flush() {
-		try {
-			for (IN rec : tupleList) {
-				format.writeRecord(rec);
-			}
-		} catch (Exception ex) {
-			try {
-				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
-					cleanupCalled = true;
-					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
-				}
-			} catch (Throwable t) {
-				LOG.error("Cleanup on error failed.", t);
-			}
-		}
-		resetParameters();
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	protected abstract boolean updateCondition();
-
-	/**
-	 * Statements to be executed after writing a batch goes here.
-	 */
-	protected abstract void resetParameters();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
deleted file mode 100644
index f049a32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import org.apache.flink.api.common.io.OutputFormat;
-
-/**
- * Implementation of FileSinkFunction. Writes tuples to file in every millis
- * milliseconds.
- * 
- * @param <IN>
- *            Input type
- */
-public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private final long millis;
-	private long lastTime;
-
-	public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
-		super(format);
-		this.millis = millis;
-		lastTime = System.currentTimeMillis();
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	@Override
-	protected boolean updateCondition() {
-		return System.currentTimeMillis() - lastTime >= millis;
-	}
-
-	/**
-	 * Statements to be executed after writing a batch goes here.
-	 */
-	@Override
-	protected void resetParameters() {
-		tupleList.clear();
-		lastTime = System.currentTimeMillis();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
deleted file mode 100644
index 947f8ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.PrintStream;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-
-/**
- * Implementation of the SinkFunction writing every tuple to the standard
- * output or standard error stream.
- * 
- * @param <IN>
- *            Input record type
- */
-public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private static final boolean STD_OUT = false;
-	private static final boolean STD_ERR = true;
-	
-	private boolean target; 
-	private transient PrintStream stream;
-	private transient String prefix;
-	
-	/**
-	 * Instantiates a print sink function that prints to standard out.
-	 */
-	public PrintSinkFunction() {}
-	
-	/**
-	 * Instantiates a print sink function that prints to standard out.
-	 * 
-	 * @param stdErr True, if the format should print to standard error instead of standard out.
-	 */
-	public PrintSinkFunction(boolean stdErr) {
-		target = stdErr;
-	}
-
-	public void setTargetToStandardOut() {
-		target = STD_OUT;
-	}
-	
-	public void setTargetToStandardErr() {
-		target = STD_ERR;
-	}
-	
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		// get the target stream
-		stream = target == STD_OUT ? System.out : System.err;
-		
-		// set the prefix if we have a >1 parallelism
-		prefix = (context.getNumberOfParallelSubtasks() > 1) ? 
-				((context.getIndexOfThisSubtask() + 1) + "> ") : null;
-	}
-
-	@Override
-	public void invoke(IN record) {
-		if (prefix != null) {
-			stream.println(prefix + record.toString());
-		}
-		else {
-			stream.println(record.toString());
-		}
-	}
-	
-	@Override
-	public void close() {
-		this.stream = null;
-		this.prefix = null;
-	}
-	
-	@Override
-	public String toString() {
-		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
deleted file mode 100644
index 3b8a4db..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract void invoke(IN value) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
deleted file mode 100644
index eed4234..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * Interface for implementing user defined sink functionality.
- *
- * @param <IN> Input type parameter.
- */
-public interface SinkFunction<IN> extends Function, Serializable {
-
-	/**
-	 * Function for standard sink behaviour. This function is called for every record.
-	 *
-	 * @param value The input record.
-	 * @throws Exception
-	 */
-	public void invoke(IN value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
deleted file mode 100644
index c582c1b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *	http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
- *
- * @param <IN> data to be written into the Socket.
- */
-public class SocketClientSink<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
-	private final String hostName;
-	private final int port;
-	private final SerializationSchema<IN, byte[]> schema;
-	private transient Socket client;
-	private transient DataOutputStream dataOutputStream;
-
-	/**
-	 * Default constructor.
-	 *
-	 * @param hostName Host of the Socket server.
-	 * @param port Port of the Socket.
-	 * @param schema Schema of the data.
-	 */
-	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
-		this.hostName = hostName;
-		this.port = port;
-		this.schema = schema;
-	}
-
-	/**
-	 * Initializes the connection to Socket.
-	 */
-	public void intializeConnection() {
-		OutputStream outputStream;
-		try {
-			client = new Socket(hostName, port);
-			outputStream = client.getOutputStream();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-		dataOutputStream = new DataOutputStream(outputStream);
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Socket.
-	 *
-	 * @param value
-	 *			The incoming data
-	 */
-	@Override
-	public void invoke(IN value) {
-		byte[] msg = schema.serialize(value);
-		try {
-			dataOutputStream.write(msg);
-		} catch (IOException e) {
-			if(LOG.isErrorEnabled()){
-				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
-			}
-		}
-	}
-
-	/**
-	 * Closes the connection of the Socket client.
-	 */
-	private void closeConnection(){
-		try {
-			dataOutputStream.flush();
-			client.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Error while closing connection with socket server at "
-					+ hostName + ":" + port, e);
-		} finally {
-			if (client != null) {
-				try {
-					client.close();
-				} catch (IOException e) {
-					LOG.error("Cannot close connection with socket server at "
-							+ hostName + ":" + port, e);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Initialize the connection with the Socket in the server.
-	 * @param parameters Configuration.
-	 */
-	@Override
-	public void open(Configuration parameters) {
-		intializeConnection();
-	}
-
-	/**
-	 * Closes the connection with the Socket server.
-	 */
-	@Override
-	public void close() {
-		closeConnection();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
deleted file mode 100644
index a606302..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-
-/**
- * Abstract class for formatting the output of the writeAsText and writeAsCsv
- * functions.
- *
- * @param <IN>
- *            Input tuple type
- */
-public abstract class WriteFormat<IN> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Writes the contents of tupleList to the file specified by path.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param tupleList
-	 *            is the list of tuples to be written
-	 */
-	protected abstract void write(String path, ArrayList<IN> tupleList);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
deleted file mode 100644
index b22fd80..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in csv format.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	protected void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
-			for (IN tupleToWrite : tupleList) {
-				String strTuple = tupleToWrite.toString();
-				outStream.println(strTuple.substring(1, strTuple.length() - 1));
-			}
-			outStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while writing file " + path, e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
deleted file mode 100644
index 5891104..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in text format.
- *
- * @param <IN>
- *            Input tuple type
- */
-public class WriteFormatAsText<IN> extends WriteFormat<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
-			for (IN tupleToWrite : tupleList) {
-				outStream.println(tupleToWrite);
-			}
-			outStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while writing file " + path, e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
deleted file mode 100644
index 0c52afc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Simple implementation of the SinkFunction writing tuples as simple text to
- * the file specified by path. Tuples are collected to a list and written to the
- * file periodically. The file specified by path is created if it does not
- * exist, cleared if it exists before the writing.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	protected final String path;
-	protected ArrayList<IN> tupleList = new ArrayList<IN>();
-	protected WriteFormat<IN> format;
-
-	public WriteSinkFunction(String path, WriteFormat<IN> format) {
-		this.path = path;
-		this.format = format;
-		cleanFile(path);
-	}
-
-	/**
-	 * Creates target file if it does not exist, cleans it if it exists.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 */
-	protected void cleanFile(String path) {
-		try {
-			PrintWriter writer;
-			writer = new PrintWriter(path);
-			writer.print("");
-			writer.close();
-		} catch (FileNotFoundException e) {
-			throw new RuntimeException("File not found " + path, e);
-		}
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	protected abstract boolean updateCondition();
-
-	/**
-	 * Statements to be executed after writing a batch goes here.
-	 */
-	protected abstract void resetParameters();
-
-	/**
-	 * Implementation of the invoke method of the SinkFunction class. Collects
-	 * the incoming tuples in tupleList and appends the list to the end of the
-	 * target file if updateCondition() is true or the current tuple is the
-	 * endTuple.
-	 */
-	@Override
-	public void invoke(IN tuple) {
-
-		tupleList.add(tuple);
-		if (updateCondition()) {
-			format.write(path, tupleList);
-			resetParameters();
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
deleted file mode 100644
index ee6df94..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-/**
- * Implementation of WriteSinkFunction. Writes tuples to file in every millis
- * milliseconds.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private final long millis;
-	private long lastTime;
-
-	public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
-		super(path, format);
-		this.millis = millis;
-		lastTime = System.currentTimeMillis();
-	}
-
-	@Override
-	protected boolean updateCondition() {
-		return System.currentTimeMillis() - lastTime >= millis;
-	}
-
-	@Override
-	protected void resetParameters() {
-		tupleList.clear();
-		lastTime = System.currentTimeMillis();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
deleted file mode 100644
index 2a84c0e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);
-
-	public enum WatchType {
-		ONLY_NEW_FILES, // Only new files will be processed.
-		REPROCESS_WITH_APPENDED, // When some files are appended, all contents
-									// of the files will be processed.
-		PROCESS_ONLY_APPENDED // When some files are appended, only appended
-								// contents will be processed.
-	}
-
-	private String path;
-	private long interval;
-	private WatchType watchType;
-
-	private FileSystem fileSystem;
-	private Map<String, Long> offsetOfFiles;
-	private Map<String, Long> modificationTimes;
-
-	private volatile boolean isRunning = false;
-
-	public FileMonitoringFunction(String path, long interval, WatchType watchType) {
-		this.path = path;
-		this.interval = interval;
-		this.watchType = watchType;
-		this.modificationTimes = new HashMap<String, Long>();
-		this.offsetOfFiles = new HashMap<String, Long>();
-	}
-
-	@Override
-	public void run(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
-		isRunning = true;
-		fileSystem = FileSystem.get(new URI(path));
-
-		while (isRunning) {
-			List<String> files = listNewFiles();
-			for (String filePath : files) {
-				if (watchType == WatchType.ONLY_NEW_FILES
-						|| watchType == WatchType.REPROCESS_WITH_APPENDED) {
-					collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
-					offsetOfFiles.put(filePath, -1L);
-				} else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
-					long offset = 0;
-					long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
-					if (offsetOfFiles.containsKey(filePath)) {
-						offset = offsetOfFiles.get(filePath);
-					}
-
-					collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
-					offsetOfFiles.put(filePath, fileSize);
-
-					LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
-				}
-			}
-
-			Thread.sleep(interval);
-		}
-	}
-
-	private List<String> listNewFiles() throws IOException {
-		List<String> files = new ArrayList<String>();
-
-		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
-
-		for (FileStatus status : statuses) {
-			Path filePath = status.getPath();
-			String fileName = filePath.getName();
-			long modificationTime = status.getModificationTime();
-
-			if (!isFiltered(fileName, modificationTime)) {
-				files.add(filePath.toString());
-				modificationTimes.put(fileName, modificationTime);
-			}
-		}
-		return files;
-	}
-
-	private boolean isFiltered(String fileName, long modificationTime) {
-
-		if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName))
-				|| fileName.startsWith(".") || fileName.contains("_COPYING_")) {
-			return true;
-		} else {
-			Long lastModification = modificationTimes.get(fileName);
-			if (lastModification == null) {
-				return false;
-			} else {
-				return lastModification >= modificationTime;
-			}
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
deleted file mode 100644
index 0882d9e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.URI;
-
-public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
-		FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
-		stream.seek(value.f1);
-
-		BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
-		String line;
-
-		try {
-			while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
-				out.collect(line);
-			}
-		} finally {
-			reader.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
deleted file mode 100644
index 9289355..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
-
-public class FileSourceFunction extends RichParallelSourceFunction<String> {
-	private static final long serialVersionUID = 1L;
-
-	private InputSplitProvider provider;
-
-	private InputFormat<String, ?> inputFormat;
-
-	private TypeInformation<String> typeInfo;
-
-	private volatile boolean isRunning;
-
-	public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
-		this.inputFormat = format;
-		this.typeInfo = typeInfo;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		this.provider = context.getInputSplitProvider();
-		inputFormat.configure(context.getTaskStubParameters());
-	}
-
-	@Override
-	public void run(Collector<String> collector) throws Exception {
-		isRunning = true;
-		final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext()
-				.getExecutionConfig());
-		final Iterator<InputSplit> splitIterator = getInputSplits();
-		@SuppressWarnings("unchecked")
-		final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
-		try {
-			while (isRunning && splitIterator.hasNext()) {
-
-				final InputSplit split = splitIterator.next();
-				String record = serializer.createInstance();
-
-				format.open(split);
-				while (isRunning && !format.reachedEnd()) {
-					if ((record = format.nextRecord(record)) != null) {
-						collector.collect(record);
-					}
-				}
-
-			}
-			collector.close();
-		} finally {
-			format.close();
-		}
-		isRunning = false;
-	}
-
-	private Iterator<InputSplit> getInputSplits() {
-
-		return new Iterator<InputSplit>() {
-
-			private InputSplit nextSplit;
-
-			private boolean exhausted;
-
-			@Override
-			public boolean hasNext() {
-				if (exhausted) {
-					return false;
-				}
-
-				if (nextSplit != null) {
-					return true;
-				}
-
-				InputSplit split = provider.getNextInputSplit();
-
-				if (split != null) {
-					this.nextSplit = split;
-					return true;
-				} else {
-					exhausted = true;
-					return false;
-				}
-			}
-
-			@Override
-			public InputSplit next() {
-				if (this.nextSplit == null && !hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				final InputSplit tmp = this.nextSplit;
-				this.nextSplit = null;
-				return tmp;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
deleted file mode 100644
index db452dc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.flink.util.Collector;
-
-public class FromElementsFunction<T> implements SourceFunction<T> {
-	private static final long serialVersionUID = 1L;
-
-	Iterable<T> iterable;
-
-	private volatile boolean isRunning;
-
-	public FromElementsFunction(T... elements) {
-		this.iterable = Arrays.asList(elements);
-	}
-
-	public FromElementsFunction(Collection<T> elements) {
-		this.iterable = elements;
-	}
-
-	public FromElementsFunction(Iterable<T> elements) {
-		this.iterable = elements;
-	}
-
-	@Override
-	public void run(Collector<T> collector) throws Exception {
-		isRunning = true;
-		for (T element : iterable) {
-			if (isRunning) {
-				collector.collect(element);
-			} else {
-				break;
-			}
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
deleted file mode 100644
index df3b462..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.NumberSequenceIterator;
-
-/**
- * Source Function used to generate the number sequence
- * 
- */
-public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
-
-	private static final long serialVersionUID = 1L;
-
-	private NumberSequenceIterator fullIterator;
-	private NumberSequenceIterator splitIterator;
-
-	private volatile boolean isRunning;
-
-	public GenSequenceFunction(long from, long to) {
-		fullIterator = new NumberSequenceIterator(from, to);
-	}
-
-	@Override
-	public void run(Collector<Long> collector) throws Exception {
-		isRunning = true;
-		while (splitIterator.hasNext() && isRunning) {
-			collector.collect(splitIterator.next());
-		}
-	}
-
-	@Override
-	public void open(Configuration config) {
-		int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
-		int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
-		splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
deleted file mode 100644
index 664d39a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public interface GenericSourceFunction<T> {
-
-	public TypeInformation<T> getType();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
deleted file mode 100644
index 3b96bf9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-/**
- * A stream data source that is executed in parallel. Upon execution, the runtime will
- * execute as many parallel instances of this function function as configured parallelism
- * of the source.
- *
- * <p>This interface acts only as a marker to tell the system that this source may
- * be executed in parallel. When different parallel instances are required to perform
- * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
- * context, which revels information like the number of parallel tasks, and which parallel
- * task the current instance is.
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
deleted file mode 100644
index 028c06a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
-		implements ParallelSourceFunction<OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
deleted file mode 100644
index b9331cb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a parallel data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
- *
- * <p>This class is useful when implementing parallel sources where different parallel subtasks
- * need to perform different work. Typical patterns for that are:
- * <ul>
- *     <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
- *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
- *         to determine the current parallelism. It is strongly encouraged to use this method, rather than
- *         hard-wiring the parallelism, because the configured parallelism may change depending on
- *         program configuration. The parallelism may also change after recovering failures, when fewer than
- *         desired parallel worker as available.</li>
- *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
- *         determine which subtask the current instance of the function executes.</li>
- * </ul>
- * </p>
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
deleted file mode 100644
index d6a5b2b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SocketTextStreamFunction extends RichSourceFunction<String> {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
-
-	private static final long serialVersionUID = 1L;
-
-	private String hostname;
-	private int port;
-	private char delimiter;
-	private long maxRetry;
-	private boolean retryForever;
-	private Socket socket;
-	private static final int CONNECTION_TIMEOUT_TIME = 0;
-	private static final int CONNECTION_RETRY_SLEEP = 1000;
-
-	private volatile boolean isRunning = false;
-
-	public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
-		this.hostname = hostname;
-		this.port = port;
-		this.delimiter = delimiter;
-		this.maxRetry = maxRetry;
-		this.retryForever = maxRetry < 0;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		socket = new Socket();
-		socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
-	}
-
-	@Override
-	public void run(Collector<String> collector) throws Exception {
-		streamFromSocket(collector, socket);
-	}
-
-	public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
-		isRunning = true;
-		try {
-			StringBuffer buffer = new StringBuffer();
-			BufferedReader reader = new BufferedReader(new InputStreamReader(
-					socket.getInputStream()));
-
-			while (isRunning) {
-				int data;
-				try {
-					data = reader.read();
-				} catch (SocketException e) {
-					if (!isRunning) {
-						break;
-					} else {
-						throw e;
-					}
-				}
-
-				if (data == -1) {
-					socket.close();
-					long retry = 0;
-					boolean success = false;
-					while (retry < maxRetry && !success) {
-						if (!retryForever) {
-							retry++;
-						}
-						LOG.warn("Lost connection to server socket. Retrying in "
-								+ (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
-						try {
-							socket = new Socket();
-							socket.connect(new InetSocketAddress(hostname, port),
-									CONNECTION_TIMEOUT_TIME);
-							success = true;
-						} catch (ConnectException ce) {
-							Thread.sleep(CONNECTION_RETRY_SLEEP);
-						}
-					}
-
-					if (success) {
-						LOG.info("Server socket is reconnected.");
-					} else {
-						LOG.error("Could not reconnect to server socket.");
-						break;
-					}
-					reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-					continue;
-				}
-
-				if (data == delimiter) {
-					collector.collect(buffer.toString());
-					buffer = new StringBuffer();
-				} else if (data != '\r') { // ignore carriage return
-					buffer.append((char) data);
-				}
-			}
-
-			if (buffer.length() > 0) {
-				collector.collect(buffer.toString());
-			}
-		} finally {
-			socket.close();
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-		if (socket != null && !socket.isClosed()) {
-			try {
-				socket.close();
-			} catch (IOException e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Could not close open socket");
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
deleted file mode 100644
index 0a423cc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
-
-/**
- * Interface for a stream data source.
- *
- * <p>Sources implementing this specific interface are executed with
- * parallelism 1. To execute your sources in parallel
- * see {@link ParallelSourceFunction}.</p>
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public interface SourceFunction<OUT> extends Function, Serializable {
-
-	/**
-	 * Main work method of the source. This function is invoked at the beginning of the
-	 * source's life and is expected to produce its data py "pushing" the records into
-	 * the given collector.
-	 *
-	 * @param collector The collector that forwards records to the source's consumers.
-	 *
-	 * @throws Exception Throwing any type of exception will cause the source to be considered
-	 *                   failed. When fault tolerance is enabled, recovery will be triggered,
-	 *                   which may create a new instance of this source.
-	 */
-	public void run(Collector<OUT> collector) throws Exception;
-
-	/**
-	 * This method signals the source function to cancel its operation
-	 * The method is called by the framework if the task is to be aborted prematurely.
-	 * This happens when the user cancels the job, or when the task is canceled as
-	 * part of a program failure and cleanup.
-	 */
-	public void cancel();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
new file mode 100644
index 0000000..ff045a5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * Abstract class for defining rich mapWindow transformation to be applied on
+ * {@link WindowedDataStream}s. The mapWindow function will be called on each
+ * {@link StreamWindow}.</p> In addition the user can access the functionality
+ * provided by the {@link RichFunction} interface.
+ */
+public abstract class RichWindowMapFunction<IN, OUT> extends AbstractRichFunction implements
+		WindowMapFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 9052714915997374185L;
+
+	@Override
+	public abstract void mapWindow(Iterable<IN> values, Collector<OUT> out) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
new file mode 100644
index 0000000..ececb29
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface for defining mapWindow transformation to be applied on
+ * {@link WindowedDataStream}s. The mapWindow function will be called on each
+ * {@link StreamWindow}.
+ */
+public interface WindowMapFunction<T, O> extends Function, Serializable {
+
+	void mapWindow(Iterable<T> values, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
new file mode 100644
index 0000000..23cca90
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+
+public abstract class AggregationFunction<T> extends RichReduceFunction<T> {
+	private static final long serialVersionUID = 1L;
+
+	public int position;
+
+	public AggregationFunction(int pos) {
+		this.position = pos;
+	}
+
+	public static enum AggregationType {
+		SUM, MIN, MAX, MINBY, MAXBY,
+	}
+
+}