Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:48 UTC
[15/28] git commit: [streaming] Added exceptions for wrong usage of StreamExecutionEnvironment
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/18502111
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/18502111
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/18502111
Branch: refs/heads/master
Commit: 185021112f180419aa7e1704ec60b2c515770876
Parents: f149197
Author: ghermann <re...@gmail.com>
Authored: Wed Aug 27 16:38:10 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 245 +++++++++----------
.../environment/StreamExecutionEnvironment.java | 39 ++-
.../api/invokable/operator/co/CoInvokable.java | 2 +-
.../flink/streaming/util/ClusterUtil.java | 23 +-
.../streamcomponent/StreamComponentTest.java | 150 ++++++++++--
5 files changed, 291 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
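In short, the commit turns silent misconfiguration into fail-fast exceptions at API-call time. A minimal sketch of the user-facing effect (each call is independent and now throws immediately; the local-environment setup mirrors the tests further down and is assumed, not part of this diff):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

    env.setBufferTimeout(-10);   // IllegalArgumentException: timeout must be non-negative
    env.fromElements();          // IllegalArgumentException: needs at least one element
    env.fromCollection(null);    // NullPointerException: collection must not be null
    env.generateSequence(10, 5); // IllegalArgumentException: start must not exceed end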
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ead9c35..3da939c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -93,13 +93,11 @@ public abstract class DataStream<OUT> {
* @param operatorType
* The type of the operator in the component
*/
- public DataStream(StreamExecutionEnvironment environment,
- String operatorType) {
+ public DataStream(StreamExecutionEnvironment environment, String operatorType) {
if (environment == null) {
throw new NullPointerException("context is null");
}
- // TODO add name based on component number an preferable sequential id
counter++;
this.id = operatorType + "-" + counter.toString();
this.environment = environment;
@@ -120,8 +118,7 @@ public abstract class DataStream<OUT> {
this.environment = dataStream.environment;
this.id = dataStream.id;
this.degreeOfParallelism = dataStream.degreeOfParallelism;
- this.userDefinedNames = new ArrayList<String>(
- dataStream.userDefinedNames);
+ this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
this.partitioner = dataStream.partitioner;
this.jobGraphBuilder = dataStream.jobGraphBuilder;
@@ -180,8 +177,7 @@ public abstract class DataStream<OUT> {
* @return The {@link ConnectedDataStream}.
*/
public <R> ConnectedDataStream<OUT, R> connect(DataStream<R> dataStream) {
- return new ConnectedDataStream<OUT, R>(environment, jobGraphBuilder,
- this, dataStream);
+ return new ConnectedDataStream<OUT, R>(environment, jobGraphBuilder, this, dataStream);
}
/**
@@ -194,8 +190,7 @@ public abstract class DataStream<OUT> {
*/
public DataStream<OUT> partitionBy(int keyPosition) {
if (keyPosition < 0) {
- throw new IllegalArgumentException(
- "The position of the field must be non-negative");
+ throw new IllegalArgumentException("The position of the field must be non-negative");
}
return setConnectionType(new FieldsPartitioner<OUT>(keyPosition));
@@ -257,13 +252,13 @@ public abstract class DataStream<OUT> {
* @return The transformed {@link DataStream}.
*/
public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
- FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(
- mapper, MapFunction.class, 0);
- FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(
- mapper, MapFunction.class, 1);
+ FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(mapper,
+ MapFunction.class, 0);
+ FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(mapper,
+ MapFunction.class, 1);
- return addFunction("map", mapper, inTypeWrapper, outTypeWrapper,
- new MapInvokable<OUT, R>(mapper));
+ return addFunction("map", mapper, inTypeWrapper, outTypeWrapper, new MapInvokable<OUT, R>(
+ mapper));
}
/**
@@ -282,20 +277,20 @@ public abstract class DataStream<OUT> {
* output type
* @return The transformed {@link DataStream}.
*/
- public <R> SingleOutputStreamOperator<R, ?> flatMap(
- FlatMapFunction<OUT, R> flatMapper) {
- FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(
- flatMapper, FlatMapFunction.class, 0);
- FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(
- flatMapper, FlatMapFunction.class, 1);
+ public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
+ FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(flatMapper,
+ FlatMapFunction.class, 0);
+ FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(flatMapper,
+ FlatMapFunction.class, 1);
- return addFunction("flatMap", flatMapper, inTypeWrapper,
- outTypeWrapper, new FlatMapInvokable<OUT, R>(flatMapper));
+ return addFunction("flatMap", flatMapper, inTypeWrapper, outTypeWrapper,
+ new FlatMapInvokable<OUT, R>(flatMapper));
}
-
+
/**
- * Applies a reduce transformation on the data stream. The user can also extend the {@link RichReduceFunction} to gain access to other features provided by
- * the {@link RichFuntion} interface.
+ * Applies a reduce transformation on the data stream. The user can also
+ * extend the {@link RichReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
*
* @param reducer
* The {@link ReduceFunction} that will be called for every
@@ -303,12 +298,13 @@ public abstract class DataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
- return addFunction("reduce", reducer, new FunctionTypeWrapper<OUT>(reducer, ReduceFunction.class, 0),
- new FunctionTypeWrapper<OUT>(reducer, ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
+ return addFunction("reduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
+ ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
+ ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
}
public GroupedDataStream<OUT> groupBy(int keyPosition) {
- return new GroupedDataStream<OUT>(this, keyPosition);
+ return new GroupedDataStream<OUT>(this, keyPosition);
}
/**
@@ -328,8 +324,8 @@ public abstract class DataStream<OUT> {
* output type
* @return The transformed {@link DataStream}.
*/
- public <R> SingleOutputStreamOperator<R, ?> batchReduce(
- GroupReduceFunction<OUT, R> reducer, long batchSize) {
+ public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+ long batchSize) {
return batchReduce(reducer, batchSize, batchSize);
}
@@ -353,16 +349,22 @@ public abstract class DataStream<OUT> {
* output type
* @return The transformed {@link DataStream}.
*/
- public <R> SingleOutputStreamOperator<R, ?> batchReduce(
- GroupReduceFunction<OUT, R> reducer, long batchSize, long slideSize) {
- FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(
- reducer, GroupReduceFunction.class, 0);
- FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(
- reducer, GroupReduceFunction.class, 1);
+ public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+ long batchSize, long slideSize) {
+ if (batchSize < 1) {
+ throw new IllegalArgumentException("Batch size must be positive");
+ }
+ if (slideSize < 1) {
+ throw new IllegalArgumentException("Slide size must be positive");
+ }
- return addFunction("batchReduce", reducer, inTypeWrapper,
- outTypeWrapper, new BatchReduceInvokable<OUT, R>(reducer,
- batchSize, slideSize));
+ FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(reducer,
+ GroupReduceFunction.class, 0);
+ FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(reducer,
+ GroupReduceFunction.class, 1);
+
+ return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
+ new BatchReduceInvokable<OUT, R>(reducer, batchSize, slideSize));
}
/**
@@ -384,8 +386,8 @@ public abstract class DataStream<OUT> {
* output type
* @return The transformed DataStream.
*/
- public <R> SingleOutputStreamOperator<R, ?> windowReduce(
- GroupReduceFunction<OUT, R> reducer, long windowSize) {
+ public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+ long windowSize) {
return windowReduce(reducer, windowSize, windowSize);
}
@@ -410,17 +412,22 @@ public abstract class DataStream<OUT> {
* output type
* @return The transformed DataStream.
*/
- public <R> SingleOutputStreamOperator<R, ?> windowReduce(
- GroupReduceFunction<OUT, R> reducer, long windowSize,
- long slideInterval) {
- FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(
- reducer, GroupReduceFunction.class, 0);
- FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(
- reducer, GroupReduceFunction.class, 1);
+ public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+ long windowSize, long slideInterval) {
+ if (windowSize < 1) {
+ throw new IllegalArgumentException("Window size must be positive");
+ }
+ if (slideInterval < 1) {
+ throw new IllegalArgumentException("Slide interval must be positive");
+ }
- return addFunction("batchReduce", reducer, inTypeWrapper,
- outTypeWrapper, new WindowReduceInvokable<OUT, R>(reducer,
- windowSize, slideInterval));
+ FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(reducer,
+ GroupReduceFunction.class, 0);
+ FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(reducer,
+ GroupReduceFunction.class, 1);
+
+ return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper,
+ new WindowReduceInvokable<OUT, R>(reducer, windowSize, slideInterval));
}
/**
@@ -437,11 +444,11 @@ public abstract class DataStream<OUT> {
* @return The filtered DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
- FunctionTypeWrapper<OUT> typeWrapper = new FunctionTypeWrapper<OUT>(
- filter, FilterFunction.class, 0);
+ FunctionTypeWrapper<OUT> typeWrapper = new FunctionTypeWrapper<OUT>(filter,
+ FilterFunction.class, 0);
- return addFunction("filter", filter, typeWrapper, typeWrapper,
- new FilterInvokable<OUT>(filter));
+ return addFunction("filter", filter, typeWrapper, typeWrapper, new FilterInvokable<OUT>(
+ filter));
}
/**
@@ -454,8 +461,7 @@ public abstract class DataStream<OUT> {
public DataStreamSink<OUT> print() {
DataStream<OUT> inputStream = this.copy();
PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
- DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction,
- null);
+ DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, null);
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
@@ -490,8 +496,7 @@ public abstract class DataStream<OUT> {
* @return The closed DataStream
*/
public DataStreamSink<OUT> writeAsText(String path, long millis) {
- return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis,
- null);
+ return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, null);
}
/**
@@ -509,8 +514,7 @@ public abstract class DataStream<OUT> {
* @return The closed DataStream
*/
public DataStreamSink<OUT> writeAsText(String path, int batchSize) {
- return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize,
- null);
+ return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, null);
}
/**
@@ -531,10 +535,8 @@ public abstract class DataStream<OUT> {
*
* @return The closed DataStream
*/
- public DataStreamSink<OUT> writeAsText(String path, long millis,
- OUT endTuple) {
- return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis,
- endTuple);
+ public DataStreamSink<OUT> writeAsText(String path, long millis, OUT endTuple) {
+ return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, endTuple);
}
/**
@@ -556,10 +558,8 @@ public abstract class DataStream<OUT> {
*
* @return The closed DataStream
*/
- public DataStreamSink<OUT> writeAsText(String path, int batchSize,
- OUT endTuple) {
- return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize,
- endTuple);
+ public DataStreamSink<OUT> writeAsText(String path, int batchSize, OUT endTuple) {
+ return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, endTuple);
}
/**
@@ -580,12 +580,10 @@ public abstract class DataStream<OUT> {
*
* @return the data stream constructed
*/
- private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream,
- String path, WriteFormatAsText<OUT> format, long millis,
- OUT endTuple) {
- DataStreamSink<OUT> returnStream = addSink(inputStream,
- new WriteSinkFunctionByMillis<OUT>(path, format, millis,
- endTuple), null);
+ private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
+ WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
+ DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
+ path, format, millis, endTuple), null);
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
@@ -610,12 +608,10 @@ public abstract class DataStream<OUT> {
*
* @return the data stream constructed
*/
- private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream,
- String path, WriteFormatAsText<OUT> format, int batchSize,
- OUT endTuple) {
+ private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
+ WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
- new WriteSinkFunctionByBatches<OUT>(path, format, batchSize,
- endTuple), null);
+ new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
@@ -667,8 +663,7 @@ public abstract class DataStream<OUT> {
* @return The closed DataStream
*/
public DataStreamSink<OUT> writeAsCsv(String path, int batchSize) {
- return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize,
- null);
+ return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, null);
}
/**
@@ -690,8 +685,7 @@ public abstract class DataStream<OUT> {
* @return The closed DataStream
*/
public DataStreamSink<OUT> writeAsCsv(String path, long millis, OUT endTuple) {
- return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis,
- endTuple);
+ return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, endTuple);
}
/**
@@ -713,13 +707,11 @@ public abstract class DataStream<OUT> {
*
* @return The closed DataStream
*/
- public DataStreamSink<OUT> writeAsCsv(String path, int batchSize,
- OUT endTuple) {
+ public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) {
if (this instanceof SingleOutputStreamOperator) {
((SingleOutputStreamOperator<?, ?>) this).setMutability(false);
}
- return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize,
- endTuple);
+ return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple);
}
/**
@@ -740,11 +732,10 @@ public abstract class DataStream<OUT> {
*
* @return the data stream constructed
*/
- private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream,
- String path, WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
- DataStreamSink<OUT> returnStream = addSink(inputStream,
- new WriteSinkFunctionByMillis<OUT>(path, format, millis,
- endTuple));
+ private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+ WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
+ DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
+ path, format, millis, endTuple));
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
@@ -769,12 +760,10 @@ public abstract class DataStream<OUT> {
*
* @return the data stream constructed
*/
- private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream,
- String path, WriteFormatAsCsv<OUT> format, int batchSize,
- OUT endTuple) {
+ private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+ WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
- new WriteSinkFunctionByBatches<OUT>(path, format, batchSize,
- endTuple), null);
+ new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
@@ -806,14 +795,12 @@ public abstract class DataStream<OUT> {
return new IterativeDataStream<OUT>(this);
}
- protected <R> DataStream<OUT> addIterationSource(String iterationID,
- long waitTime) {
+ protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTime) {
- DataStream<R> returnStream = new DataStreamSource<R>(environment,
- "iterationSource");
+ DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource");
- jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(),
- iterationID, degreeOfParallelism, waitTime);
+ jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
+ degreeOfParallelism, waitTime);
return this.copy();
}
@@ -837,17 +824,15 @@ public abstract class DataStream<OUT> {
TypeSerializerWrapper<OUT> inTypeWrapper,
TypeSerializerWrapper<R> outTypeWrapper,
StreamOperatorInvokable<OUT, R> functionInvokable) {
-
DataStream<OUT> inputStream = this.copy();
@SuppressWarnings({ "unchecked", "rawtypes" })
- SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(
- environment, functionName);
+ SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
+ functionName);
try {
- jobGraphBuilder.addTask(returnStream.getId(), functionInvokable,
- inTypeWrapper, outTypeWrapper, functionName,
- SerializationUtils.serialize((Serializable) function),
- degreeOfParallelism);
+ jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, inTypeWrapper,
+ outTypeWrapper, functionName,
+ SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize user defined function");
}
@@ -856,8 +841,7 @@ public abstract class DataStream<OUT> {
if (inputStream instanceof IterativeDataStream) {
IterativeDataStream<OUT> iterativeStream = (IterativeDataStream<OUT>) inputStream;
- returnStream.addIterationSource(
- iterativeStream.iterationID.toString(),
+ returnStream.addIterationSource(iterativeStream.iterationID.toString(),
iterativeStream.waitTime);
}
@@ -871,8 +855,7 @@ public abstract class DataStream<OUT> {
* Partitioner to set.
* @return The modified DataStream.
*/
- protected DataStream<OUT> setConnectionType(
- StreamPartitioner<OUT> partitioner) {
+ protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
DataStream<OUT> returnStream = this.copy();
returnStream.partitioner = partitioner;
@@ -893,18 +876,15 @@ public abstract class DataStream<OUT> {
* @param typeNumber
* Number of the type (used at co-functions)
*/
- protected <X> void connectGraph(DataStream<X> inputStream, String outputID,
- int typeNumber) {
+ protected <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
if (inputStream instanceof MergedDataStream) {
for (DataStream<X> stream : ((MergedDataStream<X>) inputStream).mergedStreams) {
- jobGraphBuilder.setEdge(stream.getId(), outputID,
- stream.partitioner, typeNumber,
+ jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
inputStream.userDefinedNames);
}
} else {
- jobGraphBuilder.setEdge(inputStream.getId(), outputID,
- inputStream.partitioner, typeNumber,
- inputStream.userDefinedNames);
+ jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
+ typeNumber, inputStream.userDefinedNames);
}
}
@@ -922,23 +902,18 @@ public abstract class DataStream<OUT> {
return addSink(this.copy(), sinkFunction);
}
- private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
- SinkFunction<OUT> sinkFunction) {
- return addSink(inputStream, sinkFunction,
- new FunctionTypeWrapper<OUT>(sinkFunction,
- SinkFunction.class, 0));
+ private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) {
+ return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<OUT>(sinkFunction,
+ SinkFunction.class, 0));
}
private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
- SinkFunction<OUT> sinkFunction,
- TypeSerializerWrapper<OUT> typeWrapper) {
- DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment,
- "sink");
+ SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
+ DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
try {
- jobGraphBuilder.addSink(returnStream.getId(),
- new SinkInvokable<OUT>(sinkFunction), typeWrapper, "sink",
- SerializationUtils.serialize(sinkFunction),
+ jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
+ typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
degreeOfParallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SinkFunction");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5d27786..7d983ad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.environment;
+import java.io.File;
import java.io.Serializable;
import java.util.Collection;
@@ -117,6 +118,10 @@ public abstract class StreamExecutionEnvironment {
* The maximum time between two output flushes.
*/
public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
+ if (timeoutMillis < 0) {
+ throw new IllegalArgumentException("Timeout of buffer must be non-negative");
+ }
+
this.buffertimeout = timeoutMillis;
return this;
}
@@ -155,10 +160,12 @@ public abstract class StreamExecutionEnvironment {
* @return The DataStream representing the text file.
*/
public DataStreamSource<String> readTextFile(String filePath) {
+ checkIfFileExists(filePath);
return addSource(new FileSourceFunction(filePath), 1);
}
public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
+ checkIfFileExists(filePath);
return addSource(new FileSourceFunction(filePath), parallelism);
}
@@ -173,13 +180,31 @@ public abstract class StreamExecutionEnvironment {
* @return The DataStream representing the text file.
*/
public DataStreamSource<String> readTextStream(String filePath) {
+ checkIfFileExists(filePath);
return addSource(new FileStreamFunction(filePath), 1);
}
public DataStreamSource<String> readTextStream(String filePath, int parallelism) {
+ checkIfFileExists(filePath);
return addSource(new FileStreamFunction(filePath), parallelism);
}
+
+ private static void checkIfFileExists(String filePath) {
+ File file = new File(filePath);
+ if (!file.exists()) {
+ throw new IllegalArgumentException("File not found: " + filePath);
+ }
+
+ if (!file.canRead()) {
+ throw new IllegalArgumentException("Cannot read file: " + filePath);
+ }
+
+ if (file.isDirectory()) {
+ throw new IllegalArgumentException("Given path is a directory: " + filePath);
+ }
+ }
+
/**
* Creates a new DataStream that contains the given elements. The elements
* must all be of the same type, for example, all String or all Integer values.
@@ -196,6 +221,11 @@ public abstract class StreamExecutionEnvironment {
public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
+ if (data.length == 0) {
+ throw new IllegalArgumentException(
+ "fromElements needs at least one element as argument");
+ }
+
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
@@ -222,8 +252,12 @@ public abstract class StreamExecutionEnvironment {
public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
+ if (data == null) {
+ throw new NullPointerException("Collection must not be null");
+ }
+
if (data.isEmpty()) {
- throw new RuntimeException("Collection must not be empty");
+ throw new IllegalArgumentException("Collection must not be empty");
}
try {
@@ -249,6 +283,9 @@ public abstract class StreamExecutionEnvironment {
* @return A DataStream containing all numbers in the [from, to] interval.
*/
public DataStreamSource<Long> generateSequence(long from, long to) {
+ if (from > to) {
+ throw new IllegalArgumentException("Start of sequence must not be greater than the end");
+ }
return addSource(new GenSequenceFunction(from, to), 1);
}
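The new checkIfFileExists helper validates a path before any source is wired into the job graph, so a bad path fails at definition time rather than at deployment. For example (paths are hypothetical):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

    env.readTextFile("/no/such/file.txt"); // IllegalArgumentException: File not found
    env.readTextStream("/tmp");            // IllegalArgumentException: Given path is a directory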
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index e9fefc3..c21e784 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -34,7 +34,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
}
private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(StreamInvokable.class);
+ private static final Log LOG = LogFactory.getLog(CoInvokable.class);
protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
protected StreamRecord<IN1> reuse1;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 588becf..50a78b3 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -23,11 +23,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
public class ClusterUtil {
+
private static final Log LOG = LogFactory.getLog(ClusterUtil.class);
+ public static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
/**
* Executes the given JobGraph locally, on a NepheleMiniCluster
@@ -49,24 +52,32 @@ public class ClusterUtil {
if (LOG.isInfoEnabled()) {
LOG.info("Running on mini cluster");
}
-
+
try {
exec.start();
- Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRpcPort()), configuration, ClusterUtil.class.getClassLoader());
+ Client client = new Client(new InetSocketAddress("localhost",
+ exec.getJobManagerRpcPort()), configuration, ClusterUtil.class.getClassLoader());
client.run(jobGraph, true);
+ } catch (ProgramInvocationException e) {
+ if (e.getMessage().contains("GraphConversionException")) {
+ throw new RuntimeException(CANNOT_EXECUTE_EMPTY_JOB, e);
+ } else {
+ throw new RuntimeException(e.getMessage(), e);
+ }
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException(e.getMessage(), e);
} finally {
try {
exec.stop();
- } catch (Throwable t) {}
+ } catch (Throwable t) {
+ }
}
}
public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers) {
runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
- }
+ }
-}
+}
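With this ClusterUtil change, a GraphConversionException raised for an empty topology is rewrapped so callers see a stable message; the wrongJobGraph test below relies on exactly this. A caller-side sketch:

    try {
        env.execute();
    } catch (RuntimeException e) {
        // empty job graph: message equals ClusterUtil.CANNOT_EXECUTE_EMPTY_JOB
        assertEquals(ClusterUtil.CANNOT_EXECUTE_EMPTY_JOB, e.getMessage());
    }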
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/18502111/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index b16ffee..b0799cd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -18,28 +18,32 @@
package org.apache.flink.streaming.api.streamcomponent;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.ClusterUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.LogUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class StreamComponentTest {
- @SuppressWarnings("unused")
- private static final int PARALLELISM = 1;
- private static final int SOURCE_PARALELISM = 1;
- private static final long MEMORYSIZE = 32;
-
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
public static class MySource implements SourceFunction<Tuple1<Integer>> {
@@ -66,19 +70,6 @@ public class StreamComponentTest {
}
}
- // TODO test multiple tasks
- // public static class MyOtherTask extends MapFunction<Tuple1<Integer>,
- // Tuple2<Integer, Integer>> {
- // private static final long serialVersionUID = 1L;
- //
- // @Override
- // public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws
- // Exception {
- // Integer i = value.f0;
- // return new Tuple2<Integer, Integer>(-i - 1, -i - 2);
- // }
- // }
-
public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
@@ -90,19 +81,128 @@ public class StreamComponentTest {
}
}
+ @SuppressWarnings("unused")
+ private static final int PARALLELISM = 1;
+ private static final int SOURCE_PARALELISM = 1;
+ private static final long MEMORYSIZE = 32;
+
+ @Before
+ public void before() {
+ LogUtils.initializeDefaultConsoleLogger(Level.OFF);
+ }
+
+ @Ignore
@Test
- public void runStream() {
+ public void wrongJobGraph() {
+ LocalStreamEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(SOURCE_PARALELISM);
+
+ try {
+ env.execute();
+ fail();
+ } catch (RuntimeException e) {
+ assertEquals(e.getMessage(), ClusterUtil.CANNOT_EXECUTE_EMPTY_JOB);
+ }
+
+ env.fromCollection(Arrays.asList("a", "b"));
+
+ try {
+ env.execute();
+ fail();
+ } catch (RuntimeException e) {
+ System.out.println(e.getMessage());
+ }
+
+ try {
+ env.fromCollection(null);
+ fail();
+ } catch (NullPointerException e) {
+ }
+
+ try {
+ env.fromElements();
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ env.generateSequence(-10, -30);
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ env.setBufferTimeout(-10);
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ env.setExecutionParallelism(-10);
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ env.readTextFile("random/path/that/is/not/valid");
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+ }
+
+ private static class CoMap implements CoMapFunction<String, Long, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map1(String value) {
+ return value;
+ }
+
+ @Override
+ public String map2(Long value) {
+ return value.toString();
+ }
+ }
+
+ static HashSet<String> resultSet;
+ private static class SetSink implements SinkFunction<String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(String value) {
+ resultSet.add(value);
+ }
+ }
+
+ @Test
+ public void coTest() {
+ LocalStreamEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(SOURCE_PARALELISM);
+
+ DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
+ DataStream<Long> generatedSequence = env.generateSequence(0, 3);
- LogUtils.initializeDefaultTestConsoleLogger();
+ fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(SOURCE_PARALELISM);
+ resultSet = new HashSet<String>();
+ env.execute();
+
+ HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1", "2", "3"));
+ assertEquals(expectedSet, resultSet);
+ }
+
+ @Test
+ public void runStream() {
+
+ LogUtils.initializeDefaultTestConsoleLogger();
- env
- .addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask())
- .addSink(new MySink());
+ LocalStreamEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(SOURCE_PARALELISM);
+
+ env.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
env.executeTest(MEMORYSIZE);
-
+
assertEquals(10, data.keySet().size());
for (Integer k : data.keySet()) {