You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:16 UTC
[39/51] [abbrv] [streaming] API update with more differentiated
DataStream types and javadoc + several fixes
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
deleted file mode 100644
index 76adf62..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.source.FileSourceFunction;
-import org.apache.flink.streaming.api.function.source.FileStreamFunction;
-import org.apache.flink.streaming.api.function.source.FromElementsFunction;
-import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.invokable.SourceInvokable;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-
-/**
- * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
- * necessary to construct streaming topologies.
- *
- */
-public abstract class StreamExecutionEnvironment {
-
- /**
- * The environment of the context (local by default, cluster if invoked
- * through command line)
- */
- private static StreamExecutionEnvironment contextEnvironment;
-
- /** flag to disable local executor when using the ContextEnvironment */
- private static boolean allowLocalExecution = true;
-
- private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
-
- private int degreeOfParallelism = 1;
-
- private int executionParallelism = -1;
-
- protected JobGraphBuilder jobGraphBuilder;
-
- // --------------------------------------------------------------------------------------------
- // Constructor and Properties
- // --------------------------------------------------------------------------------------------
-
- /**
- * Constructor for creating StreamExecutionEnvironment
- */
- protected StreamExecutionEnvironment() {
- jobGraphBuilder = new JobGraphBuilder("jobGraph");
- }
-
- public int getExecutionParallelism() {
- return executionParallelism == -1 ? degreeOfParallelism : executionParallelism;
- }
-
- /**
- * Gets the degree of parallelism with which operation are executed by
- * default. Operations can individually override this value to use a
- * specific degree of parallelism via {@link DataStream#setParallelism}.
- *
- * @return The degree of parallelism used by operations, unless they
- * override that value.
- */
- public int getDegreeOfParallelism() {
- return this.degreeOfParallelism;
- }
-
- /**
- * Sets the degree of parallelism (DOP) for operations executed through this
- * environment. Setting a DOP of x here will cause all operators (such as
- * map, batchReduce) to run with x parallel instances. This method overrides
- * the default parallelism for this environment. The
- * {@link LocalStreamEnvironment} uses by default a value equal to the
- * number of hardware contexts (CPU cores / threads). When executing the
- * program via the command line client from a JAR file, the default degree
- * of parallelism is the one configured for that setup.
- *
- * @param degreeOfParallelism
- * The degree of parallelism
- */
- protected void setDegreeOfParallelism(int degreeOfParallelism) {
- if (degreeOfParallelism < 1) {
- throw new IllegalArgumentException("Degree of parallelism must be at least one.");
- }
- this.degreeOfParallelism = degreeOfParallelism;
- }
-
- /**
- * Sets the number of hardware contexts (CPU cores / threads) used when
- * executed in {@link LocalStreamEnvironment}.
- *
- * @param degreeOfParallelism
- * The degree of parallelism in local environment
- */
- public void setExecutionParallelism(int degreeOfParallelism) {
- if (degreeOfParallelism < 1) {
- throw new IllegalArgumentException("Degree of parallelism must be at least one.");
- }
-
- this.executionParallelism = degreeOfParallelism;
- }
-
- // --------------------------------------------------------------------------------------------
- // Data stream creations
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a DataStream that represents the Strings produced by reading the
- * given file line wise. The file will be read with the system's default
- * character set.
- *
- * @param filePath
- * The path of the file, as a URI (e.g.,
- * "file:///some/local/file" or "hdfs://host:port/file/path").
- * @return The DataStream representing the text file.
- */
- public DataStream<String> readTextFile(String filePath) {
- return addSource(new FileSourceFunction(filePath), 1);
- }
-
- public DataStream<String> readTextFile(String filePath, int parallelism) {
- return addSource(new FileSourceFunction(filePath), parallelism);
- }
-
- /**
- * Creates a DataStream that represents the Strings produced by reading the
- * given file line wise multiple times(infinite). The file will be read with
- * the system's default character set.
- *
- * @param filePath
- * The path of the file, as a URI (e.g.,
- * "file:///some/local/file" or "hdfs://host:port/file/path").
- * @return The DataStream representing the text file.
- */
- public DataStream<String> readTextStream(String filePath) {
- return addSource(new FileStreamFunction(filePath), 1);
- }
-
- public DataStream<String> readTextStream(String filePath, int parallelism) {
- return addSource(new FileStreamFunction(filePath), parallelism);
- }
-
- /**
- * Creates a new DataStream that contains the given elements. The elements
- * must all be of the same type, for example, all of the String or Integer.
- * The sequence of elements must not be empty. Furthermore, the elements
- * must be serializable (as defined in java.io.Serializable), because the
- * execution environment may ship the elements into the cluster.
- *
- * @param data
- * The collection of elements to create the DataStream from.
- * @param <OUT>
- * type of the returned stream
- * @return The DataStream representing the elements.
- */
- public <OUT extends Serializable> DataStream<OUT> fromElements(OUT... data) {
- DataStream<OUT> returnStream = new DataStream<OUT>(this, "elements");
-
- try {
- SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
- jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
- new ObjectTypeWrapper<OUT, Tuple, OUT>(data[0], null, data[0]), "source",
- SerializationUtils.serialize(function), 1);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize elements");
- }
- return returnStream;
- }
-
- /**
- * Creates a DataStream from the given non-empty collection. The type of the
- * DataStream is that of the elements in the collection. The elements need
- * to be serializable (as defined by java.io.Serializable), because the
- * framework may move the elements into the cluster if needed.
- *
- * @param data
- * The collection of elements to create the DataStream from.
- * @param <OUT>
- * type of the returned stream
- * @return The DataStream representing the elements.
- */
- @SuppressWarnings("unchecked")
- public <OUT extends Serializable> DataStream<OUT> fromCollection(Collection<OUT> data) {
- DataStream<OUT> returnStream = new DataStream<OUT>(this, "elements");
-
- if (data.isEmpty()) {
- throw new RuntimeException("Collection must not be empty");
- }
-
- try {
- SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
-
- jobGraphBuilder.addSource(
- returnStream.getId(),
- new SourceInvokable<OUT>(new FromElementsFunction<OUT>(data)),
- new ObjectTypeWrapper<OUT, Tuple, OUT>((OUT) data.toArray()[0], null, (OUT) data
- .toArray()[0]), "source", SerializationUtils.serialize(function), 1);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize collection");
- }
-
- return returnStream;
- }
-
- /**
- * Creates a new DataStream that contains a sequence of numbers.
- *
- * @param from
- * The number to start at (inclusive).
- * @param to
- * The number to stop at (inclusive)
- * @return A DataStrean, containing all number in the [from, to] interval.
- */
- public DataStream<Long> generateSequence(long from, long to) {
- return addSource(new GenSequenceFunction(from, to), 1);
- }
-
- /**
- * Ads a data source thus opening a {@link DataStream}.
- *
- * @param function
- * the user defined function
- * @param parallelism
- * number of parallel instances of the function
- * @param <OUT>
- * type of the returned stream
- * @return the data stream constructed
- */
- public <OUT> DataStream<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
- DataStream<OUT> returnStream = new DataStream<OUT>(this, "source");
-
- try {
- jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
- new FunctionTypeWrapper<OUT, Tuple, OUT>(function, SourceFunction.class, 0, -1, 0),
- "source", SerializationUtils.serialize(function), parallelism);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize SourceFunction");
- }
-
- return returnStream;
- }
-
- public <OUT> DataStream<OUT> addSource(SourceFunction<OUT> sourceFunction) {
- return addSource(sourceFunction, 1);
- }
-
- // --------------------------------------------------------------------------------------------
- // Instantiation of Execution Contexts
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates an execution environment that represents the context in which the
- * program is currently executed. If the program is invoked standalone, this
- * method returns a local execution environment, as returned by
- * {@link #createLocalEnvironment()}.
- *
- * @return The execution environment of the context in which the program is
- * executed.
- */
- public static StreamExecutionEnvironment getExecutionEnvironment() {
- return contextEnvironment == null ? createLocalEnvironment() : contextEnvironment;
- }
-
- /**
- * Creates a {@link LocalStreamEnvironment}. The local execution environment
- * will run the program in a multi-threaded fashion in the same JVM as the
- * environment was created in. The default degree of parallelism of the
- * local environment is the number of hardware contexts (CPU cores /
- * threads), unless it was specified differently by
- * {@link #setDegreeOfParallelism(int)}.
- *
- * @return A local execution environment.
- */
- public static LocalStreamEnvironment createLocalEnvironment() {
- return createLocalEnvironment(defaultLocalDop);
- }
-
- /**
- * Creates a {@link LocalStreamEnvironment}. The local execution environment
- * will run the program in a multi-threaded fashion in the same JVM as the
- * environment was created in. It will use the degree of parallelism
- * specified in the parameter.
- *
- * @param degreeOfParallelism
- * The degree of parallelism for the local environment.
- * @return A local execution environment with the specified degree of
- * parallelism.
- */
- public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
- LocalStreamEnvironment lee = new LocalStreamEnvironment();
- lee.setDegreeOfParallelism(degreeOfParallelism);
- return lee;
- }
-
- // TODO:fix cluster default parallelism
- /**
- * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
- * (parts of) the program to a cluster for execution. Note that all file
- * paths used in the program must be accessible from the cluster. The
- * execution will use no parallelism, unless the parallelism is set
- * explicitly via {@link #setDegreeOfParallelism}.
- *
- * @param host
- * The host name or address of the master (JobManager), where the
- * program should be executed.
- * @param port
- * The port of the master (JobManager), where the program should
- * be executed.
- * @param jarFiles
- * The JAR files with code that needs to be shipped to the
- * cluster. If the program uses user-defined functions,
- * user-defined input formats, or any libraries, those must be
- * provided in the JAR files.
- * @return A remote environment that executes the program on a cluster.
- */
- public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
- String... jarFiles) {
- return new RemoteStreamEnvironment(host, port, jarFiles);
- }
-
- /**
- * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
- * (parts of) the program to a cluster for execution. Note that all file
- * paths used in the program must be accessible from the cluster. The
- * execution will use the specified degree of parallelism.
- *
- * @param host
- * The host name or address of the master (JobManager), where the
- * program should be executed.
- * @param port
- * The port of the master (JobManager), where the program should
- * be executed.
- * @param degreeOfParallelism
- * The degree of parallelism to use during the execution.
- * @param jarFiles
- * The JAR files with code that needs to be shipped to the
- * cluster. If the program uses user-defined functions,
- * user-defined input formats, or any libraries, those must be
- * provided in the JAR files.
- * @return A remote environment that executes the program on a cluster.
- */
- public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
- int degreeOfParallelism, String... jarFiles) {
- RemoteStreamEnvironment rec = new RemoteStreamEnvironment(host, port, jarFiles);
- rec.setDegreeOfParallelism(degreeOfParallelism);
- return rec;
- }
-
- // --------------------------------------------------------------------------------------------
- // Methods to control the context and local environments for execution from
- // packaged programs
- // --------------------------------------------------------------------------------------------
-
- protected static void initializeContextEnvironment(StreamExecutionEnvironment ctx) {
- contextEnvironment = ctx;
- }
-
- protected static boolean isContextEnvironmentSet() {
- return contextEnvironment != null;
- }
-
- protected static void disableLocalExecution() {
- allowLocalExecution = false;
- }
-
- public static boolean localExecutionIsAllowed() {
- return allowLocalExecution;
- }
-
- /**
- * Triggers the program execution. The environment will execute all parts of
- * the program that have resulted in a "sink" operation. Sink operations are
- * for example printing results or forwarding them to a message queue.
- * <p>
- * The program execution will be logged and displayed with a generated
- * default name.
- **/
- public abstract void execute();
-
- /**
- * Getter of the {@link JobGraphBuilder} of the streaming job.
- *
- * @return jobgraph
- */
- public JobGraphBuilder getJobGraphBuilder() {
- return jobGraphBuilder;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
deleted file mode 100755
index a39823c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api;
-
-/**
- * The StreamOperator represents a {@link DataStream} transformed with some user
- * defined operator.
- *
- * @param <OUT>
- * Output Type of the operator.
- */
-public class StreamOperator<OUT> extends DataStream<OUT> {
-
- protected StreamOperator(StreamExecutionEnvironment environment, String operatorType) {
- super(environment, operatorType);
- }
-
- protected StreamOperator(DataStream<OUT> dataStream) {
- super(dataStream);
- }
-
- @Override
- protected DataStream<OUT> copy() {
- return new StreamOperator<OUT>(this);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java
deleted file mode 100755
index a459dbf..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TwoInputStreamOperator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api;
-
-/**
- * The TwoInputStreamOperator represents a {@link StreamOperator} with two
- * inputs.
- *
- * @param <IN1>
- * Type of the first input.
- *
- * @param <IN2>
- * Type of the second input.
- * @param <OUT>
- * Output Type of the operator.
- */
-public class TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
-
- protected TwoInputStreamOperator(StreamExecutionEnvironment environment, String operatorType) {
- super(environment, operatorType);
- }
-
- protected TwoInputStreamOperator(DataStream<OUT> dataStream) {
- super(dataStream);
- }
-
- @Override
- protected DataStream<OUT> copy() {
- return new TwoInputStreamOperator<IN1, IN2, OUT>(this);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 73a5749..ced3de7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -19,7 +19,9 @@
package org.apache.flink.streaming.api.collector;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,6 +41,7 @@ public class DirectedStreamCollector<T> extends StreamCollector<T> {
OutputSelector<T> outputSelector;
private static final Log log = LogFactory.getLog(DirectedStreamCollector.class);
+ private List<RecordWriter<SerializationDelegate<StreamRecord<T>>>> emitted;
/**
* Creates a new DirectedStreamCollector
@@ -55,6 +58,7 @@ public class DirectedStreamCollector<T> extends StreamCollector<T> {
OutputSelector<T> outputSelector) {
super(channelID, serializationDelegate);
this.outputSelector = outputSelector;
+ this.emitted = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<T>>>>();
}
@@ -82,11 +86,14 @@ public class DirectedStreamCollector<T> extends StreamCollector<T> {
Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
streamRecord.setId(channelID);
serializationDelegate.setInstance(streamRecord);
+ emitted.clear();
for (String outputName : outputNames) {
try {
- for (RecordWriter<SerializationDelegate<StreamRecord<T>>> output : outputMap
- .get(outputName)) {
+ RecordWriter<SerializationDelegate<StreamRecord<T>>> output = outputMap
+ .get(outputName);
+ if (!emitted.contains(output)) {
output.emit(serializationDelegate);
+ emitted.add(output);
}
} catch (Exception e) {
if (log.isErrorEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index 798d8fa..17d7e7b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -23,13 +23,16 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+
/**
- * Class for defining an OutputSelector for the directTo operator. Every output
- * object of a directed DataStream will run through this operator to select
- * outputs.
+ * Class for defining an OutputSelector for a {@link SplitDataStream} using the
+ * {@link SingleOutputStreamOperator#split} call. Every output object of a
+ * {@link SplitDataStream} will run through this operator to select outputs.
*
* @param <T>
- * Type parameter of the directed tuples/objects.
+ * Type parameter of the split values.
*/
public abstract class OutputSelector<T> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -48,8 +51,9 @@ public abstract class OutputSelector<T> implements Serializable {
/**
* Method for selecting output names for the emitted objects when using the
- * directTo operator. The tuple will be emitted only to output names which
- * are added to the outputs collection.
+ * {@link SingleOutputStreamOperator#split} method. The values will be
+ * emitted only to output names which are added to the outputs collection.
+ * The outputs collection is cleared automatically after each select call.
*
* @param value
* Output object for which the output selection should be made.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index 4317f75..20c3b78 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.StringUtils;
/**
* Collector for tuples in Apache Flink stream processing. The collected
- * tuples/obecjts will be wrapped with ID in a {@link StreamRecord} and then
+ * values will be wrapped with ID in a {@link StreamRecord} and then
* emitted to the outputs.
*
* @param <T>
@@ -47,7 +47,7 @@ public class StreamCollector<T> implements Collector<T> {
protected StreamRecord<T> streamRecord;
protected int channelID;
private List<RecordWriter<SerializationDelegate<StreamRecord<T>>>> outputs;
- protected Map<String, List<RecordWriter<SerializationDelegate<StreamRecord<T>>>>> outputMap;
+ protected Map<String, RecordWriter<SerializationDelegate<StreamRecord<T>>>> outputMap;
protected SerializationDelegate<StreamRecord<T>> serializationDelegate;
/**
@@ -65,7 +65,7 @@ public class StreamCollector<T> implements Collector<T> {
this.streamRecord = new StreamRecord<T>();
this.channelID = channelID;
this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<T>>>>();
- this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<T>>>>>();
+ this.outputMap = new HashMap<String, RecordWriter<SerializationDelegate<StreamRecord<T>>>>();
}
/**
@@ -73,21 +73,19 @@ public class StreamCollector<T> implements Collector<T> {
*
* @param output
* The RecordWriter object representing the output.
- * @param outputName
- * User defined name of the output.
+ * @param outputNames
+ * User defined names of the output.
*/
public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<T>>> output,
- String outputName) {
+ List<String> outputNames) {
outputs.add(output);
- if (outputName != null) {
- if (outputMap.containsKey(outputName)) {
- outputMap.get(outputName).add(output);
- } else {
- outputMap.put(outputName,
- new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<T>>>>());
- outputMap.get(outputName).add(output);
- }
+ for (String outputName : outputNames) {
+ if (outputName != null) {
+ if (!outputMap.containsKey(outputName)) {
+ outputMap.put(outputName, output);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
new file mode 100755
index 0000000..c6cb8af
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+
+/**
+ * The CoDataStream represents a stream for two different data types. It can be
+ * used to apply transformations like {@link CoMapFunction} on two
+ * {@link DataStream}s
+ *
+ * @param <IN1>
+ * Type of the first DataStream.
+ * @param <IN2>
+ * Type of the second DataStream.
+ */
+public class CoDataStream<IN1, IN2> {
+
+ StreamExecutionEnvironment environment;
+ JobGraphBuilder jobGraphBuilder;
+ DataStream<IN1> input1;
+ DataStream<IN2> input2;
+
+ protected CoDataStream(StreamExecutionEnvironment environment, JobGraphBuilder jobGraphBuilder,
+ DataStream<IN1> input1, DataStream<IN2> input2) {
+ this.jobGraphBuilder = jobGraphBuilder;
+ this.environment = environment;
+ this.input1 = input1.copy();
+ this.input2 = input2.copy();
+ }
+
+ /**
+ * Returns the first {@link DataStream}.
+ *
+ * @return The first DataStream.
+ */
+ public DataStream<IN1> getFirst() {
+ return input1.copy();
+ }
+
+ /**
+ * Returns the second {@link DataStream}.
+ *
+ * @return The second DataStream.
+ */
+ public DataStream<IN2> getSecond() {
+ return input2.copy();
+ }
+
+ /**
+ * Applies a CoMap transformation on two separate {@link DataStream}s. The
+ * transformation calls a {@link CoMapFunction#map1} for each element
+ * of the first input and {@link CoMapFunction#map2} for each element
+ * of the second input. Each CoMapFunction call returns exactly one element.
+ * The user can also extend {@link RichCoMapFunction} to gain access to
+ * other features provided by the {@link RichFunction} interface.
+ *
+ * @param coMapper
+ * The CoMapFunction used to jointly transform the two input
+ * DataStreams
+ * @return The transformed DataStream
+ */
+ public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
+ return addCoFunction("coMap", coMapper, new FunctionTypeWrapper<IN1, IN2, OUT>(coMapper,
+ CoMapFunction.class, 0, 1, 2), new CoMapInvokable<IN1, IN2, OUT>(coMapper));
+ }
+
+ protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
+ final Function function, TypeSerializerWrapper<IN1, IN2, OUT> typeWrapper,
+ CoInvokable<IN1, IN2, OUT> functionInvokable) {
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
+ environment, functionName);
+
+ try {
+ input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
+ functionName, SerializationUtils.serialize((Serializable) function),
+ environment.getDegreeOfParallelism());
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize user defined function");
+ }
+
+ input1.connectGraph(input1, returnStream.getId(), 1);
+ input1.connectGraph(input2, returnStream.getId(), 2);
+
+ if ((input1.userDefinedName != null) && (input2.userDefinedName != null)) {
+ throw new RuntimeException("An operator cannot have two names");
+ } else {
+ if (input1.userDefinedName != null) {
+ returnStream.name(input1.getUserDefinedNames());
+ }
+
+ if (input2.userDefinedName != null) {
+ returnStream.name(input2.getUserDefinedNames());
+ }
+ }
+ // TODO consider iteration
+
+ return returnStream;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
new file mode 100755
index 0000000..d17990c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+
+/**
+ * The ConnectedDataStream represents a DataStream which consists of connected
+ * outputs of DataStreams of the same type. Operators applied on this will
+ * transform all the connected outputs jointly.
+ *
+ * @param <OUT>
+ * Type of the output.
+ */
+public class ConnectedDataStream<OUT> extends DataStream<OUT> {
+
+ protected List<DataStream<OUT>> connectedStreams;
+
+ protected ConnectedDataStream(StreamExecutionEnvironment environment, String operatorType) {
+ super(environment, operatorType);
+ this.connectedStreams = new ArrayList<DataStream<OUT>>();
+ this.connectedStreams.add(this);
+ }
+
+ protected ConnectedDataStream(DataStream<OUT> dataStream) {
+ super(dataStream);
+ connectedStreams = new ArrayList<DataStream<OUT>>();
+ if (dataStream instanceof ConnectedDataStream) {
+ for (DataStream<OUT> stream : ((ConnectedDataStream<OUT>) dataStream).connectedStreams) {
+ connectedStreams.add(stream);
+ }
+ } else {
+ this.connectedStreams.add(this);
+ }
+
+ }
+
+ // @Override
+ // public IterativeDataStream<OUT> iterate() {
+ // throw new RuntimeException("Cannot iterate connected DataStreams");
+ // }
+
+ protected void addConnection(DataStream<OUT> stream) {
+ if ((stream.userDefinedName != null) || (this.userDefinedName != null)) {
+ if (!this.userDefinedName.equals(stream.userDefinedName)) {
+ throw new RuntimeException("Error: Connected NamedDataStreams must have same names");
+ }
+ }
+ connectedStreams.add(stream.copy());
+ }
+
+ @Override
+ protected List<String> getUserDefinedNames() {
+ List<String> nameList = new ArrayList<String>();
+ for (DataStream<OUT> stream : connectedStreams) {
+ nameList.add(stream.userDefinedName);
+ }
+ return nameList;
+ }
+
+ @Override
+ protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
+ ConnectedDataStream<OUT> returnStream = (ConnectedDataStream<OUT>) this.copy();
+
+ for (DataStream<OUT> stream : returnStream.connectedStreams) {
+ stream.partitioner = partitioner;
+ }
+
+ return returnStream;
+ }
+
+ @Override
+ protected ConnectedDataStream<OUT> copy() {
+ return new ConnectedDataStream<OUT>(this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
new file mode 100644
index 0000000..b692984
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -0,0 +1,852 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
+import org.apache.flink.streaming.api.invokable.SinkInvokable;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+
+/**
+ * A DataStream represents a stream of elements of the same type. A DataStream
+ * can be transformed into another DataStream by applying a transformation as
+ * for example
+ * <ul>
+ * <li>{@link DataStream#map},</li>
+ * <li>{@link DataStream#filter}, or</li>
+ * <li>{@link DataStream#batchReduce}.</li>
+ * </ul>
+ *
+ * @param <OUT>
+ * The type of the DataStream, i.e., the type of the elements of the
+ * DataStream.
+ */
+public abstract class DataStream<OUT> {
+
+ protected static Integer counter = 0;
+ protected final StreamExecutionEnvironment environment;
+ protected final String id;
+ protected int degreeOfParallelism;
+ protected String userDefinedName;
+ protected StreamPartitioner<OUT> partitioner;
+
+ protected final JobGraphBuilder jobGraphBuilder;
+
+ /**
+ * Create a new {@link DataStream} in the given execution environment with
+ * partitioning set to forward by default.
+ *
+ * @param environment
+ * StreamExecutionEnvironment
+ * @param operatorType
+ * The type of the operator in the component
+ */
+ public DataStream(StreamExecutionEnvironment environment, String operatorType) {
+ if (environment == null) {
+ throw new NullPointerException("context is null");
+ }
+
+ // TODO add name based on component number and preferably a sequential id
+ counter++;
+ this.id = operatorType + "-" + counter.toString();
+ this.environment = environment;
+ this.degreeOfParallelism = environment.getDegreeOfParallelism();
+ this.jobGraphBuilder = environment.getJobGraphBuilder();
+ this.partitioner = new ForwardPartitioner<OUT>();
+
+ }
+
+ /**
+ * Create a new DataStream by creating a copy of another DataStream
+ *
+ * @param dataStream
+ * The DataStream that will be copied.
+ */
+ public DataStream(DataStream<OUT> dataStream) {
+ this.environment = dataStream.environment;
+ this.id = dataStream.id;
+ this.degreeOfParallelism = dataStream.degreeOfParallelism;
+ this.userDefinedName = dataStream.userDefinedName;
+ this.partitioner = dataStream.partitioner;
+ this.jobGraphBuilder = dataStream.jobGraphBuilder;
+
+ }
+
+ /**
+ * Partitioning strategy on the stream.
+ */
+ public static enum ConnectionType {
+ SHUFFLE, BROADCAST, FIELD, FORWARD, DISTRIBUTE
+ }
+
+ /**
+ * Returns the ID of the {@link DataStream}.
+ *
+ * @return ID of the DataStream
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Gets the degree of parallelism for this operator.
+ *
+ * @return The parallelism set for this operator.
+ */
+ public int getParallelism() {
+ return this.degreeOfParallelism;
+ }
+
+ /**
+ * Creates a new {@link ConnectedDataStream} by connecting {@link DataStream} outputs of the same type
+ * with each other. The DataStreams connected using this operator will be
+ * transformed simultaneously.
+ *
+ * @param streams
+ * The DataStreams to connect output with.
+ * @return The {@link ConnectedDataStream}.
+ */
+ public ConnectedDataStream<OUT> connectWith(DataStream<OUT>... streams) {
+ ConnectedDataStream<OUT> returnStream = new ConnectedDataStream<OUT>(this);
+
+ for (DataStream<OUT> stream : streams) {
+ returnStream.addConnection(stream);
+ }
+ return returnStream;
+ }
+
+ /**
+ * Creates a new {@link CoDataStream} by connecting {@link DataStream}
+ * outputs of different types with each other. The DataStreams connected
+ * using this operator can be used with CoFunctions.
+ *
+ * @param dataStream
+ * The DataStream with which this stream will be joined.
+ * @return The {@link CoDataStream}.
+ */
+ public <R> CoDataStream<OUT, R> co(DataStream<R> dataStream) {
+ return new CoDataStream<OUT, R>(environment, jobGraphBuilder, this, dataStream);
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output tuples
+ * are partitioned by their hashcode and are sent to only one component.
+ *
+ * @param keyposition
+ * The field used to compute the hashcode.
+ * @return The DataStream with field partitioning set.
+ */
+ public DataStream<OUT> partitionBy(int keyposition) {
+ if (keyposition < 0) {
+ throw new IllegalArgumentException("The position of the field must be non-negative");
+ }
+
+ return setConnectionType(new FieldsPartitioner<OUT>(keyposition));
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output tuples
+ * are broadcasted to every parallel instance of the next component.
+ *
+ * @return The DataStream with broadcast partitioning set.
+ */
+ public DataStream<OUT> broadcast() {
+ return setConnectionType(new BroadcastPartitioner<OUT>());
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output tuples
+ * are shuffled to the next component.
+ *
+ * @return The DataStream with shuffle partitioning set.
+ */
+ public DataStream<OUT> shuffle() {
+ return setConnectionType(new ShufflePartitioner<OUT>());
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output tuples
+ * are forwarded to the local subtask of the next component. This is the
+ * default partitioner setting.
+ *
+ * @return The DataStream with forward partitioning set.
+ */
+ public DataStream<OUT> forward() {
+ return setConnectionType(new ForwardPartitioner<OUT>());
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output tuples
+ * are distributed evenly to the next component.
+ *
+ * @return The DataStream with distribute partitioning set.
+ */
+ public DataStream<OUT> distribute() {
+ return setConnectionType(new DistributePartitioner<OUT>());
+ }
+
+ /**
+ * Applies a Map transformation on a {@link DataStream}. The transformation
+ * calls a {@link MapFunction} for each element of the DataStream. Each
+ * MapFunction call returns exactly one element. The user can also extend
+ * {@link RichMapFunction} to gain access to other features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param mapper
+ * The MapFunction that is called for each element of the
+ * DataStream.
+ * @param <R>
+ * output type
+ * @return The transformed {@link DataStream}.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
+ return addFunction("map", mapper, new FunctionTypeWrapper<OUT, Tuple, R>(mapper,
+ MapFunction.class, 0, -1, 1), new MapInvokable<OUT, R>(mapper));
+ }
+
+ /**
+ * Applies a FlatMap transformation on a {@link DataStream}. The
+ * transformation calls a {@link FlatMapFunction} for each element of the
+ * DataStream. Each FlatMapFunction call can return any number of elements
+ * including none. The user can also extend {@link RichFlatMapFunction} to
+ * gain access to other features provided by the {@link RichFunction}
+ * interface.
+ *
+ * @param flatMapper
+ * The FlatMapFunction that is called for each element of the
+ * DataStream
+ *
+ * @param <R>
+ * output type
+ * @return The transformed {@link DataStream}.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
+ return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<OUT, Tuple, R>(
+ flatMapper, FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<OUT, R>(
+ flatMapper));
+ }
+
+ /**
+ * Applies a reduce transformation on preset chunks of the DataStream. The
+ * transformation calls a {@link GroupReduceFunction} for each tuple batch
+ * of the predefined size. Each GroupReduceFunction call can return any
+ * number of elements including none. The user can also extend
+ * {@link RichGroupReduceFunction} to gain access to other features provided
+ * by the {@link RichFunction} interface.
+ *
+ *
+ * @param reducer
+ * The GroupReduceFunction that is called for each tuple batch.
+ * @param batchSize
+ * The number of tuples grouped together in the batch.
+ * @param <R>
+ * output type
+ * @return The transformed {@link DataStream}.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> batchReduce(GroupReduceFunction<OUT, R> reducer,
+ int batchSize) {
+ return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
+ GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<OUT, R>(reducer,
+ batchSize));
+ }
+
+ /**
+ * Applies a reduce transformation on preset "time" chunks of the
+ * DataStream. The transformation calls a {@link GroupReduceFunction} on
+ * records received during the predefined time window. The window is shifted
+ * after each reduce call. Each GroupReduceFunction call can return any
+ * number of elements including none. The user can also extend
+ * {@link RichGroupReduceFunction} to gain access to other features provided
+ * by the {@link RichFunction} interface.
+ *
+ *
+ * @param reducer
+ * The GroupReduceFunction that is called for each time window.
+ * @param windowSize
+ * The time window to run the reducer on, in milliseconds.
+ * @param <R>
+ * output type
+ * @return The transformed DataStream.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> windowReduce(GroupReduceFunction<OUT, R> reducer,
+ long windowSize) {
+ // NOTE(review): the operator is registered under the name "batchReduce", the
+ // same name batchReduce() uses - confirm whether this is intentional before
+ // renaming, since the consumers of this name are not visible here.
+ return addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT, Tuple, R>(reducer,
+ GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<OUT, R>(reducer,
+ windowSize));
+ }
+
+ /**
+ * Applies a Filter transformation on a {@link DataStream}. The
+ * transformation calls a {@link FilterFunction} for each element of the
+ * DataStream and retains only those element for which the function returns
+ * true. Elements for which the function returns false are filtered. The
+ * user can also extend {@link RichFilterFunction} to gain access to other
+ * features provided by the {@link RichFunction} interface.
+ *
+ * @param filter
+ * The FilterFunction that is called for each element of the
+ * DataStream.
+ * @return The filtered DataStream.
+ */
+ public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
+ return addFunction("filter", filter, new FunctionTypeWrapper<OUT, Tuple, OUT>(filter,
+ FilterFunction.class, 0, -1, 0), new FilterInvokable<OUT>(filter));
+ }
+
+ /**
+ * Writes a DataStream to the standard output stream (stdout). For each
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @return The closed DataStream.
+ */
+ public DataStream<OUT> print() {
+ DataStream<OUT> inputStream = this.copy();
+ PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
+ DataStream<OUT> returnStream = addSink(inputStream, printFunction, null);
+
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+
+ return returnStream;
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsText(String path) {
+ return writeAsText(this, path, new WriteFormatAsText<OUT>(), 1, null);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format. The
+ * writing is performed periodically, in every millis milliseconds. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param millis
+ * is the file update frequency
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsText(String path, long millis) {
+ return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, null);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format. The
+ * writing is performed periodically in equally sized batches. For every
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param batchSize
+ * is the size of the batches, i.e. the number of tuples written
+ * to the file at a time
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsText(String path, int batchSize) {
+ return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, null);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format. The
+ * writing is performed periodically, in every millis milliseconds. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param millis
+ * is the file update frequency
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * system time.
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsText(String path, long millis, OUT endTuple) {
+ return writeAsText(this, path, new WriteFormatAsText<OUT>(), millis, endTuple);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format. The
+ * writing is performed periodically in equally sized batches. For every
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param batchSize
+ * is the size of the batches, i.e. the number of tuples written
+ * to the file at a time
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * batchSize.
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsText(String path, int batchSize, OUT endTuple) {
+ return writeAsText(this, path, new WriteFormatAsText<OUT>(), batchSize, endTuple);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format. The
+ * writing is performed periodically, in every millis milliseconds. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param millis
+ * is the file update frequency
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * system time.
+ *
+ * @return the data stream constructed
+ */
+ private DataStream<OUT> writeAsText(DataStream<OUT> inputStream, String path,
+ WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
+ DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
+ path, format, millis, endTuple), null);
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
+ return returnStream;
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format. The
+ * writing is performed periodically in equally sized batches. For every
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param batchSize
+ * is the size of the batches, i.e. the number of tuples written
+ * to the file at a time
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * batchSize.
+ *
+ * @return the data stream constructed
+ */
+ private DataStream<OUT> writeAsText(DataStream<OUT> inputStream, String path,
+ WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
+ DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(
+ path, format, batchSize, endTuple), null);
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
+ return returnStream;
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsCsv(String path) {
+ return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), 1, null);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically, in every millis milliseconds. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param millis
+ * is the file update frequency
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsCsv(String path, long millis) {
+ return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, null);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically in equally sized batches. For every
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param batchSize
+ * is the size of the batches, i.e. the number of tuples written
+ * to the file at a time
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsCsv(String path, int batchSize) {
+ return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, null);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically, in every millis milliseconds. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param millis
+ * is the file update frequency
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * system time.
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsCsv(String path, long millis, OUT endTuple) {
+ return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), millis, endTuple);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically in equally sized batches. For every
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param batchSize
+ * is the size of the batches, i.e. the number of tuples written
+ * to the file at a time
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * batchSize.
+ *
+ * @return The closed DataStream
+ */
+ public DataStream<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) {
+ if (this instanceof SingleOutputStreamOperator) {
+ ((SingleOutputStreamOperator<?, ?>) this).setMutability(false);
+ }
+ return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically, in every millis milliseconds. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param inputStream
+ * the stream the sink is connected to
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param format
+ * the csv write format handed to the sink function
+ * @param millis
+ * is the file update frequency
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * system time.
+ *
+ * @return the data stream constructed
+ */
+ private DataStream<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+ WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
+ // Pass no type wrapper, like the other file writers of this class: the
+ // serialization information is taken from the input via setBytesFrom below.
+ DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
+ path, format, millis, endTuple), null);
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
+ return returnStream;
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically in equally sized batches. For every
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param batchSize
+ * is the size of the batches, i.e. the number of tuples written
+ * to the file at a time
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * batchSize.
+ *
+ * @return the data stream constructed
+ */
+ private DataStream<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
+ WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
+ DataStream<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<OUT>(
+ path, format, batchSize, endTuple), null);
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
+ return returnStream;
+ }
+
+ /**
+ * Initiates an iterative part of the program that executes multiple times
+ * and feeds back data streams. The iterative part needs to be closed by
+ * calling {@link IterativeDataStream#closeWith(DataStream)}. The
+ * transformation of this IterativeDataStream will be the iteration head.
+ * The data stream given to the {@code closeWith(DataStream)} method is the
+ * data stream that will be fed back and used as the input for the iteration
+ * head. Unlike in batch processing by default the output of the iteration
+ * stream is directed to both to the iteration head and the next component.
+ * To direct tuples to the iteration head or the output specifically one can
+ * use the {@code split(OutputSelector)} on the iteration tail while
+ * referencing the iteration head as 'iterate'.
+ *
+ * The iteration edge will be partitioned the same way as the first input of
+ * the iteration head.
+ *
+ * @return The iterative data stream created.
+ */
+ public IterativeDataStream<OUT> iterate() {
+ return new IterativeDataStream<OUT>(this);
+ }
+
+ /**
+ * Registers an iteration source for this stream with the job graph builder
+ * and returns a copy of this stream.
+ *
+ * @param iterationID
+ * ID of the iteration the source belongs to
+ * @return a copy of this DataStream
+ */
+ protected <R> DataStream<OUT> addIterationSource(String iterationID) {
+
+ // The source stream only contributes its freshly generated ID.
+ DataStream<R> iterationSource = new DataStreamSource<R>(environment, "iterationSource");
+ String sourceId = iterationSource.getId();
+ String ownId = this.getId();
+
+ jobGraphBuilder.addIterationSource(sourceId, ownId, iterationID, degreeOfParallelism);
+
+ return this.copy();
+ }
+
+ /**
+ * Internal function for passing the user defined functions to the JobGraph
+ * of the job.
+ *
+ * @param functionName
+ * name of the function
+ * @param function
+ * the user defined function; it is cast to {@link Serializable}
+ * and serialized
+ * @param typeWrapper
+ * the type wrapper handed to the job graph builder
+ * @param functionInvokable
+ * the wrapping JobVertex instance
+ * @param <R>
+ * type of the return stream
+ * @return the data stream constructed
+ * @throws RuntimeException
+ * if the function cannot be serialized
+ */
+ private <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
+ final Function function, TypeSerializerWrapper<OUT, Tuple, R> typeWrapper,
+ UserTaskInvokable<OUT, R> functionInvokable) {
+
+ DataStream<OUT> inputStream = this.copy();
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
+ functionName);
+
+ try {
+ jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
+ functionName, SerializationUtils.serialize((Serializable) function),
+ degreeOfParallelism);
+ } catch (SerializationException e) {
+ // Keep the original exception as the cause so the failure can be diagnosed.
+ throw new RuntimeException("Cannot serialize user defined function", e);
+ }
+
+ connectGraph(inputStream, returnStream.getId(), 0);
+
+ if (inputStream instanceof IterativeDataStream) {
+ returnStream.addIterationSource(((IterativeDataStream<OUT>) inputStream).iterationID
+ .toString());
+ }
+
+ // The new operator inherits the user defined name(s) of its input.
+ if (userDefinedName != null) {
+ returnStream.name(getUserDefinedNames());
+ }
+
+ return returnStream;
+ }
+
+ /**
+ * Returns a new list holding only the user defined name of this stream.
+ * The single entry is null when no name has been set.
+ */
+ protected List<String> getUserDefinedNames() {
+ List<String> names = new ArrayList<String>(1);
+ names.add(userDefinedName);
+ return names;
+ }
+
+ /**
+ * Gives the data transformation(vertex) a user defined name in order to use
+ * with directed outputs. The {@link OutputSelector} of the input vertex
+ * should use this name for directed emits.
+ *
+ * @param name
+ * The names to set. The first element becomes the user defined
+ * name of this stream; the whole list is passed to the job graph
+ * builder for this stream's ID. The list must not be empty,
+ * because its first element is read unconditionally.
+ * @return The named DataStream.
+ */
+ protected DataStream<OUT> name(List<String> name) {
+
+ userDefinedName = name.get(0);
+ jobGraphBuilder.setUserDefinedName(id, name);
+
+ return this;
+ }
+
+ /**
+ * Internal function for setting the partitioner for the DataStream. This
+ * stream is left untouched; the partitioner is set on a copy.
+ *
+ * @param partitioner
+ * Partitioner to set.
+ * @return A copy of this DataStream using the given partitioner.
+ */
+ protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner) {
+ DataStream<OUT> repartitioned = copy();
+ repartitioned.partitioner = partitioner;
+ return repartitioned;
+ }
+
+ /**
+ * Internal function for assembling the underlying job graph of the job.
+ * Adds an edge from the given input stream to the output with the given ID.
+ * For a {@link ConnectedDataStream} one edge is added per connected stream,
+ * each with that stream's own partitioner.
+ *
+ * @param inputStream
+ * input data stream
+ * @param outputID
+ * ID of the output
+ * @param typeNumber
+ * Number of the type (used at co-functions)
+ */
+ protected <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
+ if (!(inputStream instanceof ConnectedDataStream)) {
+ // Plain stream: a single edge is enough.
+ jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
+ typeNumber);
+ return;
+ }
+
+ ConnectedDataStream<X> connected = (ConnectedDataStream<X>) inputStream;
+ for (DataStream<X> member : connected.connectedStreams) {
+ jobGraphBuilder.setEdge(member.getId(), outputID, member.partitioner, typeNumber);
+ }
+ }
+
+ /**
+ * Adds the given sink to this DataStream. Only streams with sinks added
+ * will be executed once the {@link StreamExecutionEnvironment#execute()}
+ * method is called.
+ *
+ * @param sinkFunction
+ * The object containing the sink's invoke function.
+ * @return The closed DataStream.
+ */
+ public DataStream<OUT> addSink(SinkFunction<OUT> sinkFunction) {
+ return addSink(this.copy(), sinkFunction);
+ }
+
+ private DataStream<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) {
+ return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<OUT, Tuple, OUT>(
+ sinkFunction, SinkFunction.class, 0, -1, 0));
+ }
+
+ /**
+ * Internal function for registering a sink with the job graph builder and
+ * connecting the given input stream to it.
+ *
+ * @param inputStream
+ * the stream the sink is connected to
+ * @param sinkFunction
+ * The object containing the sink's invoke function.
+ * @param typeWrapper
+ * the type wrapper handed to the job graph builder; the file and
+ * print sinks of this class pass null
+ * @return The closed DataStream.
+ * @throws RuntimeException
+ * if the sink function cannot be serialized
+ */
+ private DataStream<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction,
+ TypeSerializerWrapper<OUT, Tuple, OUT> typeWrapper) {
+ DataStream<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink");
+
+ try {
+ jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
+ typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
+ degreeOfParallelism);
+ } catch (SerializationException e) {
+ // Keep the original exception as the cause so the failure can be diagnosed.
+ throw new RuntimeException("Cannot serialize SinkFunction", e);
+ }
+
+ inputStream.connectGraph(inputStream, returnStream.getId(), 0);
+
+ // NOTE(review): the copy made here is only used to read the name; reading
+ // userDefinedName directly looks equivalent - confirm before simplifying.
+ if (this.copy().userDefinedName != null) {
+ returnStream.name(getUserDefinedNames());
+ }
+
+ return returnStream;
+ }
+
+ /**
+ * Creates a copy of the {@link DataStream}
+ *
+ * @return The copy
+ */
+ protected abstract DataStream<OUT> copy();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
new file mode 100755
index 0000000..ee6502f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Represents the end of a DataStream.
+ *
+ * @param <IN>
+ * The type of the DataStream closed by the sink.
+ */
+public class DataStreamSink<IN> extends DataStream<IN> {
+
+    /**
+     * Creates a sink stream; both arguments are passed unchanged to the
+     * {@link DataStream} constructor.
+     *
+     * @param environment
+     *            The execution environment handed to the superclass.
+     * @param operatorType
+     *            The operator type string handed to the superclass.
+     */
+    protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType) {
+        super(environment, operatorType);
+    }
+
+    /**
+     * Creates a sink stream from an existing stream by passing it to the
+     * matching {@link DataStream} constructor.
+     *
+     * @param dataStream
+     *            The stream handed to the superclass constructor.
+     */
+    protected DataStreamSink(DataStream<IN> dataStream) {
+        super(dataStream);
+    }
+
+    /**
+     * Copying is not supported for sinks, so this method always throws.
+     *
+     * @return Never returns normally.
+     * @throws UnsupportedOperationException
+     *             Always. It is a {@link RuntimeException}, so callers that
+     *             catch {@code RuntimeException} are unaffected.
+     */
+    @Override
+    protected DataStream<IN> copy() {
+        throw new UnsupportedOperationException("Data stream sinks cannot be copied");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
new file mode 100755
index 0000000..f939851
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * The DataStreamSource represents the starting point of a DataStream.
+ *
+ * @param <OUT>
+ * Type of the DataStream created.
+ */
+public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
+
+ // NOTE(review): copy() is not overridden here, so the inherited
+ // implementation is used - confirm that returning the superclass type
+ // from copy() is intended for sources.
+
+ /**
+ * Creates a source stream; both arguments are passed unchanged to the
+ * {@link SingleOutputStreamOperator} constructor.
+ *
+ * @param environment
+ * The execution environment handed to the superclass.
+ * @param operatorType
+ * The operator type string handed to the superclass.
+ */
+ public DataStreamSource(StreamExecutionEnvironment environment, String operatorType) {
+ super(environment, operatorType);
+ }
+
+ /**
+ * Creates a source stream from an existing {@link DataStream} by passing
+ * it to the matching {@link SingleOutputStreamOperator} constructor.
+ *
+ * @param dataStream
+ * The stream handed to the superclass constructor.
+ */
+ public DataStreamSource(DataStream<OUT> dataStream) {
+ super(dataStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
new file mode 100644
index 0000000..b9aadcd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+
+/**
+ * The iterative data stream represents the start of an iteration in a
+ * {@link DataStream}.
+ *
+ * @param <T>
+ * Type of the DataStream
+ */
+public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, IterativeDataStream<T>> {
+
+    // Class-wide counter; its current value becomes the ID of the next new iteration.
+    static Integer iterationCount = 0;
+    // ID of the iteration this stream belongs to.
+    protected Integer iterationID;
+
+    /**
+     * Creates the head of a new iteration on top of the given stream and
+     * assigns it the current value of the iteration counter.
+     *
+     * @param dataStream
+     *            The stream handed to the superclass constructor.
+     */
+    protected IterativeDataStream(DataStream<T> dataStream) {
+        super(dataStream);
+        // Take the current counter value as the ID, then advance the counter.
+        iterationID = iterationCount++;
+    }
+
+    /**
+     * Creates an iterative stream that uses an already assigned iteration ID
+     * and leaves the iteration counter untouched.
+     *
+     * @param dataStream
+     *            The stream handed to the superclass constructor.
+     * @param iterationID
+     *            The ID of the iteration this stream belongs to.
+     */
+    protected IterativeDataStream(DataStream<T> dataStream, Integer iterationID) {
+        super(dataStream);
+        this.iterationID = iterationID;
+    }
+
+    /**
+     * Closes the iteration without naming the feedback edge. This marks the
+     * end of the iterative program part. By default the given stream is fed
+     * back to the iteration head; the user can instead select the tuples to
+     * iterate with {@code directTo(OutputSelector)}, in which case tuples
+     * directed to 'iterate' are fed back to the iteration head.
+     *
+     * @param iterationResult
+     *            The data stream that can be fed back to the next iteration.
+     * @return The stream that was passed in.
+     */
+    public DataStream<T> closeWith(DataStream<T> iterationResult) {
+        final String unnamedEdge = null;
+        return closeWith(iterationResult, unnamedEdge);
+    }
+
+    /**
+     * Closes the iteration. This marks the end of the iterative program part.
+     * By default the given stream is fed back to the iteration head; the user
+     * can instead select the tuples to iterate with
+     * {@code directTo(OutputSelector)}, in which case tuples directed to
+     * 'iterate' are fed back to the iteration head.
+     *
+     * @param iterationTail
+     *            The data stream that can be fed back to the next iteration.
+     * @param iterationName
+     *            Name of the iteration edge (backward edge to iteration head)
+     *            when used with directed emits
+     * @return The {@code iterationTail} stream that was passed in.
+     */
+    public <R> DataStream<T> closeWith(DataStream<T> iterationTail, String iterationName) {
+        DataStream<R> iterationSink = new DataStreamSink<R>(environment, "iterationSink");
+        String sinkId = iterationSink.getId();
+        String iterationKey = iterationID.toString();
+
+        jobGraphBuilder.addIterationSink(sinkId, iterationTail.getId(), iterationKey,
+                iterationTail.getParallelism(), iterationName);
+        jobGraphBuilder.setIterationSourceParallelism(iterationKey,
+                iterationTail.getParallelism());
+
+        if (!(iterationTail instanceof ConnectedDataStream)) {
+            // Plain stream: a single forward edge from the tail to the iteration sink.
+            jobGraphBuilder.setEdge(iterationTail.getId(), sinkId, new ForwardPartitioner<T>(), 0);
+        } else {
+            // Connected stream: one forward edge per connected input stream.
+            ConnectedDataStream<T> connectedTail = (ConnectedDataStream<T>) iterationTail;
+            for (DataStream<T> tailPart : connectedTail.connectedStreams) {
+                jobGraphBuilder.setEdge(tailPart.getId(), sinkId, new ForwardPartitioner<T>(), 0);
+            }
+        }
+
+        return iterationTail;
+    }
+
+    /**
+     * Creates a copy that keeps the iteration ID of this stream.
+     *
+     * @return The copy
+     */
+    @Override
+    protected IterativeDataStream<T> copy() {
+        return new IterativeDataStream<T>(this, iterationID);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
new file mode 100755
index 0000000..9af4dc8
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -0,0 +1,148 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * The SingleOutputStreamOperator represents a user defined transformation
+ * applied on a {@link DataStream} with one predefined output type.
+ *
+ * @param <OUT>
+ * Output type of the operator.
+ * @param <O>
+ * Type of the operator.
+ */
+public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperator<OUT, O>> extends
+        DataStream<OUT> {
+
+    /**
+     * Creates an operator stream; both arguments are passed unchanged to the
+     * {@link DataStream} constructor.
+     *
+     * @param environment
+     *            The execution environment handed to the superclass.
+     * @param operatorType
+     *            The operator type string handed to the superclass.
+     */
+    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, String operatorType) {
+        super(environment, operatorType);
+    }
+
+    /**
+     * Creates an operator stream from an existing stream by passing it to the
+     * matching {@link DataStream} constructor.
+     *
+     * @param dataStream
+     *            The stream handed to the superclass constructor.
+     */
+    protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {
+        super(dataStream);
+    }
+
+    /**
+     * Sets the degree of parallelism for this operator. The degree must be 1 or
+     * more.
+     *
+     * @param dop
+     *            The degree of parallelism for this operator.
+     * @return The operator with set degree of parallelism.
+     * @throws IllegalArgumentException
+     *             If {@code dop} is smaller than 1.
+     */
+    public SingleOutputStreamOperator<OUT, O> setParallelism(int dop) {
+        if (dop < 1) {
+            throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
+        }
+        this.degreeOfParallelism = dop;
+
+        // Keep the job graph in sync with the value stored on the stream.
+        jobGraphBuilder.setParallelism(id, degreeOfParallelism);
+
+        return this;
+    }
+
+    /**
+     * Sets the mutability of the operator. If the operator is set to mutable,
+     * the tuples received in the user defined functions, will be reused after
+     * the function call. Setting an operator to mutable reduces garbage
+     * collection overhead and thus increases scalability. Please note that if a
+     * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is used
+     * as mutable, the user can only iterate through the iterator once in every
+     * invoke.
+     *
+     * @param isMutable
+     *            The mutability of the operator.
+     * @return The operator with mutability set.
+     */
+    public DataStream<OUT> setMutability(boolean isMutable) {
+        jobGraphBuilder.setMutability(id, isMutable);
+        return this;
+    }
+
+    /**
+     * Sets the maximum time frequency (ms) for the flushing of the output
+     * buffer. By default the output buffers flush only when they are full.
+     *
+     * @param timeoutMillis
+     *            The maximum time between two output flushes.
+     * @return The operator with buffer timeout set.
+     */
+    public DataStream<OUT> setBufferTimeout(long timeoutMillis) {
+        jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
+        return this;
+    }
+
+    /**
+     * Operator used for directing tuples to specific named outputs using an
+     * {@link OutputSelector}. Calling this method on an operator creates a new
+     * {@link SplitDataStream}.
+     *
+     * @param outputSelector
+     *            The user defined {@link OutputSelector} for directing the
+     *            tuples.
+     * @return The {@link SplitDataStream}
+     * @throws RuntimeException
+     *             If the output selector cannot be serialized; the original
+     *             {@link SerializationException} is attached as the cause.
+     */
+    public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+        try {
+            jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+        } catch (SerializationException e) {
+            // Chain the original exception so the real reason for the failure is not lost.
+            throw new RuntimeException("Cannot serialize OutputSelector", e);
+        }
+
+        return new SplitDataStream<OUT>(this);
+    }
+
+    // The five methods below delegate to the DataStream implementation of the
+    // same name and only narrow the static return type.
+    // NOTE(review): the unchecked casts presume the superclass returns an
+    // instance of this operator type - confirm against DataStream.
+
+    /**
+     * Delegates to {@code DataStream.partitionBy(int)}.
+     *
+     * @param keyposition
+     *            The key position handed to the superclass method.
+     * @return The superclass result, cast to this operator type.
+     */
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<OUT, O> partitionBy(int keyposition) {
+        return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keyposition);
+    }
+
+    /**
+     * Delegates to {@code DataStream.broadcast()}.
+     *
+     * @return The superclass result, cast to this operator type.
+     */
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<OUT, O> broadcast() {
+        return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
+    }
+
+    /**
+     * Delegates to {@code DataStream.shuffle()}.
+     *
+     * @return The superclass result, cast to this operator type.
+     */
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<OUT, O> shuffle() {
+        return (SingleOutputStreamOperator<OUT, O>) super.shuffle();
+    }
+
+    /**
+     * Delegates to {@code DataStream.forward()}.
+     *
+     * @return The superclass result, cast to this operator type.
+     */
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<OUT, O> forward() {
+        return (SingleOutputStreamOperator<OUT, O>) super.forward();
+    }
+
+    /**
+     * Delegates to {@code DataStream.distribute()}.
+     *
+     * @return The superclass result, cast to this operator type.
+     */
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<OUT, O> distribute() {
+        return (SingleOutputStreamOperator<OUT, O>) super.distribute();
+    }
+
+    /**
+     * Creates a copy of this operator by passing it to the constructor that
+     * accepts a stream.
+     *
+     * @return The copy
+     */
+    @Override
+    protected SingleOutputStreamOperator<OUT, O> copy() {
+        return new SingleOutputStreamOperator<OUT, O>(this);
+    }
+
+}