You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:45 UTC
[02/18] git commit: [streaming] Added TypeInfo to DataStream
[streaming] Added TypeInfo to DataStream
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4d73f51c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4d73f51c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4d73f51c
Branch: refs/heads/master
Commit: 4d73f51c5acdabac98557fd53933d4b7ad42f98a
Parents: 0c8f1da
Author: mbalassi <ba...@gmail.com>
Authored: Fri Sep 5 14:41:53 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:42:04 2014 +0200
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 21 +-
.../streaming/api/datastream/DataStream.java | 25 +-
.../api/datastream/DataStreamSink.java | 5 +-
.../api/datastream/DataStreamSource.java | 5 +-
.../api/datastream/GroupedDataStream.java | 26 +-
.../api/datastream/IterativeDataStream.java | 2 +-
.../api/datastream/MergedDataStream.java | 6 +-
.../datastream/SingleOutputStreamOperator.java | 292 ++++++++++---------
.../api/datastream/SplitDataStream.java | 14 +-
.../environment/StreamExecutionEnvironment.java | 30 +-
.../api/collector/DirectedOutputTest.java | 19 +-
.../api/invokable/operator/CoFlatMapTest.java | 11 +-
12 files changed, 270 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 4f17ceb..e00919f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.types.TypeInformation;
/**
* The ConnectedDataStream represents a stream for two different data types. It
@@ -58,7 +59,7 @@ public class ConnectedDataStream<IN1, IN2> {
this.jobGraphBuilder = jobGraphBuilder;
this.environment = environment;
this.input1 = input1.copy();
- this.input2 = input2.copy();
+ this.input2 = input2.copy();
}
/**
@@ -80,6 +81,22 @@ public class ConnectedDataStream<IN1, IN2> {
}
/**
+ * Gets the type of the first input
+ * @return The type of the first input
+ */
+ public TypeInformation<IN1> getInputType1() {
+ return input1.getOutputType();
+ }
+
+ /**
+ * Gets the type of the second input
+ * @return The type of the second input
+ */
+ public TypeInformation<IN2> getInputType2() {
+ return input2.getOutputType();
+ }
+
+ /**
* GroupBy operation for connected data stream. Groups the elements of
* input1 and input2 according to keyPosition1 and keyPosition2. Used for
* applying function on grouped data streams for example
@@ -189,7 +206,7 @@ public class ConnectedDataStream<IN1, IN2> {
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
- environment, functionName);
+ environment, functionName, outTypeWrapper);
try {
input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 9375762..28d07d6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -89,6 +89,7 @@ public abstract class DataStream<OUT> {
protected List<String> userDefinedNames;
protected boolean selectAll;
protected StreamPartitioner<OUT> partitioner;
+ protected TypeSerializerWrapper<OUT> outTypeWrapper;
protected final JobGraphBuilder jobGraphBuilder;
@@ -100,8 +101,11 @@ public abstract class DataStream<OUT> {
* StreamExecutionEnvironment
* @param operatorType
* The type of the operator in the component
+ * @param outTypeWrapper
+ * Type of the output
*/
- public DataStream(StreamExecutionEnvironment environment, String operatorType) {
+ public DataStream(StreamExecutionEnvironment environment, String operatorType,
+ TypeSerializerWrapper<OUT> outTypeWrapper) {
if (environment == null) {
throw new NullPointerException("context is null");
}
@@ -114,7 +118,7 @@ public abstract class DataStream<OUT> {
this.userDefinedNames = new ArrayList<String>();
this.selectAll = false;
this.partitioner = new ForwardPartitioner<OUT>();
-
+ this.outTypeWrapper = outTypeWrapper;
}
/**
@@ -131,7 +135,7 @@ public abstract class DataStream<OUT> {
this.selectAll = dataStream.selectAll;
this.partitioner = dataStream.partitioner;
this.jobGraphBuilder = dataStream.jobGraphBuilder;
-
+ this.outTypeWrapper = dataStream.outTypeWrapper;
}
/**
@@ -160,6 +164,15 @@ public abstract class DataStream<OUT> {
}
/**
+ * Gets the output type.
+ *
+ * @return The output type.
+ */
+ public TypeInformation<OUT> getOutputType() {
+ return this.outTypeWrapper.getTypeInfo();
+ }
+
+ /**
* Creates a new {@link MergedDataStream} by merging {@link DataStream}
* outputs of the same type with each other. The DataStreams merged using
* this operator will be transformed simultaneously.
@@ -890,7 +903,7 @@ public abstract class DataStream<OUT> {
protected <R> DataStream<OUT> addIterationSource(String iterationID, long waitTime) {
- DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource");
+ DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);
jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
degreeOfParallelism, waitTime);
@@ -919,7 +932,7 @@ public abstract class DataStream<OUT> {
DataStream<OUT> inputStream = this.copy();
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
- functionName);
+ functionName, outTypeWrapper);
try {
jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, inTypeWrapper,
@@ -1001,7 +1014,7 @@ public abstract class DataStream<OUT> {
private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
- DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
+ DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", outTypeWrapper);
try {
jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 71e88d8..f51e0e6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
/**
* Represents the end of a DataStream.
@@ -27,8 +28,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
*/
public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
- protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType) {
- super(environment, operatorType);
+ protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper<IN> outTypeWrapper) {
+ super(environment, operatorType, outTypeWrapper);
}
protected DataStreamSink(DataStream<IN> dataStream) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 2a05e27..29389ae 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
/**
* The DataStreamSource represents the starting point of a DataStream.
@@ -27,8 +28,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
*/
public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
- public DataStreamSource(StreamExecutionEnvironment environment, String operatorType) {
- super(environment, operatorType);
+ public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeSerializerWrapper<OUT> outTypeWrapper) {
+ super(environment, operatorType, outTypeWrapper);
}
public DataStreamSource(DataStream<OUT> dataStream) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 94d6c8d..2e1ed57 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -31,8 +31,7 @@ import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvoka
import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp;
import org.apache.flink.streaming.api.invokable.util.Timestamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-
-//import org.apache.jasper.compiler.Node.ParamsAction;
+import org.apache.flink.types.TypeInformation;
/**
* A GroupedDataStream represents a data stream which has been partitioned by
@@ -53,12 +52,21 @@ public class GroupedDataStream<OUT> {
}
/**
- * Applies a reduce transformation on the grouped data stream grouped by the
- * given key position. The {@link ReduceFunction} will receive input values
- * based on the key value. Only input values with the same key will go to
- * the same reducer.The user can also extend {@link RichReduceFunction} to
- * gain access to other features provided by the {@link RichFuntion}
- * interface.
+ * Gets the output type.
+ *
+ * @return The output type.
+ */
+ public TypeInformation<OUT> getOutputType() {
+ return dataStream.getOutputType();
+ }
+
+ /**
+ * Applies a reduce transformation on the grouped data stream grouped by
+ * the given key position. The {@link ReduceFunction} will receive input
+ * values based on the key value. Only input values with the same key will
+ * go to the same reducer. The user can also extend
+ * {@link RichReduceFunction} to gain access to other features provided by
+ * the {@link RichFunction} interface.
*
* @param reducer
* The {@link ReduceFunction} that will be called for every
@@ -70,7 +78,7 @@ public class GroupedDataStream<OUT> {
ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
}
-
+
/**
* Applies a group reduce transformation on preset chunks of the grouped
* data stream. The {@link GroupReduceFunction} will receive input values
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 16362ba..573dffd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -80,7 +80,7 @@ public class IterativeDataStream<IN> extends
*
*/
public <R> DataStream<IN> closeWith(DataStream<IN> iterationTail, String iterationName) {
- DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink");
+ DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink", null);
jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
iterationID.toString(), iterationTail.getParallelism(), waitTime);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
index 045af4f..c1618b2 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/MergedDataStream.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
/**
* The MergedDataStream represents a DataStream which consists of merged outputs
@@ -35,8 +36,9 @@ public class MergedDataStream<OUT> extends DataStream<OUT> {
protected List<DataStream<OUT>> mergedStreams;
- protected MergedDataStream(StreamExecutionEnvironment environment, String operatorType) {
- super(environment, operatorType);
+ protected MergedDataStream(StreamExecutionEnvironment environment, String operatorType,
+ TypeSerializerWrapper<OUT> outTypeWrapper) {
+ super(environment, operatorType, outTypeWrapper);
this.mergedStreams = new ArrayList<DataStream<OUT>>();
this.mergedStreams.add(this);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 1f01feb..b2fe551 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -1,147 +1,149 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * The SingleOutputStreamOperator represents a user defined transformation
- * applied on a {@link DataStream} with one predefined output type.
- *
- * @param <OUT>
- * Output type of the operator.
- * @param <O>
- * Type of the operator.
- */
-public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperator<OUT, O>> extends
- DataStream<OUT> {
-
- protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, String operatorType) {
- super(environment, operatorType);
- setBufferTimeout(environment.getBufferTimeout());
- }
-
- protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {
- super(dataStream);
- }
-
- /**
- * Sets the degree of parallelism for this operator. The degree must be 1 or
- * more.
- *
- * @param dop
- * The degree of parallelism for this operator.
- * @return The operator with set degree of parallelism.
- */
- public SingleOutputStreamOperator<OUT, O> setParallelism(int dop) {
- if (dop < 1) {
- throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
- }
- this.degreeOfParallelism = dop;
-
- jobGraphBuilder.setParallelism(id, degreeOfParallelism);
-
- return this;
- }
-
- /**
- * Sets the mutability of the operator. If the operator is set to mutable,
- * the tuples received in the user defined functions, will be reused after
- * the function call. Setting an operator to mutable reduces garbage
- * collection overhead and thus increases scalability. Please note that if a
- * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is used
- * as mutable, the user can only iterate through the iterator once in every
- * invoke.
- *
- * @param isMutable
- * The mutability of the operator.
- * @return The operator with mutability set.
- */
- public SingleOutputStreamOperator<OUT, O> setMutability(boolean isMutable) {
- jobGraphBuilder.setMutability(id, isMutable);
- return this;
- }
-
- /**
- * Sets the maximum time frequency (ms) for the flushing of the output
- * buffer. By default the output buffers flush only when they are full.
- *
- * @param timeoutMillis
- * The maximum time between two output flushes.
- * @return The operator with buffer timeout set.
- */
- public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long timeoutMillis) {
- jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
- return this;
- }
-
- /**
- * Operator used for directing tuples to specific named outputs using an
- * {@link OutputSelector}. Calling this method on an operator creates a new
- * {@link SplitDataStream}.
- *
- * @param outputSelector
- * The user defined {@link OutputSelector} for directing the
- * tuples.
- * @return The {@link SplitDataStream}
- */
- public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
- try {
- jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
-
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize OutputSelector");
- }
-
- return new SplitDataStream<OUT>(this);
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<OUT, O> partitionBy(int keyposition) {
- return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keyposition);
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<OUT, O> broadcast() {
- return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<OUT, O> shuffle() {
- return (SingleOutputStreamOperator<OUT, O>) super.shuffle();
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<OUT, O> forward() {
- return (SingleOutputStreamOperator<OUT, O>) super.forward();
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<OUT, O> distribute() {
- return (SingleOutputStreamOperator<OUT, O>) super.distribute();
- }
-
- @Override
- protected SingleOutputStreamOperator<OUT, O> copy() {
- return new SingleOutputStreamOperator<OUT, O>(this);
- }
-
-}
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+
+/**
+ * The SingleOutputStreamOperator represents a user defined transformation
+ * applied on a {@link DataStream} with one predefined output type.
+ *
+ * @param <OUT>
+ * Output type of the operator.
+ * @param <O>
+ * Type of the operator.
+ */
+public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperator<OUT, O>> extends
+ DataStream<OUT> {
+
+ protected SingleOutputStreamOperator(StreamExecutionEnvironment environment,
+ String operatorType, TypeSerializerWrapper<OUT> outTypeWrapper) {
+ super(environment, operatorType, outTypeWrapper);
+ setBufferTimeout(environment.getBufferTimeout());
+ }
+
+ protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {
+ super(dataStream);
+ }
+
+ /**
+ * Sets the degree of parallelism for this operator. The degree must be 1 or
+ * more.
+ *
+ * @param dop
+ * The degree of parallelism for this operator.
+ * @return The operator with set degree of parallelism.
+ */
+ public SingleOutputStreamOperator<OUT, O> setParallelism(int dop) {
+ if (dop < 1) {
+ throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
+ }
+ this.degreeOfParallelism = dop;
+
+ jobGraphBuilder.setParallelism(id, degreeOfParallelism);
+
+ return this;
+ }
+
+ /**
+ * Sets the mutability of the operator. If the operator is set to mutable,
+ * the tuples received in the user defined functions, will be reused after
+ * the function call. Setting an operator to mutable reduces garbage
+ * collection overhead and thus increases scalability. Please note that if a
+ * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is used
+ * as mutable, the user can only iterate through the iterator once in every
+ * invoke.
+ *
+ * @param isMutable
+ * The mutability of the operator.
+ * @return The operator with mutability set.
+ */
+ public SingleOutputStreamOperator<OUT, O> setMutability(boolean isMutable) {
+ jobGraphBuilder.setMutability(id, isMutable);
+ return this;
+ }
+
+ /**
+ * Sets the maximum time frequency (ms) for the flushing of the output
+ * buffer. By default the output buffers flush only when they are full.
+ *
+ * @param timeoutMillis
+ * The maximum time between two output flushes.
+ * @return The operator with buffer timeout set.
+ */
+ public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long timeoutMillis) {
+ jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
+ return this;
+ }
+
+ /**
+ * Operator used for directing tuples to specific named outputs using an
+ * {@link OutputSelector}. Calling this method on an operator creates a new
+ * {@link SplitDataStream}.
+ *
+ * @param outputSelector
+ * The user defined {@link OutputSelector} for directing the
+ * tuples.
+ * @return The {@link SplitDataStream}
+ */
+ public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+ try {
+ jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize OutputSelector");
+ }
+
+ return new SplitDataStream<OUT>(this);
+ }
+
+ @SuppressWarnings("unchecked")
+ public SingleOutputStreamOperator<OUT, O> partitionBy(int keyposition) {
+ return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keyposition);
+ }
+
+ @SuppressWarnings("unchecked")
+ public SingleOutputStreamOperator<OUT, O> broadcast() {
+ return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
+ }
+
+ @SuppressWarnings("unchecked")
+ public SingleOutputStreamOperator<OUT, O> shuffle() {
+ return (SingleOutputStreamOperator<OUT, O>) super.shuffle();
+ }
+
+ @SuppressWarnings("unchecked")
+ public SingleOutputStreamOperator<OUT, O> forward() {
+ return (SingleOutputStreamOperator<OUT, O>) super.forward();
+ }
+
+ @SuppressWarnings("unchecked")
+ public SingleOutputStreamOperator<OUT, O> distribute() {
+ return (SingleOutputStreamOperator<OUT, O>) super.distribute();
+ }
+
+ @Override
+ protected SingleOutputStreamOperator<OUT, O> copy() {
+ return new SingleOutputStreamOperator<OUT, O>(this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 0ddb4f0..838f228 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -18,8 +18,9 @@
package org.apache.flink.streaming.api.datastream;
import java.util.Arrays;
-
+
import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.types.TypeInformation;
/**
* The SplitDataStream represents an operator that has been split using an
@@ -36,7 +37,16 @@ public class SplitDataStream<OUT> {
protected SplitDataStream(DataStream<OUT> dataStream) {
this.dataStream = dataStream.copy();
}
-
+
+ /**
+ * Gets the output type.
+ *
+ * @return The output type.
+ */
+ public TypeInformation<OUT> getOutputType() {
+ return dataStream.getOutputType();
+ }
+
/**
* Sets the output names for which the next operator will receive values.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7d983ad..6187c99 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
/**
* {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -189,7 +190,6 @@ public abstract class StreamExecutionEnvironment {
return addSource(new FileStreamFunction(filePath), parallelism);
}
-
private static void checkIfFileExists(String filePath) {
File file = new File(filePath);
if (!file.exists()) {
@@ -199,12 +199,12 @@ public abstract class StreamExecutionEnvironment {
if (!file.canRead()) {
throw new IllegalArgumentException("Cannot read file: " + filePath);
}
-
+
if (file.isDirectory()) {
throw new IllegalArgumentException("Given path is a directory: " + filePath);
}
}
-
+
/**
* Creates a new DataStream that contains the given elements. The elements
* must all be of the same type, for example, all of the String or Integer.
@@ -219,18 +219,19 @@ public abstract class StreamExecutionEnvironment {
* @return The DataStream representing the elements.
*/
public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
- DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
-
if (data.length == 0) {
throw new IllegalArgumentException(
"fromElements needs at least one element as argument");
}
+ TypeSerializerWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
+ DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
+ outTypeWrapper);
+
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
- new ObjectTypeWrapper<OUT>(data[0]), "source",
- SerializationUtils.serialize(function), 1);
+ outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize elements");
}
@@ -250,8 +251,6 @@ public abstract class StreamExecutionEnvironment {
* @return The DataStream representing the elements.
*/
public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
- DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
-
if (data == null) {
throw new NullPointerException("Collection must not be null");
}
@@ -260,6 +259,11 @@ public abstract class StreamExecutionEnvironment {
throw new IllegalArgumentException("Collection must not be empty");
}
+ TypeSerializerWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator()
+ .next());
+ DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
+ outTypeWrapper);
+
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
@@ -301,12 +305,14 @@ public abstract class StreamExecutionEnvironment {
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
- DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source");
+ TypeSerializerWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
+ SourceFunction.class, 0);
+ DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
+ outTypeWrapper);
try {
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
- new FunctionTypeWrapper<OUT>(function, SourceFunction.class, 0), "source",
- SerializationUtils.serialize(function), parallelism);
+ outTypeWrapper, "source", SerializationUtils.serialize(function), parallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index fdf9db3..e9d3994 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.streaming.api.collector;
import static org.junit.Assert.assertEquals;
@@ -82,7 +99,7 @@ public class DirectedOutputTest {
public void outputSelectorTest() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
+
SplitDataStream<Long> source = env.generateSequence(1, 10).split(new MyOutputSelector());
source.select(EVEN).addSink(new ListSink(EVEN));
source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d73f51c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index e7f12bd..0c4bad1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -72,12 +72,19 @@ public class CoFlatMapTest implements Serializable {
DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
-
+
try {
ds1.forward().merge(ds2);
fail();
} catch (RuntimeException e) {
- // good
+ // expected
+ }
+
+ try {
+ env.fromElements(10, 11).connect(ds2);
+ fail();
+ } catch (RuntimeException e) {
+ // expected
}
}
}