You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:19 UTC
[03/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
new file mode 100644
index 0000000..176a07f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -0,0 +1,1077 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamFilter;
+import org.apache.flink.streaming.api.operators.StreamFlatMap;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A DataStream represents a stream of elements of the same type. A DataStream
+ * can be transformed into another DataStream by applying a transformation as
+ * for example:
+ * <ul>
+ * <li>{@link DataStream#map},
+ * <li>{@link DataStream#filter}, or
+ * </ul>
+ *
+ * @param <T> The type of the elements in this Stream
+ */
+public class DataStream<T> {
+
+ protected final StreamExecutionEnvironment environment;
+
+ protected final StreamTransformation<T> transformation;
+
+ /**
+ * Create a new {@link DataStream} in the given execution environment with
+ * partitioning set to forward by default.
+ *
+ * @param environment The StreamExecutionEnvironment
+ */
+ public DataStream(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
+ this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
+ this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
+ }
+
+ /**
+ * Returns the ID of the {@link DataStream} in the current {@link StreamExecutionEnvironment}.
+ *
+ * @return ID of the DataStream
+ */
+ public Integer getId() {
+ return transformation.getId();
+ }
+
+ /**
+ * Gets the parallelism for this operator.
+ *
+ * @return The parallelism set for this operator.
+ */
+ public int getParallelism() {
+ return transformation.getParallelism();
+ }
+
+ /**
+ * Gets the type of the stream.
+ *
+ * @return The type of the datastream.
+ */
+ public TypeInformation<T> getType() {
+ return transformation.getOutputType();
+ }
+
+ /**
+ * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
+ * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
+ *
+ * @return The cleaned Function
+ */
+ protected <F> F clean(F f) {
+ return getExecutionEnvironment().clean(f);
+ }
+
+ /**
+ * Returns the {@link StreamExecutionEnvironment} that was used to create this
+ * {@link DataStream}
+ *
+ * @return The Execution Environment
+ */
+ public StreamExecutionEnvironment getExecutionEnvironment() {
+ return environment;
+ }
+
+ public ExecutionConfig getExecutionConfig() {
+ return environment.getConfig();
+ }
+
+ /**
+ * Creates a new {@link DataStream} by merging {@link DataStream} outputs of
+ * the same type with each other. The DataStreams merged using this operator
+ * will be transformed simultaneously.
+ *
+ * @param streams
+ * The DataStreams to union output with.
+ * @return The {@link DataStream}.
+ */
+ @SafeVarargs
+ public final DataStream<T> union(DataStream<T>... streams) {
+ List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
+ unionedTransforms.add(this.transformation);
+
+ Collection<StreamTransformation<?>> thisPredecessors = this.getTransformation().getTransitivePredecessors();
+
+ for (DataStream<T> newStream : streams) {
+ if (!(newStream.getParallelism() == this.getParallelism())) {
+ throw new UnsupportedClassVersionError(
+ "DataStream can only be unioned with DataStreams of the same parallelism. " +
+ "This Stream: " + this.getTransformation() +
+ ", other stream: " + newStream.getTransformation());
+ }
+ if (!getType().equals(newStream.getType())) {
+ throw new IllegalArgumentException("Cannot union streams of different types: "
+ + getType() + " and " + newStream.getType());
+ }
+
+ Collection<StreamTransformation<?>> predecessors = newStream.getTransformation().getTransitivePredecessors();
+
+ if (predecessors.contains(this.transformation) || thisPredecessors.contains(newStream.getTransformation())) {
+ throw new UnsupportedOperationException("A DataStream cannot be unioned with itself");
+ }
+ unionedTransforms.add(newStream.getTransformation());
+ }
+ return new DataStream<T>(this.environment, new UnionTransformation<T>(unionedTransforms));
+ }
+
+ /**
+ * Operator used for directing tuples to specific named outputs using an
+ * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
+ * Calling this method on an operator creates a new {@link SplitStream}.
+ *
+ * @param outputSelector
+ * The user defined
+ * {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}
+ * for directing the tuples.
+ * @return The {@link SplitStream}
+ */
+ public SplitStream<T> split(OutputSelector<T> outputSelector) {
+ return new SplitStream<T>(this, clean(outputSelector));
+ }
+
+ /**
+ * Creates a new {@link ConnectedStreams} by connecting
+ * {@link DataStream} outputs of (possible) different types with each other.
+ * The DataStreams connected using this operator can be used with
+ * CoFunctions to apply joint transformations.
+ *
+ * @param dataStream
+ * The DataStream with which this stream will be connected.
+ * @return The {@link ConnectedStreams}.
+ */
+ public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
+ return new ConnectedStreams<T, R>(environment, this, dataStream);
+ }
+
+ /**
+ *
+ * It creates a new {@link KeyedStream} that uses the provided key for partitioning
+ * its operator states.
+ *
+ * @param key
+ * The KeySelector to be used for extracting the key for partitioning
+ * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
+ */
+ public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
+ return new KeyedStream<T, K>(this, clean(key));
+ }
+
+ /**
+ * Partitions the operator state of a {@link DataStream} by the given key positions.
+ *
+ * @param fields
+ * The position of the fields on which the {@link DataStream}
+ * will be grouped.
+ * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
+ */
+ public KeyedStream<T, Tuple> keyBy(int... fields) {
+ if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
+ return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
+ } else {
+ return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
+ }
+ }
+
+ /**
+ * Partitions the operator state of a {@link DataStream}using field expressions.
+ * A field expression is either the name of a public field or a getter method with parentheses
+ * of the {@link DataStream}S underlying type. A dot can be used to drill
+ * down into objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param fields
+ * One or more field expressions on which the state of the {@link DataStream} operators will be
+ * partitioned.
+ * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
+ **/
+ public KeyedStream<T, Tuple> keyBy(String... fields) {
+ return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
+ }
+
+ private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
+ return new KeyedStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
+ getType(), getExecutionConfig())));
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output is
+ * partitioned hashing on the given fields. This setting only
+ * effects the how the outputs will be distributed between the parallel
+ * instances of the next processing operator.
+ *
+ * @param fields The tuple fields that should be used for partitioning
+ * @return The partitioned DataStream
+ *
+ */
+ public DataStream<T> partitionByHash(int... fields) {
+ if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
+ return partitionByHash(KeySelectorUtil.getSelectorForArray(fields, getType()));
+ } else {
+ return partitionByHash(new Keys.ExpressionKeys<T>(fields, getType()));
+ }
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output is
+ * partitioned hashing on the given fields. This setting only
+ * effects the how the outputs will be distributed between the parallel
+ * instances of the next processing operator.
+ *
+ * @param fields The tuple fields that should be used for partitioning
+ * @return The partitioned DataStream
+ *
+ */
+ public DataStream<T> partitionByHash(String... fields) {
+ return partitionByHash(new Keys.ExpressionKeys<T>(fields, getType()));
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output is
+ * partitioned using the given {@link KeySelector}. This setting only
+ * effects the how the outputs will be distributed between the parallel
+ * instances of the next processing operator.
+ *
+ * @param keySelector The function that extracts the key from an element in the Stream
+ * @return The partitioned DataStream
+ */
+ public DataStream<T> partitionByHash(KeySelector<T, ?> keySelector) {
+ return setConnectionType(new HashPartitioner<T>(clean(keySelector)));
+ }
+
+ //private helper method for partitioning
+ private DataStream<T> partitionByHash(Keys<T> keys) {
+ KeySelector<T, ?> keySelector = clean(KeySelectorUtil.getSelectorForKeys(
+ keys,
+ getType(),
+ getExecutionConfig()));
+
+ return setConnectionType(new HashPartitioner<T>(keySelector));
+ }
+
+ /**
+ * Partitions a tuple DataStream on the specified key fields using a custom partitioner.
+ * This method takes the key position to partition on, and a partitioner that accepts the key type.
+ * <p>
+ * Note: This method works only on single field keys.
+ *
+ * @param partitioner The partitioner to assign partitions to keys.
+ * @param field The field index on which the DataStream is to partitioned.
+ * @return The partitioned DataStream.
+ */
+ public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, int field) {
+ Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<T>(new int[]{field}, getType());
+ return partitionCustom(partitioner, outExpressionKeys);
+ }
+
+ /**
+ * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
+ * This method takes the key expression to partition on, and a partitioner that accepts the key type.
+ * <p>
+ * Note: This method works only on single field keys.
+ *
+ * @param partitioner The partitioner to assign partitions to keys.
+ * @param field The field index on which the DataStream is to partitioned.
+ * @return The partitioned DataStream.
+ */
+ public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
+ Keys.ExpressionKeys<T> outExpressionKeys = new Keys.ExpressionKeys<T>(new String[]{field}, getType());
+ return partitionCustom(partitioner, outExpressionKeys);
+ }
+
+
+ /**
+ * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
+ * This method takes the key selector to get the key to partition on, and a partitioner that
+ * accepts the key type.
+ * <p>
+ * Note: This method works only on single field keys, i.e. the selector cannot return tuples
+ * of fields.
+ *
+ * @param partitioner
+ * The partitioner to assign partitions to keys.
+ * @param keySelector
+ * The KeySelector with which the DataStream is partitioned.
+ * @return The partitioned DataStream.
+ * @see KeySelector
+ */
+ public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
+ return setConnectionType(new CustomPartitionerWrapper<K, T>(clean(partitioner),
+ clean(keySelector)));
+ }
+
+ // private helper method for custom partitioning
+ private <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, Keys<T> keys) {
+ KeySelector<T, K> keySelector = KeySelectorUtil.getSelectorForOneKey(keys, partitioner, getType(), getExecutionConfig());
+
+ return setConnectionType(
+ new CustomPartitionerWrapper<K, T>(
+ clean(partitioner),
+ clean(keySelector)));
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output tuples
+ * are broadcasted to every parallel instance of the next component.
+ *
+ * <p>
+ * This setting only effects the how the outputs will be distributed between
+ * the parallel instances of the next processing operator.
+ *
+ * @return The DataStream with broadcast partitioning set.
+ */
+ public DataStream<T> broadcast() {
+ return setConnectionType(new BroadcastPartitioner<T>());
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output tuples
+ * are shuffled uniformly randomly to the next component.
+ *
+ * <p>
+ * This setting only effects the how the outputs will be distributed between
+ * the parallel instances of the next processing operator.
+ *
+ * @return The DataStream with shuffle partitioning set.
+ */
+ public DataStream<T> shuffle() {
+ return setConnectionType(new ShufflePartitioner<T>());
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output tuples
+ * are forwarded to the local subtask of the next component (whenever
+ * possible).
+ *
+ * <p>
+ * This setting only effects the how the outputs will be distributed between
+ * the parallel instances of the next processing operator.
+ *
+ * @return The DataStream with forward partitioning set.
+ */
+ public DataStream<T> forward() {
+ return setConnectionType(new ForwardPartitioner<T>());
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output tuples
+ * are distributed evenly to instances of the next component in a Round-robin
+ * fashion.
+ *
+ * <p>
+ * This setting only effects the how the outputs will be distributed between
+ * the parallel instances of the next processing operator.
+ *
+ * @return The DataStream with rebalance partitioning set.
+ */
+ public DataStream<T> rebalance() {
+ return setConnectionType(new RebalancePartitioner<T>());
+ }
+
+ /**
+ * Sets the partitioning of the {@link DataStream} so that the output values
+ * all go to the first instance of the next processing operator. Use this
+ * setting with care since it might cause a serious performance bottleneck
+ * in the application.
+ *
+ * @return The DataStream with shuffle partitioning set.
+ */
+ public DataStream<T> global() {
+ return setConnectionType(new GlobalPartitioner<T>());
+ }
+
+ /**
+ * Initiates an iterative part of the program that feeds back data streams.
+ * The iterative part needs to be closed by calling
+ * {@link IterativeStream#closeWith(DataStream)}. The transformation of
+ * this IterativeStream will be the iteration head. The data stream
+ * given to the {@link IterativeStream#closeWith(DataStream)} method is
+ * the data stream that will be fed back and used as the input for the
+ * iteration head. The user can also use different feedback type than the
+ * input of the iteration and treat the input and feedback streams as a
+ * {@link ConnectedStreams} be calling
+ * {@link IterativeStream#withFeedbackType(TypeInformation)}
+ * <p>
+ * A common usage pattern for streaming iterations is to use output
+ * splitting to send a part of the closing data stream to the head. Refer to
+ * {@link #split(OutputSelector)} for more information.
+ * <p>
+ * The iteration edge will be partitioned the same way as the first input of
+ * the iteration head unless it is changed in the
+ * {@link IterativeStream#closeWith(DataStream)} call.
+ * <p>
+ * By default a DataStream with iteration will never terminate, but the user
+ * can use the maxWaitTime parameter to set a max waiting time for the
+ * iteration head. If no data received in the set time, the stream
+ * terminates.
+ *
+ * @return The iterative data stream created.
+ */
+ public IterativeStream<T> iterate() {
+ return new IterativeStream<T>(this, 0);
+ }
+
+ /**
+ * Initiates an iterative part of the program that feeds back data streams.
+ * The iterative part needs to be closed by calling
+ * {@link IterativeStream#closeWith(DataStream)}. The transformation of
+ * this IterativeStream will be the iteration head. The data stream
+ * given to the {@link IterativeStream#closeWith(DataStream)} method is
+ * the data stream that will be fed back and used as the input for the
+ * iteration head. The user can also use different feedback type than the
+ * input of the iteration and treat the input and feedback streams as a
+ * {@link ConnectedStreams} be calling
+ * {@link IterativeStream#withFeedbackType(TypeInformation)}
+ * <p>
+ * A common usage pattern for streaming iterations is to use output
+ * splitting to send a part of the closing data stream to the head. Refer to
+ * {@link #split(OutputSelector)} for more information.
+ * <p>
+ * The iteration edge will be partitioned the same way as the first input of
+ * the iteration head unless it is changed in the
+ * {@link IterativeStream#closeWith(DataStream)} call.
+ * <p>
+ * By default a DataStream with iteration will never terminate, but the user
+ * can use the maxWaitTime parameter to set a max waiting time for the
+ * iteration head. If no data received in the set time, the stream
+ * terminates.
+ *
+ * @param maxWaitTimeMillis
+ * Number of milliseconds to wait between inputs before shutting
+ * down
+ *
+ * @return The iterative data stream created.
+ */
+ public IterativeStream<T> iterate(long maxWaitTimeMillis) {
+ return new IterativeStream<T>(this, maxWaitTimeMillis);
+ }
+
+ /**
+ * Applies a Map transformation on a {@link DataStream}. The transformation
+ * calls a {@link MapFunction} for each element of the DataStream. Each
+ * MapFunction call returns exactly one element. The user can also extend
+ * {@link RichMapFunction} to gain access to other features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+ *
+ * @param mapper
+ * The MapFunction that is called for each element of the
+ * DataStream.
+ * @param <R>
+ * output type
+ * @return The transformed {@link DataStream}.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<T, R> mapper) {
+
+ TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
+ Utils.getCallLocationName(), true);
+
+ return transform("Map", outType, new StreamMap<T, R>(clean(mapper)));
+ }
+
+ /**
+ * Applies a FlatMap transformation on a {@link DataStream}. The
+ * transformation calls a {@link FlatMapFunction} for each element of the
+ * DataStream. Each FlatMapFunction call can return any number of elements
+ * including none. The user can also extend {@link RichFlatMapFunction} to
+ * gain access to other features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+ *
+ * @param flatMapper
+ * The FlatMapFunction that is called for each element of the
+ * DataStream
+ *
+ * @param <R>
+ * output type
+ * @return The transformed {@link DataStream}.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<T, R> flatMapper) {
+
+ TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
+ getType(), Utils.getCallLocationName(), true);
+
+ return transform("Flat Map", outType, new StreamFlatMap<T, R>(clean(flatMapper)));
+
+ }
+
+ /**
+ * Applies a Filter transformation on a {@link DataStream}. The
+ * transformation calls a {@link FilterFunction} for each element of the
+ * DataStream and retains only those element for which the function returns
+ * true. Elements for which the function returns false are filtered. The
+ * user can also extend {@link RichFilterFunction} to gain access to other
+ * features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+ *
+ * @param filter
+ * The FilterFunction that is called for each element of the
+ * DataStream.
+ * @return The filtered DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> filter(FilterFunction<T> filter) {
+ return transform("Filter", getType(), new StreamFilter<T>(clean(filter)));
+
+ }
+
+ /**
+ * Initiates a Project transformation on a {@link Tuple} {@link DataStream}.<br/>
+ * <b>Note: Only Tuple DataStreams can be projected.</b>
+ *
+ * <p>
+ * The transformation projects each Tuple of the DataSet onto a (sub)set of
+ * fields.
+ *
+ * @param fieldIndexes
+ * The field indexes of the input tuples that are retained. The
+ * order of fields in the output tuple corresponds to the order
+ * of field indexes.
+ * @return The projected DataStream
+ *
+ * @see Tuple
+ * @see DataStream
+ */
+ public <R extends Tuple> SingleOutputStreamOperator<R, ?> project(int... fieldIndexes) {
+ return new StreamProjection<T>(this, fieldIndexes).projectTupleX();
+ }
+
+ /**
+ * Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys
+ * and window can be specified.
+ */
+ public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
+ return new CoGroupedStreams<>(this, otherStream);
+ }
+
+ /**
+ * Creates a join operation. See {@link JoinedStreams} for an example of how the keys
+ * and window can be specified.
+ */
+ public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
+ return new JoinedStreams<>(this, otherStream);
+ }
+
+ /**
+ * Windows this {@code DataStream} into tumbling time windows.
+ *
+ * <p>
+ * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+ * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
+ * set using
+ *
+ * <p>
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows is
+ * it possible to perform this operation in parallel).
+ *
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ *
+ * @param size The size of the window.
+ */
+ public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
+ return windowAll(TumblingTimeWindows.of(size));
+ }
+
+ /**
+ * Windows this {@code DataStream} into sliding time windows.
+ *
+ * <p>
+ * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+ * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
+ * set using
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ *
+ * <p>
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows is
+ * it possible to perform this operation in parallel).
+ *
+ * @param size The size of the window.
+ */
+ public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
+ return windowAll(SlidingTimeWindows.of(size, slide));
+ }
+
+ /**
+ * Windows this {@code DataStream} into tumbling count windows.
+ *
+ * <p>
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows is
+ * it possible to perform this operation in parallel).
+ *
+ * @param size The size of the windows in number of elements.
+ */
+ public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
+ return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
+ }
+
+ /**
+ * Windows this {@code DataStream} into sliding count windows.
+ *
+ * <p>
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows is
+ * it possible to perform this operation in parallel).
+ *
+ * @param size The size of the windows in number of elements.
+ * @param slide The slide interval in number of elements.
+ */
+ public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
+ return windowAll(GlobalWindows.create())
+ .evictor(CountEvictor.of(size))
+ .trigger(CountTrigger.of(slide));
+ }
+
+ /**
+ * Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows
+ * over a key grouped stream. Elements are put into windows by a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. The grouping of
+ * elements is done both by key and by window.
+ *
+ * <p>
+ * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
+ * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
+ * that is used if a {@code Trigger} is not specified.
+ *
+ * <p>
+ * Note: This operation can be inherently non-parallel since all elements have to pass through
+ * the same operator instance. (Only for special cases, such as aligned time windows is
+ * it possible to perform this operation in parallel).
+ *
+ * @param assigner The {@code WindowAssigner} that assigns elements to windows.
+ * @return The trigger windows data stream.
+ */
+ public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
+ return new AllWindowedStream<>(this, assigner);
+ }
+
+ /**
+ * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
+ * The internal timestamps are, for example, used to to event-time window operations.
+ *
+ * <p>
+ * If you know that the timestamps are strictly increasing you can use an
+ * {@link org.apache.flink.streaming.api.functions.AscendingTimestampExtractor}. Otherwise,
+ * you should provide a {@link TimestampExtractor} that also implements
+ * {@link TimestampExtractor#getCurrentWatermark()} to keep track of watermarks.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ *
+ * @param extractor The TimestampExtractor that is called for each element of the DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> assignTimestamps(TimestampExtractor<T> extractor) {
+ // match parallelism to input, otherwise dop=1 sources could lead to some strange
+ // behaviour: the watermark will creep along very slowly because the elements
+ // from the source go to each extraction operator round robin.
+ int inputParallelism = getTransformation().getParallelism();
+ ExtractTimestampsOperator<T> operator = new ExtractTimestampsOperator<>(clean(extractor));
+ return transform("ExtractTimestamps", getTransformation().getOutputType(), operator)
+ .setParallelism(inputParallelism);
+ }
+
+ /**
+ * Writes a DataStream to the standard output stream (stdout).
+ *
+ * <p>
+ * For each element of the DataStream the result of
+ * {@link Object#toString()} is written.
+ *
+ * @return The closed DataStream.
+ */
+ public DataStreamSink<T> print() {
+ PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>();
+ return addSink(printFunction);
+ }
+
+ /**
+ * Writes a DataStream to the standard output stream (stderr).
+ *
+ * <p>
+ * For each element of the DataStream the result of
+ * {@link Object#toString()} is written.
+ *
+ * @return The closed DataStream.
+ */
+ public DataStreamSink<T> printToErr() {
+ PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>(true);
+ return addSink(printFunction);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format.
+ *
+ * <p>
+ * For every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ *
+ * @return the closed DataStream.
+ */
+ public DataStreamSink<T> writeAsText(String path) {
+ return write(new TextOutputFormat<T>(new Path(path)), 0L);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format. The
+ * writing is performed periodically, in every millis milliseconds.
+ *
+ * <p>
+ * For every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ * @param millis
+ * the file update frequency
+ *
+ * @return the closed DataStream
+ */
+ public DataStreamSink<T> writeAsText(String path, long millis) {
+ TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
+ return write(tof, millis);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format.
+ *
+ * <p>
+ * For every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ * @param writeMode
+ * Control the behavior for existing files. Options are
+ * NO_OVERWRITE and OVERWRITE.
+ *
+ * @return the closed DataStream.
+ */
+ public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
+ TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
+ tof.setWriteMode(writeMode);
+ return write(tof, 0L);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format.
+ *
+ * <p>
+ * For every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ * @param writeMode
+ * Controls the behavior for existing files. Options are
+ * NO_OVERWRITE and OVERWRITE.
+ * @param millis
+ * the file update frequency
+ *
+ * @return the closed DataStream.
+ */
+ public DataStreamSink<T> writeAsText(String path, WriteMode writeMode, long millis) {
+ TextOutputFormat<T> tof = new TextOutputFormat<T>(new Path(path));
+ tof.setWriteMode(writeMode);
+ return write(tof, millis);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format.
+ *
+ * <p>
+ * For every field of an element of the DataStream the result of {@link Object#toString()}
+ * is written. This method can only be used on data streams of tuples.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ *
+ * @return the closed DataStream
+ */
+ @SuppressWarnings("unchecked")
+ public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path) {
+ Preconditions.checkArgument(getType().isTupleType(),
+ "The writeAsCsv() method can only be used on data sets of tuples.");
+ CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+ CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+ return write((OutputFormat<T>) of, 0L);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically, in every millis milliseconds.
+ *
+ * <p>
+ * For every field of an element of the DataStream the result of {@link Object#toString()}
+ * is written. This method can only be used on data streams of tuples.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ * @param millis
+ * the file update frequency
+ *
+ * @return the closed DataStream
+ */
+ @SuppressWarnings("unchecked")
+ public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, long millis) {
+ Preconditions.checkArgument(getType().isTupleType(),
+ "The writeAsCsv() method can only be used on data sets of tuples.");
+ CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+ CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+ return write((OutputFormat<T>) of, millis);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format.
+ *
+ * <p>
+ * For every field of an element of the DataStream the result of {@link Object#toString()}
+ * is written. This method can only be used on data streams of tuples.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ * @param writeMode
+ * Controls the behavior for existing files. Options are
+ * NO_OVERWRITE and OVERWRITE.
+ *
+ * @return the closed DataStream
+ */
+ @SuppressWarnings("unchecked")
+ public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode) {
+ Preconditions.checkArgument(getType().isTupleType(),
+ "The writeAsCsv() method can only be used on data sets of tuples.");
+ CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+ CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+ if (writeMode != null) {
+ of.setWriteMode(writeMode);
+ }
+ return write((OutputFormat<T>) of, 0L);
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically, in every millis milliseconds.
+ *
+ * <p>
+ * For every field of an element of the DataStream the result of {@link Object#toString()}
+ * is written. This method can only be used on data streams of tuples.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ * @param writeMode
+ * Controls the behavior for existing files. Options are
+ * NO_OVERWRITE and OVERWRITE.
+ * @param millis
+ * the file update frequency
+ *
+ * @return the closed DataStream
+ */
+ @SuppressWarnings("unchecked")
+ public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode,
+ long millis) {
+ Preconditions.checkArgument(getType().isTupleType(),
+ "The writeAsCsv() method can only be used on data sets of tuples.");
+ CsvOutputFormat<X> of = new CsvOutputFormat<X>(new Path(path),
+ CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+ if (writeMode != null) {
+ of.setWriteMode(writeMode);
+ }
+ return write((OutputFormat<T>) of, millis);
+ }
+
+ /**
+ * Writes the DataStream to a socket as a byte array. The format of the
+ * output is specified by a {@link SerializationSchema}.
+ *
+ * @param hostName
+ * host of the socket
+ * @param port
+ * port of the socket
+ * @param schema
+ * schema for serialization
+ * @return the closed DataStream
+ */
+ public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema) {
+ DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
+ returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
+ return returnStream;
+ }
+
+ /**
+ * Writes the dataStream into an output, described by an OutputFormat.
+ *
+ * @param format The output format
+ * @param millis the write frequency
+ * @return The closed DataStream
+ */
+ public DataStreamSink<T> write(OutputFormat<T> format, long millis) {
+ return addSink(new FileSinkFunctionByMillis<T>(format, millis));
+ }
+
+ /**
+ * Method for passing user defined operators along with the type
+ * information that will transform the DataStream.
+ *
+ * @param operatorName
+ * name of the operator, for logging purposes
+ * @param outTypeInfo
+ * the output type of the operator
+ * @param operator
+ * the object containing the transformation logic
+ * @param <R>
+ * type of the return stream
+ * @return the data stream constructed
+ */
+ public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
+
+ // read the output type of the input Transform to coax out errors about MissingTypeInfo
+ transformation.getOutputType();
+
+ OneInputTransformation<T, R> resultTransform = new OneInputTransformation<T, R>(
+ this.transformation,
+ operatorName,
+ operator,
+ outTypeInfo,
+ environment.getParallelism());
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
+
+ getExecutionEnvironment().addOperator(resultTransform);
+
+ return returnStream;
+ }
+
+ /**
+ * Internal function for setting the partitioner for the DataStream
+ *
+ * @param partitioner
+ * Partitioner to set.
+ * @return The modified DataStream.
+ */
+ protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+ return new DataStream<T>(this.getExecutionEnvironment(), new PartitionTransformation<T>(this.getTransformation(), partitioner));
+ }
+
+ /**
+ * Adds the given sink to this DataStream. Only streams with sinks added
+ * will be executed once the {@link StreamExecutionEnvironment#execute()}
+ * method is called.
+ *
+ * @param sinkFunction
+ * The object containing the sink's invoke function.
+ * @return The closed DataStream.
+ */
+ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
+
+ // read the output type of the input Transform to coax out errors about MissingTypeInfo
+ transformation.getOutputType();
+
+ // configure the type if needed
+ if (sinkFunction instanceof InputTypeConfigurable) {
+ ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig() );
+ }
+
+ StreamSink<T> sinkOperator = new StreamSink<T>(clean(sinkFunction));
+
+ DataStreamSink<T> sink = new DataStreamSink<T>(this, sinkOperator);
+
+ getExecutionEnvironment().addOperator(sink.getTransformation());
+ return sink;
+ }
+
+ /**
+ * Returns the {@link StreamTransformation} that represents the operation that logically creates
+ * this {@link DataStream}.
+ *
+ * @return The Transformation
+ */
+ public StreamTransformation<T> getTransformation() {
+ return transformation;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
new file mode 100644
index 0000000..24104ad
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+
+/**
+ * A Stream Sink. This is used for emitting elements from a streaming topology.
+ *
+ * @param <T> The type of the elements in the Stream
+ */
+public class DataStreamSink<T> {
+
+ SinkTransformation<T> transformation;
+
+ @SuppressWarnings("unchecked")
+ protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
+ this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
+ }
+
+ /**
+ * Returns the transformation that contains the actual sink operator of this sink.
+ */
+ public SinkTransformation<T> getTransformation() {
+ return transformation;
+ }
+
+ /**
+ * Sets the name of this sink. This name is
+ * used by the visualization and logging during runtime.
+ *
+ * @return The named sink.
+ */
+ public DataStreamSink<T> name(String name) {
+ transformation.setName(name);
+ return this;
+ }
+
+ /**
+ * Sets the parallelism for this sink. The degree must be higher than zero.
+ *
+ * @param parallelism The parallelism for this sink.
+ * @return The sink with set parallelism.
+ */
+ public DataStreamSink<T> setParallelism(int parallelism) {
+ transformation.setParallelism(parallelism);
+ return this;
+ }
+
+ /**
+ * Turns off chaining for this operator so thread co-location will not be
+ * used as an optimization.
+ *
+ * <p>
+ * Chaining can be turned off for the whole
+ * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
+ * however it is not advised for performance considerations.
+ *
+ * @return The sink with chaining disabled
+ */
+ public DataStreamSink<T> disableChaining() {
+ this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
new file mode 100644
index 0000000..d2e04a7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+
+/**
+ * The DataStreamSource represents the starting point of a DataStream.
+ *
+ * @param <T> Type of the elements in the DataStream created from the this source.
+ */
+public class DataStreamSource<T> extends SingleOutputStreamOperator<T, DataStreamSource<T>> {
+
+ boolean isParallel;
+
+ public DataStreamSource(StreamExecutionEnvironment environment,
+ TypeInformation<T> outTypeInfo, StreamSource<T> operator,
+ boolean isParallel, String sourceName) {
+ super(environment, new SourceTransformation<T>(sourceName, operator, outTypeInfo, environment.getParallelism()));
+
+ this.isParallel = isParallel;
+ if (!isParallel) {
+ setParallelism(1);
+ }
+ }
+
+ @Override
+ public DataStreamSource<T> setParallelism(int parallelism) {
+ if (parallelism > 1 && !isParallel) {
+ throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
+ } else {
+ return (DataStreamSource<T>) super.setParallelism(parallelism);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
new file mode 100644
index 0000000..346bef9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+
+import java.util.Collection;
+
+/**
+ * The iterative data stream represents the start of an iteration in a {@link DataStream}.
+ *
+ * @param <T> Type of the elements in this Stream
+ */
+public class IterativeStream<T> extends SingleOutputStreamOperator<T, IterativeStream<T>> {
+
+ // We store these so that we can create a co-iteration if we need to
+ private DataStream<T> originalInput;
+ private long maxWaitTime;
+
+ protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
+ super(dataStream.getExecutionEnvironment(),
+ new FeedbackTransformation<T>(dataStream.getTransformation(), maxWaitTime));
+ this.originalInput = dataStream;
+ this.maxWaitTime = maxWaitTime;
+ setBufferTimeout(dataStream.environment.getBufferTimeout());
+ }
+
+ /**
+ * Closes the iteration. This method defines the end of the iterative
+ * program part that will be fed back to the start of the iteration.
+ *
+ * <p>
+ * A common usage pattern for streaming iterations is to use output
+ * splitting to send a part of the closing data stream to the head. Refer to
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
+ * for more information.
+ *
+ * @param feedbackStream
+ * {@link DataStream} that will be used as input to the iteration
+ * head.
+ *
+ * @return The feedback stream.
+ *
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public DataStream<T> closeWith(DataStream<T> feedbackStream) {
+
+ Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
+
+ if (!predecessors.contains(this.transformation)) {
+ throw new UnsupportedOperationException(
+ "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
+ }
+
+ ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());
+
+ return feedbackStream;
+ }
+
+ /**
+ * Changes the feedback type of the iteration and allows the user to apply
+ * co-transformations on the input and feedback stream, as in a
+ * {@link ConnectedStreams}.
+ *
+ * <p>
+ * For type safety the user needs to define the feedback type
+ *
+ * @param feedbackTypeString
+ * String describing the type information of the feedback stream.
+ * @return A {@link ConnectedIterativeStreams}.
+ */
+ public <F> ConnectedIterativeStreams<T, F> withFeedbackType(String feedbackTypeString) {
+ return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
+ }
+
+ /**
+ * Changes the feedback type of the iteration and allows the user to apply
+ * co-transformations on the input and feedback stream, as in a
+ * {@link ConnectedStreams}.
+ *
+ * <p>
+ * For type safety the user needs to define the feedback type
+ *
+ * @param feedbackTypeClass
+ * Class of the elements in the feedback stream.
+ * @return A {@link ConnectedIterativeStreams}.
+ */
+ public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
+ return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
+ }
+
+ /**
+ * Changes the feedback type of the iteration and allows the user to apply
+ * co-transformations on the input and feedback stream, as in a
+ * {@link ConnectedStreams}.
+ *
+ * <p>
+ * For type safety the user needs to define the feedback type
+ *
+ * @param feedbackType
+ * The type information of the feedback stream.
+ * @return A {@link ConnectedIterativeStreams}.
+ */
+ public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
+ return new ConnectedIterativeStreams<T, F>(originalInput, feedbackType, maxWaitTime);
+ }
+
+ /**
+ * The {@link ConnectedIterativeStreams} represent a start of an
+ * iterative part of a streaming program, where the original input of the
+ * iteration and the feedback of the iteration are connected as in a
+ * {@link ConnectedStreams}.
+ *
+ * <p>
+ * The user can distinguish between the two inputs using co-transformation,
+ * thus eliminating the need for mapping the inputs and outputs to a common
+ * type.
+ *
+ * @param <I>
+ * Type of the input of the iteration
+ * @param <F>
+ * Type of the feedback of the iteration
+ */
+ public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {
+
+ private CoFeedbackTransformation<F> coFeedbackTransformation;
+
+ public ConnectedIterativeStreams(DataStream<I> input,
+ TypeInformation<F> feedbackType,
+ long waitTime) {
+ super(input.getExecutionEnvironment(),
+ input,
+ new DataStream<F>(input.getExecutionEnvironment(),
+ new CoFeedbackTransformation<F>(input.getParallelism(),
+ feedbackType,
+ waitTime)));
+ this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
+ }
+
+ /**
+ * Closes the iteration. This method defines the end of the iterative
+ * program part that will be fed back to the start of the iteration as
+ * the second input in the {@link ConnectedStreams}.
+ *
+ * @param feedbackStream
+ * {@link DataStream} that will be used as second input to
+ * the iteration head.
+ * @return The feedback stream.
+ *
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public DataStream<F> closeWith(DataStream<F> feedbackStream) {
+
+ Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
+
+ if (!predecessors.contains(this.coFeedbackTransformation)) {
+ throw new UnsupportedOperationException(
+ "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
+ }
+
+ coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());
+
+ return feedbackStream;
+ }
+
+ private UnsupportedOperationException groupingException = new UnsupportedOperationException(
+ "Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead.");
+
+ @Override
+ public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> keyBy(String field1, String field2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1,KeySelector<F, ?> keySelector2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
+
+ @Override
+ public ConnectedStreams<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
new file mode 100644
index 0000000..cff9355
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *{@code JoinedStreams} represents two {@link DataStream DataStreams} that have been joined.
+ * A streaming join operation is evaluated over elements in a window.
+ *
+ * <p>
+ * To finalize the join operation you also need to specify a {@link KeySelector} for
+ * both the first and second input and a {@link WindowAssigner}.
+ *
+ * <p>
+ * Note: Right now, the the join is being evaluated in memory so you need to ensure that the number
+ * of elements per key does not get too high. Otherwise the JVM might crash.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> one = ...;
+ * DataStream<Tuple2<String, Integer>> twp = ...;
+ *
+ * DataStream<T> result = one.join(two)
+ * .where(new MyFirstKeySelector())
+ * .equalTo(new MyFirstKeySelector())
+ * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ * .apply(new MyJoinFunction());
+ * } </pre>
+ */
+public class JoinedStreams<T1, T2> {
+
+ /** The first input stream */
+ private final DataStream<T1> input1;
+
+ /** The second input stream */
+ private final DataStream<T2> input2;
+
+ /**
+ * Creates new JoinedStreams data streams, which are the first step towards building a streaming co-group.
+ *
+ * @param input1 The first data stream.
+ * @param input2 The second data stream.
+ */
+ public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) {
+ this.input1 = requireNonNull(input1);
+ this.input2 = requireNonNull(input2);
+ }
+
+ /**
+ * Specifies a {@link KeySelector} for elements from the first input.
+ */
+ public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
+ TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+ return new Where<>(input1.clean(keySelector), keyType);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * CoGrouped streams that have the key for one side defined.
+ *
+ * @param <KEY> The type of the key.
+ */
+ public class Where<KEY> {
+
+ private final KeySelector<T1, KEY> keySelector1;
+ private final TypeInformation<KEY> keyType;
+
+ Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
+ this.keySelector1 = keySelector1;
+ this.keyType = keyType;
+ }
+
+ /**
+ * Specifies a {@link KeySelector} for elements from the second input.
+ */
+ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
+ TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+ if (!otherKey.equals(this.keyType)) {
+ throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
+ "first key = " + this.keyType + " , second key = " + otherKey);
+ }
+
+ return new EqualTo(input2.clean(keySelector));
+ }
+
+ // --------------------------------------------------------------------
+
+ /**
+ * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
+ */
+ public class EqualTo {
+
+ private final KeySelector<T2, KEY> keySelector2;
+
+ EqualTo(KeySelector<T2, KEY> keySelector2) {
+ this.keySelector2 = requireNonNull(keySelector2);
+ }
+
+ /**
+ * Specifies the window on which the co-group operation works.
+ */
+ public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as
+ * well as a {@link WindowAssigner}.
+ *
+ * @param <T1> Type of the elements from the first input
+ * @param <T2> Type of the elements from the second input
+ * @param <KEY> Type of the key. This must be the same for both inputs
+ * @param <W> Type of {@link Window} on which the join operation works.
+ */
+ public static class WithWindow<T1, T2, KEY, W extends Window> {
+
+ private final DataStream<T1> input1;
+ private final DataStream<T2> input2;
+
+ private final KeySelector<T1, KEY> keySelector1;
+ private final KeySelector<T2, KEY> keySelector2;
+ private final TypeInformation<KEY> keyType;
+
+ private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
+
+ private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
+
+ private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
+
+ protected WithWindow(DataStream<T1> input1,
+ DataStream<T2> input2,
+ KeySelector<T1, KEY> keySelector1,
+ KeySelector<T2, KEY> keySelector2,
+ TypeInformation<KEY> keyType,
+ WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
+ Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
+ Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+
+ this.input1 = requireNonNull(input1);
+ this.input2 = requireNonNull(input2);
+
+ this.keySelector1 = requireNonNull(keySelector1);
+ this.keySelector2 = requireNonNull(keySelector2);
+ this.keyType = requireNonNull(keyType);
+
+ this.windowAssigner = requireNonNull(windowAssigner);
+
+ this.trigger = trigger;
+ this.evictor = evictor;
+ }
+
+ /**
+ * Sets the {@code Trigger} that should be used to trigger window emission.
+ */
+ public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+ windowAssigner, newTrigger, evictor);
+ }
+
+ /**
+ * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+ *
+ * <p>
+ * Note: When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ */
+ public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+ windowAssigner, trigger, newEvictor);
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed
+ * for each combination of elements with the same key in a window.
+ */
+ public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
+ TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+ function,
+ JoinFunction.class,
+ true,
+ true,
+ input1.getType(),
+ input2.getType(),
+ "Join",
+ false);
+
+ return apply(function, resultType);
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed
+ * for each combination of elements with the same key in a window.
+ */
+ public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+ //clean the closure
+ function = input1.getExecutionEnvironment().clean(function);
+
+ return input1.coGroup(input2)
+ .where(keySelector1)
+ .equalTo(keySelector2)
+ .window(windowAssigner)
+ .trigger(trigger)
+ .evictor(evictor)
+ .apply(new FlatJoinCoGroupFunction<>(function), resultType);
+
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed
+ * for each combination of elements with the same key in a window.
+ */
+ public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
+ TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+ function,
+ JoinFunction.class,
+ true,
+ true,
+ input1.getType(),
+ input2.getType(),
+ "Join",
+ false);
+
+ return apply(function, resultType);
+ }
+
+ /**
+ * Completes the join operation with the user function that is executed
+ * for each combination of elements with the same key in a window.
+ */
+ public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+ //clean the closure
+ function = input1.getExecutionEnvironment().clean(function);
+
+ return input1.coGroup(input2)
+ .where(keySelector1)
+ .equalTo(keySelector2)
+ .window(windowAssigner)
+ .trigger(trigger)
+ .evictor(evictor)
+ .apply(new JoinCoGroupFunction<>(function), resultType);
+
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Implementation of the functions
+ // ------------------------------------------------------------------------
+
+ /**
+ * CoGroup function that does a nested-loop join to get the join result.
+ */
+ private static class JoinCoGroupFunction<T1, T2, T>
+ extends WrappingFunction<JoinFunction<T1, T2, T>>
+ implements CoGroupFunction<T1, T2, T> {
+ private static final long serialVersionUID = 1L;
+
+ public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
+ super(wrappedFunction);
+ }
+
+ @Override
+ public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
+ for (T1 val1: first) {
+ for (T2 val2: second) {
+ out.collect(wrappedFunction.join(val1, val2));
+ }
+ }
+ }
+ }
+
+ /**
+ * CoGroup function that does a nested-loop join to get the join result. (FlatJoin version)
+ */
+ private static class FlatJoinCoGroupFunction<T1, T2, T>
+ extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
+ implements CoGroupFunction<T1, T2, T> {
+ private static final long serialVersionUID = 1L;
+
+ public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
+ super(wrappedFunction);
+ }
+
+ @Override
+ public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
+ for (T1 val1: first) {
+ for (T2 val2: second) {
+ wrappedFunction.join(val1, val2, out);
+ }
+ }
+ }
+ }
+
+}