You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/15 23:07:43 UTC
[1/2] incubator-flink git commit: [streaming] ReadTextFile
re-implemented as RichFunction and cleanup
Repository: incubator-flink
Updated Branches:
refs/heads/master 283c398e4 -> 1a6b40465
[streaming] ReadTextFile re-implemented as RichFunction and cleanup
Conflicts:
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1a6b4046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1a6b4046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1a6b4046
Branch: refs/heads/master
Commit: 1a6b404651622aad3e4595549c0c6bc538de091c
Parents: 510a811
Author: mbalassi <mb...@apache.org>
Authored: Sat Nov 29 21:30:59 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Mon Dec 15 22:13:38 2014 +0100
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 73 +++++++++-------
.../environment/StreamExecutionEnvironment.java | 89 +++++---------------
.../api/function/source/FileSourceFunction.java | 45 ++++------
.../api/function/source/FileStreamFunction.java | 9 +-
.../function/source/FromElementsFunction.java | 2 +-
.../function/source/GenSequenceFunction.java | 2 +-
.../api/function/source/SourceFunction.java | 13 +--
.../api/invokable/StreamInvokable.java | 1 -
.../api/streamvertex/StreamVertex.java | 6 +-
.../streamvertex/StreamingRuntimeContext.java | 42 ++++++---
.../api/streamvertex/StreamVertexTest.java | 6 --
11 files changed, 115 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 01fd1e9..5fc4a1b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -32,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
@@ -84,7 +82,7 @@ public class JobGraphBuilder {
private Map<String, Integer> iterationTailCount;
private Map<String, Long> iterationWaitTime;
private Map<String, Map<String, OperatorState<?>>> operatorStates;
- private Map<String, UserCodeWrapper<? extends InputFormat<String, ?>>> sources;
+ private Map<String, InputFormat<String, ?>> inputFormatList;
/**
* Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -117,7 +115,7 @@ public class JobGraphBuilder {
iterationTailCount = new HashMap<String, Integer>();
iterationWaitTime = new HashMap<String, Long>();
operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
- sources = new HashMap<String, UserCodeWrapper<? extends InputFormat<String, ?>>>();
+ inputFormatList = new HashMap<String, InputFormat<String, ?>>();
if (LOG.isDebugEnabled()) {
LOG.debug("JobGraph created");
@@ -162,28 +160,34 @@ public class JobGraphBuilder {
}
}
+ /**
+ * Adds a source vertex to the streaming JobGraph with the given parameters
+ *
+ * @param vertexName
+ * Name of the vertex
+ * @param function
+ * User defined function
+ * @param inTypeInfo
+ * Input type for serialization
+ * @param outTypeInfo
+ * Output type for serialization
+ * @param operatorName
+ * Operator type
+ * @param serializedFunction
+ * Serialized udf
+ * @param parallelism
+ * Number of parallel instances created
+ */
public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> function,
- TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName,
+ TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName,
byte[] serializedFunction, int parallelism) {
- StreamInvokable<OUT, OUT> invokableObject = new SourceInvokable<OUT>(function);
+ @SuppressWarnings("unchecked")
+ StreamInvokable<IN, OUT> invokableObject = (StreamInvokable<IN, OUT>) new SourceInvokable<OUT>(
+ function);
- addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
+ addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName,
serializedFunction, parallelism);
-
- StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
- inTypeInfo) : null;
- StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
- outTypeInfo) : null;
-
- addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
-
- sources.put(vertexName, function.getFormatWrapper());
- System.out.println(sources);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex: {}", vertexName);
- }
}
/**
@@ -283,7 +287,7 @@ public class JobGraphBuilder {
* Name of the vertex
* @param vertexClass
* The class of the vertex
- * @param invokableObject
+ * @param invokableObject
* The user defined invokable object
* @param operatorName
* Type of the user defined operator
@@ -372,16 +376,8 @@ public class JobGraphBuilder {
config.setIterationWaitTime(iterationWaitTime.get(vertexName));
}
- if (sources.containsKey(vertexName)) {
- TaskConfig taskConfig = new TaskConfig(vertex.getConfiguration());
- // TypeInformation<?> OutTypeInfo =
- // typeWrapperOut1.get(vertexName).getTypeInfo();
- InputFormat<String, ?> format = sources.get(vertexName).getUserCodeObject();
- vertex.setInputSplitSource(sources.get(vertexName).getUserCodeObject());
- // taskConfig.setOutputSerializer(createSerializer(OutTypeInfo));
- format.configure(taskConfig.getStubParameters());
- // TaskConfig(vertex.getConfiguration());
- // taskConfig.setStubWrapper(sources.get(vertexName));
+ if (inputFormatList.containsKey(vertexName)) {
+ vertex.setInputSplitSource(inputFormatList.get(vertexName));
}
streamVertices.put(vertexName, vertex);
@@ -438,6 +434,19 @@ public class JobGraphBuilder {
vertexParallelism.put(vertexName, parallelism);
}
+ /**
+ * Sets the input format for the given vertex.
+ *
+ * @param vertexName
+ * Name of the vertex
+ * @param inputFormat
+ * input format of the file source associated with the given
+ * vertex
+ */
+ public void setInputFormat(String vertexName, InputFormat<String, ?> inputFormat) {
+ inputFormatList.put(vertexName, inputFormat);
+ }
+
public void setMutability(String vertexName, boolean isMutable) {
mutability.put(vertexName, isMutable);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 78d18e7..85fb90f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -131,6 +132,13 @@ public abstract class StreamExecutionEnvironment {
return this;
}
+ /**
+ * Sets the maximum time frequency (milliseconds) for the flushing of the
+ * output buffers. For clarification on the extremal values see
+ * {@link #setBufferTimeout(long)}.
+ *
+ * @return The timeout of the buffer.
+ */
public long getBufferTimeout() {
return this.bufferTimeout;
}
@@ -162,36 +170,16 @@ public abstract class StreamExecutionEnvironment {
* @return The DataStream representing the text file.
*/
public DataStreamSource<String> readTextFile(String filePath) {
- // checkIfFileExists(filePath);
Validate.notNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
- return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
- }
-
- /**
- * Creates a DataStream that represents the Strings produced by reading the
- * given file line wise. The file will be read with the system's default
- * character set.
- *
- * @param filePath
- * The path of the file, as a URI (e.g.,
- * "file:///some/local/file" or "hdfs://host:port/file/path").
- * @param parallelism
- * degree of parallelism
- * @return The DataStream representing the text file.
- */
- public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
- Validate.notNull(filePath, "The file path may not be null.");
- TextInputFormat format = new TextInputFormat(new Path(filePath));
- return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO),
- parallelism);
+ TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+ return addFileSource(format, typeInfo);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
- * given file line wise. The file will be read with the given
- * character set.
+ * given file line wise. The file will be read with the given character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
@@ -201,51 +189,11 @@ public abstract class StreamExecutionEnvironment {
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Validate.notNull(filePath, "The file path may not be null.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
+ TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);
- return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
- }
- // public DataStreamSource<StringValue> readTextFileWithValue(String
- // filePath) {
- // Validate.notNull(filePath, "The file path may not be null.");
- // TextValueInputFormat format = new TextValueInputFormat(new
- // Path(filePath));
- // return addSource(new FileSourceFunction<StringValue>(format,
- // new ValueTypeInfo<StringValue>(StringValue.class)), 1);
- // }
- //
- // public DataStreamSource<StringValue> readTextFileWithValue(String
- // filePath, String charsetName,
- // boolean skipInvalidLines) {
- // Validate.notNull(filePath, "The file path may not be null.");
- // TextValueInputFormat format = new TextValueInputFormat(new
- // Path(filePath));
- // format.setCharsetName(charsetName);
- // format.setSkipInvalidLines(skipInvalidLines);
- // return addSource(new FileSourceFunction<StringValue>(format,
- // new ValueTypeInfo<StringValue>(StringValue.class)), 1);
- // }
- //
- // public <X> DataStreamSource<X> readFile(FileInputFormat<X> format, String
- // filePath) {
- // if (format == null) {
- // throw new IllegalArgumentException("InputFormat must not be null.");
- // }
- // if (filePath == null) {
- // throw new IllegalArgumentException("The file path must not be null.");
- // }
- //
- // format.setFilePath(new Path(filePath));
- // try {
- // return addSource(
- // new FileSourceFunction<X>(format,
- // TypeExtractor.getInputFormatTypes(format)), 1);
- // } catch (Exception e) {
- // throw new InvalidProgramException(
- // "The type returned by the input format could not be automatically determined. "
- // + "Please specify the TypeInformation of the produced type explicitly.");
- // }
- // }
+ return addFileSource(format, typeInfo);
+ }
/**
* Creates a DataStream that represents the Strings produced by reading the
@@ -424,6 +372,15 @@ public abstract class StreamExecutionEnvironment {
return returnStream;
}
+ private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
+ TypeInformation<String> typeInfo) {
+ FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
+ DataStreamSource<String> returnStream = addSource(function);
+ jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat);
+
+ return returnStream;
+ }
+
// --------------------------------------------------------------------------------------------
// Instantiation of Execution Contexts
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 879eb3f..5dfe4b2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -21,47 +21,31 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.OperatorInformation;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
-public class FileSourceFunction extends SourceFunction<String> {
+public class FileSourceFunction extends RichSourceFunction<String> {
private static final long serialVersionUID = 1L;
private InputSplitProvider provider;
- private InputFormat<String, ?> format;
+ private InputFormat<String, ?> inputFormat;
private TypeSerializerFactory<String> serializerFactory;
- private UserCodeWrapper<? extends InputFormat<String, ?>> formatWrapper;
-
- // cancel flag
- private volatile boolean taskCanceled = false;
-
public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
- this.format = format;
- @SuppressWarnings({ "unchecked", "rawtypes" })
- GenericDataSourceBase<String, ?> source = new GenericDataSourceBase(format,
- new OperatorInformation<String>(typeInfo), format.toString());
- formatWrapper = source.getUserCodeWrapper();
+ this.inputFormat = format;
this.serializerFactory = createSerializer(typeInfo);
}
- @Override
- public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
- return this.formatWrapper;
- }
-
private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
TypeSerializer<String> serializer = typeInfo.createSerializer();
@@ -74,20 +58,27 @@ public class FileSourceFunction extends SourceFunction<String> {
}
@Override
+ public void open(Configuration parameters) throws Exception {
+ StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+ this.provider = context.getInputSplitProvider();
+ inputFormat.configure(context.getTaskStubParameters());
+ }
+
+ @Override
public void invoke(Collector<String> collector) throws Exception {
final TypeSerializer<String> serializer = serializerFactory.getSerializer();
final Iterator<InputSplit> splitIterator = getInputSplits();
@SuppressWarnings("unchecked")
- final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.format;
+ final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
try {
- while (!this.taskCanceled && splitIterator.hasNext()) {
+ while (splitIterator.hasNext()) {
final InputSplit split = splitIterator.next();
String record = serializer.createInstance();
format.open(split);
try {
- while (!this.taskCanceled && !format.reachedEnd()) {
+ while (!format.reachedEnd()) {
if ((record = format.nextRecord(record)) != null) {
collector.collect(record);
}
@@ -148,10 +139,4 @@ public class FileSourceFunction extends SourceFunction<String> {
}
};
}
-
- @Override
- public final void initialize(Environment env) {
- this.provider = env.getInputSplitProvider();
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index 18144b5..7371ac9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -21,11 +21,9 @@ import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.util.Collector;
-public class FileStreamFunction extends SourceFunction<String> {
+public class FileStreamFunction implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
private final String path;
@@ -48,9 +46,4 @@ public class FileStreamFunction extends SourceFunction<String> {
br.close();
}
}
-
- @Override
- public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index 89e0823..cb960dd 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -22,7 +22,7 @@ import java.util.Collection;
import org.apache.flink.util.Collector;
-public class FromElementsFunction<T> extends SourceFunction<T> {
+public class FromElementsFunction<T> implements SourceFunction<T> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index ece68b2..69601ff 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
* Source Function used to generate the number sequence
*
*/
-public class GenSequenceFunction extends SourceFunction<Long> {
+public class GenSequenceFunction implements SourceFunction<Long> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index a3949e5..917562a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -20,17 +20,10 @@ package org.apache.flink.streaming.api.function.source;
import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.util.Collector;
-public abstract class SourceFunction<OUT> implements Function, Serializable {
- private static final long serialVersionUID = 1L;
+public interface SourceFunction<OUT> extends Function, Serializable {
- public abstract void invoke(Collector<OUT> collector) throws Exception;
-
- public void initialize(Environment env){}
-
- public abstract UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper();
+ public void invoke(Collector<OUT> collector) throws Exception;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index ea518a0..e587b93 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 1dd78b5..13e6c9f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -76,9 +76,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
}
protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
- if (userInvokable.getSourceFunction() != null) {
- userInvokable.getSourceFunction().initialize(getEnvironment());
- }
userInvokable.setRuntimeContext(context);
userInvokable.open(getTaskConfiguration());
userInvokable.invoke();
@@ -107,8 +104,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
public StreamingRuntimeContext createRuntimeContext(String taskName,
Map<String, OperatorState<?>> states) {
Environment env = getEnvironment();
- return new StreamingRuntimeContext(taskName, env.getCurrentNumberOfSubtasks(),
- env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), states, env.getCopyTask());
+ return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), states);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index a56ecff..49cf15f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -19,11 +19,13 @@
package org.apache.flink.streaming.api.streamvertex;
import java.util.Map;
-import java.util.concurrent.FutureTask;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.state.OperatorState;
/**
@@ -32,25 +34,20 @@ import org.apache.flink.streaming.state.OperatorState;
*/
public class StreamingRuntimeContext extends RuntimeUDFContext {
+ private Environment env;
private final Map<String, OperatorState<?>> operatorStates;
- public StreamingRuntimeContext(String name, int numParallelSubtasks, int subtaskIndex,
- ClassLoader userCodeClassLoader, Map<String, OperatorState<?>> operatorStates) {
- super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader);
- this.operatorStates = operatorStates;
- }
-
- public StreamingRuntimeContext(String name, int numParallelSubtasks, int subtaskIndex,
- ClassLoader userCodeClassLoader, Map<String, OperatorState<?>> operatorStates,
- Map<String, FutureTask<Path>> cpTasks) {
- super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, cpTasks);
+ public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
+ Map<String, OperatorState<?>> operatorStates) {
+ super(name, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
+ userCodeClassLoader, env.getCopyTask());
+ this.env = env;
this.operatorStates = operatorStates;
}
/**
* Returns the operator state registered by the given name for the operator.
*
- *
* @param name
* Name of the operator state to be returned.
* @return The operator state.
@@ -69,4 +66,23 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
}
+ /**
+ * Returns the input split provider associated with the operator.
+ *
+ * @return The input split provider.
+ */
+ public InputSplitProvider getInputSplitProvider() {
+ return env.getInputSplitProvider();
+ }
+
+ /**
+ * Returns the stub parameters associated with the {@link TaskConfig} of the
+ * operator.
+ *
+ * @return The stub parameters.
+ */
+ public Configuration getTaskStubParameters() {
+ return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a6b4046/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index 0b73c3a..9edf44e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -116,12 +116,6 @@ public class StreamVertexTest {
fail();
} catch (RuntimeException e) {
}
-
- try {
- env.readTextFile("random/path/that/is/not/valid");
- fail();
- } catch (IllegalArgumentException e) {
- }
}
private static class CoMap implements CoMapFunction<String, Long, String> {
[2/2] incubator-flink git commit: [streaming] Basic support reading
from local and distributed file systems in readTextFile methods
Posted by mb...@apache.org.
[streaming] Basic support reading from local and distributed file systems in readTextFile methods
Conflicts:
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/510a8113
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/510a8113
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/510a8113
Branch: refs/heads/master
Commit: 510a81130a8fbfee4a14abc262299a1611a5eda3
Parents: 283c398
Author: szape <ne...@gmail.com>
Authored: Thu Nov 20 11:49:08 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Mon Dec 15 22:13:38 2014 +0100
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 43 ++++++
.../environment/StreamExecutionEnvironment.java | 94 ++++++++++++-
.../api/function/source/FileSourceFunction.java | 139 +++++++++++++++++--
.../api/function/source/FileStreamFunction.java | 9 +-
.../function/source/FromElementsFunction.java | 2 +-
.../function/source/GenSequenceFunction.java | 2 +-
.../api/function/source/SourceFunction.java | 16 ++-
.../api/invokable/SourceInvokable.java | 5 +
.../api/invokable/StreamInvokable.java | 6 +
.../api/streamvertex/StreamVertex.java | 3 +
10 files changed, 292 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index e80d86d..01fd1e9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -23,6 +23,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -30,7 +32,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.invokable.SourceInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -79,6 +84,7 @@ public class JobGraphBuilder {
private Map<String, Integer> iterationTailCount;
private Map<String, Long> iterationWaitTime;
private Map<String, Map<String, OperatorState<?>>> operatorStates;
+ private Map<String, UserCodeWrapper<? extends InputFormat<String, ?>>> sources;
/**
* Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -111,6 +117,7 @@ public class JobGraphBuilder {
iterationTailCount = new HashMap<String, Integer>();
iterationWaitTime = new HashMap<String, Long>();
operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
+ sources = new HashMap<String, UserCodeWrapper<? extends InputFormat<String, ?>>>();
if (LOG.isDebugEnabled()) {
LOG.debug("JobGraph created");
@@ -155,6 +162,30 @@ public class JobGraphBuilder {
}
}
+ public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> function,
+ TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName,
+ byte[] serializedFunction, int parallelism) {
+
+ StreamInvokable<OUT, OUT> invokableObject = new SourceInvokable<OUT>(function);
+
+ addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
+ serializedFunction, parallelism);
+
+ StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
+ inTypeInfo) : null;
+ StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
+ outTypeInfo) : null;
+
+ addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
+
+ sources.put(vertexName, function.getFormatWrapper());
+ System.out.println(sources);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vertex: {}", vertexName);
+ }
+ }
+
/**
* Adds a vertex for the iteration head to the {@link JobGraph}. The
* iterated values will be fed from this vertex back to the graph.
@@ -341,6 +372,18 @@ public class JobGraphBuilder {
config.setIterationWaitTime(iterationWaitTime.get(vertexName));
}
+ if (sources.containsKey(vertexName)) {
+ TaskConfig taskConfig = new TaskConfig(vertex.getConfiguration());
+ // TypeInformation<?> OutTypeInfo =
+ // typeWrapperOut1.get(vertexName).getTypeInfo();
+ InputFormat<String, ?> format = sources.get(vertexName).getUserCodeObject();
+ vertex.setInputSplitSource(sources.get(vertexName).getUserCodeObject());
+ // taskConfig.setOutputSerializer(createSerializer(OutTypeInfo));
+ format.configure(taskConfig.getStubParameters());
+ // TaskConfig(vertex.getConfiguration());
+ // taskConfig.setStubWrapper(sources.get(vertexName));
+ }
+
streamVertices.put(vertexName, vertex);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5c47592..78d18e7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -24,11 +24,15 @@ import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -158,12 +162,93 @@ public abstract class StreamExecutionEnvironment {
* @return The DataStream representing the text file.
*/
public DataStreamSource<String> readTextFile(String filePath) {
- checkIfFileExists(filePath);
- return addSource(new FileSourceFunction(filePath));
+ // checkIfFileExists(filePath);
+ Validate.notNull(filePath, "The file path may not be null.");
+ TextInputFormat format = new TextInputFormat(new Path(filePath));
+ return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
}
/**
* Creates a DataStream that represents the Strings produced by reading the
+ * given file line wise. The file will be read with the system's default
+ * character set.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g.,
+ * "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @param parallelism
+ * degree of parallelism
+ * @return The DataStream representing the text file.
+ */
+ public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
+ Validate.notNull(filePath, "The file path may not be null.");
+ TextInputFormat format = new TextInputFormat(new Path(filePath));
+ return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO),
+ parallelism);
+
+ }
+
+ /**
+ * Creates a DataStream that represents the Strings produced by reading the
+ * given file line wise. The file will be read with the given
+ * character set.
+ *
+ * @param filePath
+ * The path of the file, as a URI (e.g.,
+ * "file:///some/local/file" or "hdfs://host:port/file/path").
+ * @return The DataStream representing the text file.
+ */
+ public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
+ Validate.notNull(filePath, "The file path may not be null.");
+ TextInputFormat format = new TextInputFormat(new Path(filePath));
+ format.setCharsetName(charsetName);
+ return addSource(new FileSourceFunction(format, BasicTypeInfo.STRING_TYPE_INFO), 1);
+ }
+
+ // public DataStreamSource<StringValue> readTextFileWithValue(String
+ // filePath) {
+ // Validate.notNull(filePath, "The file path may not be null.");
+ // TextValueInputFormat format = new TextValueInputFormat(new
+ // Path(filePath));
+ // return addSource(new FileSourceFunction<StringValue>(format,
+ // new ValueTypeInfo<StringValue>(StringValue.class)), 1);
+ // }
+ //
+ // public DataStreamSource<StringValue> readTextFileWithValue(String
+ // filePath, String charsetName,
+ // boolean skipInvalidLines) {
+ // Validate.notNull(filePath, "The file path may not be null.");
+ // TextValueInputFormat format = new TextValueInputFormat(new
+ // Path(filePath));
+ // format.setCharsetName(charsetName);
+ // format.setSkipInvalidLines(skipInvalidLines);
+ // return addSource(new FileSourceFunction<StringValue>(format,
+ // new ValueTypeInfo<StringValue>(StringValue.class)), 1);
+ // }
+ //
+ // public <X> DataStreamSource<X> readFile(FileInputFormat<X> format, String
+ // filePath) {
+ // if (format == null) {
+ // throw new IllegalArgumentException("InputFormat must not be null.");
+ // }
+ // if (filePath == null) {
+ // throw new IllegalArgumentException("The file path must not be null.");
+ // }
+ //
+ // format.setFilePath(new Path(filePath));
+ // try {
+ // return addSource(
+ // new FileSourceFunction<X>(format,
+ // TypeExtractor.getInputFormatTypes(format)), 1);
+ // } catch (Exception e) {
+ // throw new InvalidProgramException(
+ // "The type returned by the input format could not be automatically determined. "
+ // + "Please specify the TypeInformation of the produced type explicitly.");
+ // }
+ // }
+
+ /**
+ * Creates a DataStream that represents the Strings produced by reading the
* given file line wise multiple times(infinite). The file will be read with
* the system's default character set.
*
@@ -330,9 +415,8 @@ public abstract class StreamExecutionEnvironment {
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
try {
- jobGraphBuilder.addStreamVertex(returnStream.getId(),
- new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
- SerializationUtils.serialize(function), 1);
+ jobGraphBuilder.addSourceVertex(returnStream.getId(), function, null, outTypeInfo,
+ "source", SerializationUtils.serialize(function), getDegreeOfParallelism());
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 0fe7149..879eb3f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -17,32 +17,141 @@
package org.apache.flink.streaming.api.function.source;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.OperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.util.Collector;
-public class FileSourceFunction implements SourceFunction<String> {
+public class FileSourceFunction extends SourceFunction<String> {
private static final long serialVersionUID = 1L;
- private final String path;
+ private InputSplitProvider provider;
- public FileSourceFunction(String path) {
- this.path = path;
+ private InputFormat<String, ?> format;
+
+ private TypeSerializerFactory<String> serializerFactory;
+
+ private UserCodeWrapper<? extends InputFormat<String, ?>> formatWrapper;
+
+ // cancel flag
+ private volatile boolean taskCanceled = false;
+
+ public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
+ this.format = format;
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ GenericDataSourceBase<String, ?> source = new GenericDataSourceBase(format,
+ new OperatorInformation<String>(typeInfo), format.toString());
+ formatWrapper = source.getUserCodeWrapper();
+ this.serializerFactory = createSerializer(typeInfo);
+ }
+
+ @Override
+ public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
+ return this.formatWrapper;
+ }
+
+ private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
+ TypeSerializer<String> serializer = typeInfo.createSerializer();
+
+ if (serializer.isStateful()) {
+ return new RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass());
+ } else {
+ return new RuntimeStatelessSerializerFactory<String>(serializer,
+ typeInfo.getTypeClass());
+ }
}
@Override
- public void invoke(Collector<String> collector) throws IOException {
- BufferedReader br = new BufferedReader(new FileReader(path));
- String line = br.readLine();
- while (line != null) {
- if (!line.equals("")) {
- collector.collect(line);
+ public void invoke(Collector<String> collector) throws Exception {
+ final TypeSerializer<String> serializer = serializerFactory.getSerializer();
+ final Iterator<InputSplit> splitIterator = getInputSplits();
+ @SuppressWarnings("unchecked")
+ final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.format;
+ try {
+ while (!this.taskCanceled && splitIterator.hasNext()) {
+
+ final InputSplit split = splitIterator.next();
+ String record = serializer.createInstance();
+
+ format.open(split);
+ try {
+ while (!this.taskCanceled && !format.reachedEnd()) {
+ if ((record = format.nextRecord(record)) != null) {
+ collector.collect(record);
+ }
+ }
+ } finally {
+ format.close();
+ }
}
- line = br.readLine();
+ collector.close();
+ } catch (Exception ex) {
+ ex.printStackTrace();
}
- br.close();
+ }
+
+ private Iterator<InputSplit> getInputSplits() {
+
+ return new Iterator<InputSplit>() {
+
+ private InputSplit nextSplit;
+
+ private boolean exhausted;
+
+ @Override
+ public boolean hasNext() {
+ if (exhausted) {
+ return false;
+ }
+
+ if (nextSplit != null) {
+ return true;
+ }
+
+ InputSplit split = provider.getNextInputSplit();
+
+ if (split != null) {
+ this.nextSplit = split;
+ return true;
+ } else {
+ exhausted = true;
+ return false;
+ }
+ }
+
+ @Override
+ public InputSplit next() {
+ if (this.nextSplit == null && !hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ final InputSplit tmp = this.nextSplit;
+ this.nextSplit = null;
+ return tmp;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public final void initialize(Environment env) {
+ this.provider = env.getInputSplitProvider();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index 7371ac9..18144b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -21,9 +21,11 @@ import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.util.Collector;
-public class FileStreamFunction implements SourceFunction<String> {
+public class FileStreamFunction extends SourceFunction<String> {
private static final long serialVersionUID = 1L;
private final String path;
@@ -46,4 +48,9 @@ public class FileStreamFunction implements SourceFunction<String> {
br.close();
}
}
+
+ @Override
+ public UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index cb960dd..89e0823 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -22,7 +22,7 @@ import java.util.Collection;
import org.apache.flink.util.Collector;
-public class FromElementsFunction<T> implements SourceFunction<T> {
+public class FromElementsFunction<T> extends SourceFunction<T> {
private static final long serialVersionUID = 1L;
Iterable<T> iterable;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index 69601ff..ece68b2 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
* Source Function used to generate the number sequence
*
*/
-public class GenSequenceFunction implements SourceFunction<Long> {
+public class GenSequenceFunction extends SourceFunction<Long> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index d30bbba..a3949e5 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -18,11 +18,19 @@
package org.apache.flink.streaming.api.function.source;
import java.io.Serializable;
-
+
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.util.Collector;
-public interface SourceFunction<OUT> extends Function, Serializable {
-
- public void invoke(Collector<OUT> collector) throws Exception;
+public abstract class SourceFunction<OUT> implements Function, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract void invoke(Collector<OUT> collector) throws Exception;
+
+ public void initialize(Environment env){}
+
+ public abstract UserCodeWrapper<? extends InputFormat<String, ?>> getFormatWrapper();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 8c9df46..0cfe028 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -49,5 +49,10 @@ public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements Se
@Override
protected void callUserFunction() throws Exception {
}
+
+ @Override
+ public SourceFunction<OUT> getSourceFunction(){
+ return sourceFunction;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 71739c1..ea518a0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
@@ -161,4 +163,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(userFunction, t);
}
+
+ public SourceFunction<OUT> getSourceFunction() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/510a8113/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 2db0d8b..1dd78b5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -76,6 +76,9 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
}
protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
+ if (userInvokable.getSourceFunction() != null) {
+ userInvokable.getSourceFunction().initialize(getEnvironment());
+ }
userInvokable.setRuntimeContext(context);
userInvokable.open(getTaskConfiguration());
userInvokable.invoke();