Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/15 23:07:43 UTC

[1/2] incubator-flink git commit: [streaming] ReadTextFile re-implemented as RichFunction and cleanup

Repository: incubator-flink
Updated Branches:
  refs/heads/master 283c398e4 -> 1a6b40465


[streaming] ReadTextFile re-implemented as RichFunction and cleanup

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1a6b4046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1a6b4046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1a6b4046

Branch: refs/heads/master
Commit: 1a6b404651622aad3e4595549c0c6bc538de091c
Parents: 510a811
Author: mbalassi <mb...@apache.org>
Authored: Sat Nov 29 21:30:59 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Mon Dec 15 22:13:38 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 73 +++++++++-------
 .../environment/StreamExecutionEnvironment.java | 89 +++++---------------
 .../api/function/source/FileSourceFunction.java | 45 ++++------
 .../api/function/source/FileStreamFunction.java |  9 +-
 .../function/source/FromElementsFunction.java   |  2 +-
 .../function/source/GenSequenceFunction.java    |  2 +-
 .../api/function/source/SourceFunction.java     | 13 +--
 .../api/invokable/StreamInvokable.java          |  1 -
 .../api/streamvertex/StreamVertex.java          |  6 +-
 .../streamvertex/StreamingRuntimeContext.java   | 42 ++++++---
 .../api/streamvertex/StreamVertexTest.java      |  6 --
 11 files changed, 115 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
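For context on the diff below: after this commit, readTextFile(String) builds a TextInputFormat for the given path and registers it with the JobGraphBuilder as the source vertex's input split source, while FileSourceFunction obtains the InputSplitProvider from its StreamingRuntimeContext in open(). A minimal usage sketch against the streaming API of this snapshot; the class name and HDFS path are illustrative:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // readTextFile wraps the path in a TextInputFormat; the format is registered
        // with the JobGraphBuilder so the runtime's InputSplitProvider serves the splits.
        DataStreamSource<String> lines = env.readTextFile("hdfs://namenode:9000/path/to/input");

        lines.print();
        env.execute();
    }
}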


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 01fd1e9..5fc4a1b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -32,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
@@ -84,7 +82,7 @@ public class JobGraphBuilder {
 	private Map<String, Integer> iterationTailCount;
 	private Map<String, Long> iterationWaitTime;
 	private Map<String, Map<String, OperatorState<?>>> operatorStates;
-	private Map<String, UserCodeWrapper<? extends InputFormat<String, ?>>> sources;
+	private Map<String, InputFormat<String, ?>> inputFormatList;
 
 	/**
 	 * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -117,7 +115,7 @@ public class JobGraphBuilder {
 		iterationTailCount = new HashMap<String, Integer>();
 		iterationWaitTime = new HashMap<String, Long>();
 		operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
-		sources = new HashMap<String, UserCodeWrapper<? extends InputFormat<String, ?>>>();
+		inputFormatList = new HashMap<String, InputFormat<String, ?>>();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("JobGraph created");
@@ -162,28 +160,34 @@ public class JobGraphBuilder {
 		}
 	}
 
+	/**
+	 * Adds a source vertex to the streaming JobGraph with the given parameters
+	 * 
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param function
+	 *            User defined function
+	 * @param inTypeInfo
+	 *            Input type for serialization
+	 * @param outTypeInfo
+	 *            Output type for serialization
+	 * @param operatorName
+	 *            Operator type
+	 * @param serializedFunction
+	 *            Serialized udf
+	 * @param parallelism
+	 *            Number of parallel instances created
+	 */
 	public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> function,
-			TypeInformation<IN> inTypeInfo,	TypeInformation<OUT> outTypeInfo, String operatorName,
+			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName,
 			byte[] serializedFunction, int parallelism) {
 
-		StreamInvokable<OUT, OUT> invokableObject = new SourceInvokable<OUT>(function);
+		@SuppressWarnings("unchecked")
+		StreamInvokable<IN, OUT> invokableObject = (StreamInvokable<IN, OUT>) new SourceInvokable<OUT>(
+				function);
 
-		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
+		addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName,
 				serializedFunction, parallelism);
-		
-		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
-				inTypeInfo) : null;
-		StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
-				outTypeInfo) : null;
-
-		addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
-
-		sources.put(vertexName, function.getFormatWrapper());
-		System.out.println(sources);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Vertex: {}", vertexName);
-		}
 	}
 
 	/**
@@ -283,7 +287,7 @@ public class JobGraphBuilder {
 	 *            Name of the vertex
 	 * @param vertexClass
 	 *            The class of the vertex
-	 * @param invokableObject
+	 * @param invokableObjectject
 	 *            The user defined invokable object
 	 * @param operatorName
 	 *            Type of the user defined operator
@@ -372,16 +376,8 @@ public class JobGraphBuilder {
 			config.setIterationWaitTime(iterationWaitTime.get(vertexName));
 		}
 
-		if (sources.containsKey(vertexName)) {
-			TaskConfig taskConfig = new TaskConfig(vertex.getConfiguration());
-			// TypeInformation<?> OutTypeInfo =
-			// typeWrapperOut1.get(vertexName).getTypeInfo();
-			InputFormat<String, ?> format = sources.get(vertexName).getUserCodeObject();
-			vertex.setInputSplitSource(sources.get(vertexName).getUserCodeObject());
-			// taskConfig.setOutputSerializer(createSerializer(OutTypeInfo));
-			format.configure(taskConfig.getStubParameters());
-			// TaskConfig(vertex.getConfiguration());
-			// taskConfig.setStubWrapper(sources.get(vertexName));
+		if (inputFormatList.containsKey(vertexName)) {
+			vertex.setInputSplitSource(inputFormatList.get(vertexName));
 		}
 
 		streamVertices.put(vertexName, vertex);
@@ -438,6 +434,19 @@ public class JobGraphBuilder {
 		vertexParallelism.put(vertexName, parallelism);
 	}
 
+	/**
+	 * Sets the input format for the given vertex.
+	 * 
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param inputFormat
+	 *            input format of the file source associated with the given
+	 *            vertex
+	 */
+	public void setInputFormat(String vertexName, InputFormat<String, ?> inputFormat) {
+		inputFormatList.put(vertexName, inputFormat);
+	}
+
 	public void setMutability(String vertexName, boolean isMutable) {
 		mutability.put(vertexName, isMutable);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 78d18e7..85fb90f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -131,6 +132,13 @@ public abstract class StreamExecutionEnvironment {
 		return this;
 	}
 
+	/**
+	 * Sets the maximum time frequency (milliseconds) for the flushing of the
+	 * output buffers. For clarification on the extremal values see
+	 * {@link #setBufferTimeout(long)}.
+	 * 
+	 * @return The timeout of the buffer.
+	 */
 	public long getBufferTimeout() {
 		return this.bufferTimeout;
 	}
@@ -162,36 +170,16 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The DataStream representing the text file.
 	 */
 	public DataStreamSource<String> readTextFile(String filePath) {
-		// checkIfFileExists(filePath);
 		Validate.notNull(filePath, "The file path may not be null.");
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
-		return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
-	}
-
-	/**
-	 * Creates a DataStream that represents the Strings produced by reading the
-	 * given file line wise. The file will be read with the system's default
-	 * character set.
-	 * 
-	 * @param filePath
-	 *            The path of the file, as a URI (e.g.,
-	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
-	 * @param parallelism
-	 *            degree of parallelism
-	 * @return The DataStream representing the text file.
-	 */
-	public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
-		Validate.notNull(filePath, "The file path may not be null.");
-		TextInputFormat format = new TextInputFormat(new Path(filePath));
-		return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO),
-				parallelism);
+		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
 
+		return addFileSource(format, typeInfo);
 	}
 
 	/**
 	 * Creates a DataStream that represents the Strings produced by reading the
-	 * given file line wise. The file will be read with the given
-	 * character set.
+	 * given file line wise. The file will be read with the given character set.
 	 * 
 	 * @param filePath
 	 *            The path of the file, as a URI (e.g.,
@@ -201,51 +189,11 @@ public abstract class StreamExecutionEnvironment {
 	public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
 		Validate.notNull(filePath, "The file path may not be null.");
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
+		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
 		format.setCharsetName(charsetName);
-		return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
-	}
 
-	// public DataStreamSource<StringValue> readTextFileWithValue(String
-	// filePath) {
-	// Validate.notNull(filePath, "The file path may not be null.");
-	// TextValueInputFormat format = new TextValueInputFormat(new
-	// Path(filePath));
-	// return addSource(new FileSourceFunction<StringValue>(format,
-	// new ValueTypeInfo<StringValue>(StringValue.class)), 1);
-	// }
-	//
-	// public DataStreamSource<StringValue> readTextFileWithValue(String
-	// filePath, String charsetName,
-	// boolean skipInvalidLines) {
-	// Validate.notNull(filePath, "The file path may not be null.");
-	// TextValueInputFormat format = new TextValueInputFormat(new
-	// Path(filePath));
-	// format.setCharsetName(charsetName);
-	// format.setSkipInvalidLines(skipInvalidLines);
-	// return addSource(new FileSourceFunction<StringValue>(format,
-	// new ValueTypeInfo<StringValue>(StringValue.class)), 1);
-	// }
-	//
-	// public <X> DataStreamSource<X> readFile(FileInputFormat<X> format, String
-	// filePath) {
-	// if (format == null) {
-	// throw new IllegalArgumentException("InputFormat must not be null.");
-	// }
-	// if (filePath == null) {
-	// throw new IllegalArgumentException("The file path must not be null.");
-	// }
-	//
-	// format.setFilePath(new Path(filePath));
-	// try {
-	// return addSource(
-	// new FileSourceFunction<X>(format,
-	// TypeExtractor.getInputFormatTypes(format)), 1);
-	// } catch (Exception e) {
-	// throw new InvalidProgramException(
-	// "The type returned by the input format could not be automatically determined. "
-	// + "Please specify the TypeInformation of the produced type explicitly.");
-	// }
-	// }
+		return addFileSource(format, typeInfo);
+	}
 
 	/**
 	 * Creates a DataStream that represents the Strings produced by reading the
@@ -424,6 +372,15 @@ public abstract class StreamExecutionEnvironment {
 		return returnStream;
 	}
 
+	private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
+			TypeInformation<String> typeInfo) {
+		FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
+		DataStreamSource<String> returnStream = addSource(function);
+		jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat);
+
+		return returnStream;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Instantiation of Execution Contexts
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 879eb3f..5dfe4b2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -21,47 +21,31 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
 import org.apache.flink.util.Collector;
 
-public class FileSourceFunction extends SourceFunction<String> {
+public class FileSourceFunction extends RichSourceFunction<String> {
 	private static final long serialVersionUID = 1L;
 
 	private InputSplitProvider provider;
 
-	private InputFormat<String, ?> format;
+	private InputFormat<String, ?> inputFormat;
 
 	private TypeSerializerFactory<String> serializerFactory;
 
-	private UserCodeWrapper<? extends InputFormat<String, ?>> formatWrapper;
-
-	// cancel flag
-	private volatile boolean taskCanceled = false;
-
 	public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
-		this.format = format;
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		GenericDataSourceBase<String, ?> source = new GenericDataSourceBase(format,
-				new OperatorInformation<String>(typeInfo), format.toString());
-		formatWrapper = source.getUserCodeWrapper();
+		this.inputFormat = format;
 		this.serializerFactory = createSerializer(typeInfo);
 	}
 
-	@Override
-	public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
-		return this.formatWrapper;
-	}
-
 	private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
 		TypeSerializer<String> serializer = typeInfo.createSerializer();
 
@@ -74,20 +58,27 @@ public class FileSourceFunction extends SourceFunction<String> {
 	}
 
 	@Override
+	public void open(Configuration parameters) throws Exception {
+		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+		this.provider = context.getInputSplitProvider();
+		inputFormat.configure(context.getTaskStubParameters());
+	}
+
+	@Override
 	public void invoke(Collector<String> collector) throws Exception {
 		final TypeSerializer<String> serializer = serializerFactory.getSerializer();
 		final Iterator<InputSplit> splitIterator = getInputSplits();
 		@SuppressWarnings("unchecked")
-		final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.format;
+		final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
 		try {
-			while (!this.taskCanceled && splitIterator.hasNext()) {
+			while (splitIterator.hasNext()) {
 
 				final InputSplit split = splitIterator.next();
 				String record = serializer.createInstance();
 
 				format.open(split);
 				try {
-					while (!this.taskCanceled && !format.reachedEnd()) {
+					while (!format.reachedEnd()) {
 						if ((record = format.nextRecord(record)) != null) {
 							collector.collect(record);
 						}
@@ -148,10 +139,4 @@ public class FileSourceFunction extends SourceFunction<String> {
 			}
 		};
 	}
-
-	@Override
-	public final void initialize(Environment env) {
-		this.provider = env.getInputSplitProvider();
-	}
-
 }
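The hunk above is the heart of the change: the input split provider and the task stub parameters are now obtained from the StreamingRuntimeContext in open(), replacing the removed initialize(Environment) hook. A hedged sketch of the same pattern for a user-defined source follows; RichSourceFunction is assumed to live next to FileSourceFunction in the source package and to leave only invoke() abstract, and MyTextSource is illustrative:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.util.Collector;

public class MyTextSource extends RichSourceFunction<String> {
    private static final long serialVersionUID = 1L;

    private InputSplitProvider provider;

    @Override
    public void open(Configuration parameters) throws Exception {
        // A streaming task's runtime context is a StreamingRuntimeContext, which
        // exposes the InputSplitProvider and the task's stub parameters.
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        provider = context.getInputSplitProvider();
    }

    @Override
    public void invoke(Collector<String> collector) throws Exception {
        // Pull splits from 'provider' and emit records through 'collector',
        // as FileSourceFunction does in the diff above.
    }
}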

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index 18144b5..7371ac9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -21,11 +21,9 @@ import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.util.Collector;
 
-public class FileStreamFunction extends SourceFunction<String> {
+public class FileStreamFunction implements SourceFunction<String> {
 	private static final long serialVersionUID = 1L;
 
 	private final String path;
@@ -48,9 +46,4 @@ public class FileStreamFunction extends SourceFunction<String> {
 			br.close();
 		}
 	}
-
-	@Override
-	public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
-		return null;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index 89e0823..cb960dd 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -22,7 +22,7 @@ import java.util.Collection;
 
 import org.apache.flink.util.Collector;
 
-public class FromElementsFunction<T> extends SourceFunction<T> {
+public class FromElementsFunction<T> implements SourceFunction<T> {
 	private static final long serialVersionUID = 1L;
 
 	Iterable<T> iterable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index ece68b2..69601ff 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
  * Source Function used to generate the number sequence
  * 
  */
-public class GenSequenceFunction extends SourceFunction<Long> {
+public class GenSequenceFunction implements SourceFunction<Long> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index a3949e5..917562a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -20,17 +20,10 @@ package org.apache.flink.streaming.api.function.source;
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.util.Collector;
 
-public abstract class SourceFunction<OUT> implements Function, Serializable {
-	private static final long serialVersionUID = 1L;
+public interface SourceFunction<OUT> extends Function, Serializable {
 
-	public abstract void invoke(Collector<OUT> collector) throws Exception;
-	
-	public void initialize(Environment env){}
-	
-	public abstract UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper();
+	public void invoke(Collector<OUT> collector) throws Exception;
+		
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index ea518a0..e587b93 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 1dd78b5..13e6c9f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -76,9 +76,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	}
 
 	protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
-		if (userInvokable.getSourceFunction() != null) {
-			userInvokable.getSourceFunction().initialize(getEnvironment());
-		}
 		userInvokable.setRuntimeContext(context);
 		userInvokable.open(getTaskConfiguration());
 		userInvokable.invoke();
@@ -107,8 +104,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	public StreamingRuntimeContext createRuntimeContext(String taskName,
 			Map<String, OperatorState<?>> states) {
 		Environment env = getEnvironment();
-		return new StreamingRuntimeContext(taskName, env.getCurrentNumberOfSubtasks(),
-				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), states, env.getCopyTask());
+		return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), states);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index a56ecff..49cf15f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -19,11 +19,13 @@
 package org.apache.flink.streaming.api.streamvertex;
 
 import java.util.Map;
-import java.util.concurrent.FutureTask;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.state.OperatorState;
 
 /**
@@ -32,25 +34,20 @@ import org.apache.flink.streaming.state.OperatorState;
  */
 public class StreamingRuntimeContext extends RuntimeUDFContext {
 
+	private Environment env;
 	private final Map<String, OperatorState<?>> operatorStates;
 
-	public StreamingRuntimeContext(String name, int numParallelSubtasks, int subtaskIndex,
-			ClassLoader userCodeClassLoader, Map<String, OperatorState<?>> operatorStates) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader);
-		this.operatorStates = operatorStates;
-	}
-
-	public StreamingRuntimeContext(String name, int numParallelSubtasks, int subtaskIndex,
-			ClassLoader userCodeClassLoader, Map<String, OperatorState<?>> operatorStates,
-			Map<String, FutureTask<Path>> cpTasks) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, cpTasks);
+	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
+			Map<String, OperatorState<?>> operatorStates) {
+		super(name, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
+				userCodeClassLoader, env.getCopyTask());
+		this.env = env;
 		this.operatorStates = operatorStates;
 	}
 
 	/**
 	 * Returns the operator state registered by the given name for the operator.
 	 * 
-	 * 
 	 * @param name
 	 *            Name of the operator state to be returned.
 	 * @return The operator state.
@@ -69,4 +66,23 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 
 	}
 
+	/**
+	 * Returns the input split provider associated with the operator.
+	 * 
+	 * @return The input split provider.
+	 */
+	public InputSplitProvider getInputSplitProvider() {
+		return env.getInputSplitProvider();
+	}
+
+	/**
+	 * Returns the stub parameters associated with the {@link TaskConfig} of the
+	 * operator.
+	 * 
+	 * @return The stub parameters.
+	 */
+	public Configuration getTaskStubParameters() {
+		return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index 0b73c3a..9edf44e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -116,12 +116,6 @@ public class StreamVertexTest {
 			fail();
 		} catch (RuntimeException e) {
 		}
-
-		try {
-			env.readTextFile("random/path/that/is/not/valid");
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
 	}
 
 	private static class CoMap implements CoMapFunction<String, Long, String> {


[2/2] incubator-flink git commit: [streaming] Basic support reading from local and distributed file systems in readTextFile methods

Posted by mb...@apache.org.
[streaming] Basic support reading from local and distributed file systems in readTextFile methods

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/510a8113
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/510a8113
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/510a8113

Branch: refs/heads/master
Commit: 510a81130a8fbfee4a14abc262299a1611a5eda3
Parents: 283c398
Author: szape <ne...@gmail.com>
Authored: Thu Nov 20 11:49:08 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Mon Dec 15 22:13:38 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |  43 ++++++
 .../environment/StreamExecutionEnvironment.java |  94 ++++++++++++-
 .../api/function/source/FileSourceFunction.java | 139 +++++++++++++++++--
 .../api/function/source/FileStreamFunction.java |   9 +-
 .../function/source/FromElementsFunction.java   |   2 +-
 .../function/source/GenSequenceFunction.java    |   2 +-
 .../api/function/source/SourceFunction.java     |  16 ++-
 .../api/invokable/SourceInvokable.java          |   5 +
 .../api/invokable/StreamInvokable.java          |   6 +
 .../api/streamvertex/StreamVertex.java          |   3 +
 10 files changed, 292 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
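This earlier commit introduces the InputFormat-based readTextFile variants that [1/2] then reworks: a charset-aware overload and a parallelism overload (the latter is removed again in [1/2]). A hedged usage sketch against the intermediate state created by this commit; paths, charset, and class name are illustrative:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileOverloads {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Charset-aware overload added in this commit.
        DataStreamSource<String> latin1 = env.readTextFile("file:///tmp/input.txt", "ISO-8859-1");

        // Parallelism overload, also added here; commit [1/2] above drops it again
        // when the file sources are reworked as RichFunctions.
        DataStreamSource<String> parallel = env.readTextFile("hdfs://namenode:9000/logs", 4);

        latin1.print();
        parallel.print();
        env.execute();
    }
}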


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index e80d86d..01fd1e9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -23,6 +23,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -30,7 +32,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.invokable.SourceInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -79,6 +84,7 @@ public class JobGraphBuilder {
 	private Map<String, Integer> iterationTailCount;
 	private Map<String, Long> iterationWaitTime;
 	private Map<String, Map<String, OperatorState<?>>> operatorStates;
+	private Map<String, UserCodeWrapper<? extends InputFormat<String, ?>>> sources;
 
 	/**
 	 * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -111,6 +117,7 @@ public class JobGraphBuilder {
 		iterationTailCount = new HashMap<String, Integer>();
 		iterationWaitTime = new HashMap<String, Long>();
 		operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
+		sources = new HashMap<String, UserCodeWrapper<? extends InputFormat<String, ?>>>();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("JobGraph created");
@@ -155,6 +162,30 @@ public class JobGraphBuilder {
 		}
 	}
 
+	public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> function,
+			TypeInformation<IN> inTypeInfo,	TypeInformation<OUT> outTypeInfo, String operatorName,
+			byte[] serializedFunction, int parallelism) {
+
+		StreamInvokable<OUT, OUT> invokableObject = new SourceInvokable<OUT>(function);
+
+		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
+				serializedFunction, parallelism);
+		
+		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
+				inTypeInfo) : null;
+		StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
+				outTypeInfo) : null;
+
+		addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
+
+		sources.put(vertexName, function.getFormatWrapper());
+		System.out.println(sources);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Vertex: {}", vertexName);
+		}
+	}
+
 	/**
 	 * Adds a vertex for the iteration head to the {@link JobGraph}. The
 	 * iterated values will be fed from this vertex back to the graph.
@@ -341,6 +372,18 @@ public class JobGraphBuilder {
 			config.setIterationWaitTime(iterationWaitTime.get(vertexName));
 		}
 
+		if (sources.containsKey(vertexName)) {
+			TaskConfig taskConfig = new TaskConfig(vertex.getConfiguration());
+			// TypeInformation<?> OutTypeInfo =
+			// typeWrapperOut1.get(vertexName).getTypeInfo();
+			InputFormat<String, ?> format = sources.get(vertexName).getUserCodeObject();
+			vertex.setInputSplitSource(sources.get(vertexName).getUserCodeObject());
+			// taskConfig.setOutputSerializer(createSerializer(OutTypeInfo));
+			format.configure(taskConfig.getStubParameters());
+			// TaskConfig(vertex.getConfiguration());
+			// taskConfig.setStubWrapper(sources.get(vertexName));
+		}
+
 		streamVertices.put(vertexName, vertex);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5c47592..78d18e7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -24,11 +24,15 @@ import java.util.List;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -158,12 +162,93 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The DataStream representing the text file.
 	 */
 	public DataStreamSource<String> readTextFile(String filePath) {
-		checkIfFileExists(filePath);
-		return addSource(new FileSourceFunction(filePath));
+		// checkIfFileExists(filePath);
+		Validate.notNull(filePath, "The file path may not be null.");
+		TextInputFormat format = new TextInputFormat(new Path(filePath));
+		return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
 	}
 
 	/**
 	 * Creates a DataStream that represents the Strings produced by reading the
+	 * given file line wise. The file will be read with the system's default
+	 * character set.
+	 * 
+	 * @param filePath
+	 *            The path of the file, as a URI (e.g.,
+	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @param parallelism
+	 *            degree of parallelism
+	 * @return The DataStream representing the text file.
+	 */
+	public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
+		Validate.notNull(filePath, "The file path may not be null.");
+		TextInputFormat format = new TextInputFormat(new Path(filePath));
+		return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO),
+				parallelism);
+
+	}
+
+	/**
+	 * Creates a DataStream that represents the Strings produced by reading the
+	 * given file line wise. The file will be read with the given
+	 * character set.
+	 * 
+	 * @param filePath
+	 *            The path of the file, as a URI (e.g.,
+	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @return The DataStream representing the text file.
+	 */
+	public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
+		Validate.notNull(filePath, "The file path may not be null.");
+		TextInputFormat format = new TextInputFormat(new Path(filePath));
+		format.setCharsetName(charsetName);
+		return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
+	}
+
+	// public DataStreamSource<StringValue> readTextFileWithValue(String
+	// filePath) {
+	// Validate.notNull(filePath, "The file path may not be null.");
+	// TextValueInputFormat format = new TextValueInputFormat(new
+	// Path(filePath));
+	// return addSource(new FileSourceFunction<StringValue>(format,
+	// new ValueTypeInfo<StringValue>(StringValue.class)), 1);
+	// }
+	//
+	// public DataStreamSource<StringValue> readTextFileWithValue(String
+	// filePath, String charsetName,
+	// boolean skipInvalidLines) {
+	// Validate.notNull(filePath, "The file path may not be null.");
+	// TextValueInputFormat format = new TextValueInputFormat(new
+	// Path(filePath));
+	// format.setCharsetName(charsetName);
+	// format.setSkipInvalidLines(skipInvalidLines);
+	// return addSource(new FileSourceFunction<StringValue>(format,
+	// new ValueTypeInfo<StringValue>(StringValue.class)), 1);
+	// }
+	//
+	// public <X> DataStreamSource<X> readFile(FileInputFormat<X> format, String
+	// filePath) {
+	// if (format == null) {
+	// throw new IllegalArgumentException("InputFormat must not be null.");
+	// }
+	// if (filePath == null) {
+	// throw new IllegalArgumentException("The file path must not be null.");
+	// }
+	//
+	// format.setFilePath(new Path(filePath));
+	// try {
+	// return addSource(
+	// new FileSourceFunction<X>(format,
+	// TypeExtractor.getInputFormatTypes(format)), 1);
+	// } catch (Exception e) {
+	// throw new InvalidProgramException(
+	// "The type returned by the input format could not be automatically determined. "
+	// + "Please specify the TypeInformation of the produced type explicitly.");
+	// }
+	// }
+
+	/**
+	 * Creates a DataStream that represents the Strings produced by reading the
 	 * given file line wise multiple times(infinite). The file will be read with
 	 * the system's default character set.
 	 * 
@@ -330,9 +415,8 @@ public abstract class StreamExecutionEnvironment {
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
 
 		try {
-			jobGraphBuilder.addStreamVertex(returnStream.getId(),
-					new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
-					SerializationUtils.serialize(function), 1);
+			jobGraphBuilder.addSourceVertex(returnStream.getId(), function, null, outTypeInfo,
+					"source", SerializationUtils.serialize(function), getDegreeOfParallelism());
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize SourceFunction");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 0fe7149..879eb3f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -17,32 +17,141 @@
 
 package org.apache.flink.streaming.api.function.source;
 
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.OperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.util.Collector;
 
-public class FileSourceFunction implements SourceFunction<String> {
+public class FileSourceFunction extends SourceFunction<String> {
 	private static final long serialVersionUID = 1L;
 
-	private final String path;
+	private InputSplitProvider provider;
 
-	public FileSourceFunction(String path) {
-		this.path = path;
+	private InputFormat<String, ?> format;
+
+	private TypeSerializerFactory<String> serializerFactory;
+
+	private UserCodeWrapper<? extends InputFormat<String, ?>> formatWrapper;
+
+	// cancel flag
+	private volatile boolean taskCanceled = false;
+
+	public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
+		this.format = format;
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		GenericDataSourceBase<String, ?> source = new GenericDataSourceBase(format,
+				new OperatorInformation<String>(typeInfo), format.toString());
+		formatWrapper = source.getUserCodeWrapper();
+		this.serializerFactory = createSerializer(typeInfo);
+	}
+
+	@Override
+	public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
+		return this.formatWrapper;
+	}
+
+	private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
+		TypeSerializer<String> serializer = typeInfo.createSerializer();
+
+		if (serializer.isStateful()) {
+			return new RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass());
+		} else {
+			return new RuntimeStatelessSerializerFactory<String>(serializer,
+					typeInfo.getTypeClass());
+		}
 	}
 
 	@Override
-	public void invoke(Collector<String> collector) throws IOException {
-		BufferedReader br = new BufferedReader(new FileReader(path));
-		String line = br.readLine();
-		while (line != null) {
-			if (!line.equals("")) {
-				collector.collect(line);
+	public void invoke(Collector<String> collector) throws Exception {
+		final TypeSerializer<String> serializer = serializerFactory.getSerializer();
+		final Iterator<InputSplit> splitIterator = getInputSplits();
+		@SuppressWarnings("unchecked")
+		final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.format;
+		try {
+			while (!this.taskCanceled && splitIterator.hasNext()) {
+
+				final InputSplit split = splitIterator.next();
+				String record = serializer.createInstance();
+
+				format.open(split);
+				try {
+					while (!this.taskCanceled && !format.reachedEnd()) {
+						if ((record = format.nextRecord(record)) != null) {
+							collector.collect(record);
+						}
+					}
+				} finally {
+					format.close();
+				}
 			}
-			line = br.readLine();
+			collector.close();
+		} catch (Exception ex) {
+			ex.printStackTrace();
 		}
-		br.close();
+	}
+
+	private Iterator<InputSplit> getInputSplits() {
+
+		return new Iterator<InputSplit>() {
+
+			private InputSplit nextSplit;
+
+			private boolean exhausted;
+
+			@Override
+			public boolean hasNext() {
+				if (exhausted) {
+					return false;
+				}
+
+				if (nextSplit != null) {
+					return true;
+				}
+
+				InputSplit split = provider.getNextInputSplit();
+
+				if (split != null) {
+					this.nextSplit = split;
+					return true;
+				} else {
+					exhausted = true;
+					return false;
+				}
+			}
+
+			@Override
+			public InputSplit next() {
+				if (this.nextSplit == null && !hasNext()) {
+					throw new NoSuchElementException();
+				}
+
+				final InputSplit tmp = this.nextSplit;
+				this.nextSplit = null;
+				return tmp;
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		};
+	}
+
+	@Override
+	public final void initialize(Environment env) {
+		this.provider = env.getInputSplitProvider();
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index 7371ac9..18144b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -21,9 +21,11 @@ import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.util.Collector;
 
-public class FileStreamFunction implements SourceFunction<String> {
+public class FileStreamFunction extends SourceFunction<String> {
 	private static final long serialVersionUID = 1L;
 
 	private final String path;
@@ -46,4 +48,9 @@ public class FileStreamFunction implements SourceFunction<String> {
 			br.close();
 		}
 	}
+
+	@Override
+	public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
+		return null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index cb960dd..89e0823 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -22,7 +22,7 @@ import java.util.Collection;
 
 import org.apache.flink.util.Collector;
 
-public class FromElementsFunction<T> implements SourceFunction<T> {
+public class FromElementsFunction<T> extends SourceFunction<T> {
 	private static final long serialVersionUID = 1L;
 
 	Iterable<T> iterable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index 69601ff..ece68b2 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
  * Source Function used to generate the number sequence
  * 
  */
-public class GenSequenceFunction implements SourceFunction<Long> {
+public class GenSequenceFunction extends SourceFunction<Long> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index d30bbba..a3949e5 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -18,11 +18,19 @@
 package org.apache.flink.streaming.api.function.source;
 
 import java.io.Serializable;
-
+
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.util.Collector;
 
-public interface SourceFunction<OUT> extends Function, Serializable {
-
-	public void invoke(Collector<OUT> collector) throws Exception;
+public abstract class SourceFunction<OUT> implements Function, Serializable {
+	private static final long serialVersionUID = 1L;
+
+	public abstract void invoke(Collector<OUT> collector) throws Exception;
+	
+	public void initialize(Environment env){}
+	
+	public abstract UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 8c9df46..0cfe028 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -49,5 +49,10 @@ public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements Se
 	@Override
 	protected void callUserFunction() throws Exception {
 	}
+	
+	@Override
+	public SourceFunction<OUT> getSourceFunction(){
+		return sourceFunction;
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 71739c1..ea518a0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
@@ -161,4 +163,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	public void setRuntimeContext(RuntimeContext t) {
 		FunctionUtils.setFunctionRuntimeContext(userFunction, t);
 	}
+
+	public SourceFunction<OUT> getSourceFunction() {
+		return null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 2db0d8b..1dd78b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -76,6 +76,9 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	}
 
 	protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
+		if (userInvokable.getSourceFunction() != null) {
+			userInvokable.getSourceFunction().initialize(getEnvironment());
+		}
 		userInvokable.setRuntimeContext(context);
 		userInvokable.open(getTaskConfiguration());
 		userInvokable.invoke();