You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:18 UTC

[02/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
new file mode 100644
index 0000000..d4a3a77
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedFold;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
+ * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
+ * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
+ * partitioning methods such as shuffle, forward and keyBy.
+ *
+ * <p>
+ * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
+ * that have the same key.
+ *
+ * @param <T> The type of the elements in the Keyed Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
+ */
+public class KeyedStream<T, KEY> extends DataStream<T> {
+
+	/** The key selector that can get the key by which the stream if partitioned from the elements */
+	private final KeySelector<T, KEY> keySelector;
+
+	/** The type of the key by which the stream is partitioned */
+	private final TypeInformation<KEY> keyType;
+	
+	/**
+	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
+	 * to partition operator state by key.
+	 * 
+	 * @param dataStream
+	 *            Base stream of data
+	 * @param keySelector
+	 *            Function for determining state partitions
+	 */
+	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
+		this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
+	}
+
+	/**
+	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
+	 * to partition operator state by key.
+	 *
+	 * @param dataStream
+	 *            Base stream of data
+	 * @param keySelector
+	 *            Function for determining state partitions
+	 */
+	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
+		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
+				dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
+		this.keySelector = keySelector;
+		this.keyType = keyType;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the key selector that can get the key by which the stream if partitioned from the elements.
+	 * @return The key selector for the key.
+	 */
+	public KeySelector<T, KEY> getKeySelector() {
+		return this.keySelector;
+	}
+
+	/**
+	 * Gets the type of the key by which the stream is partitioned. 
+	 * @return The type of the key by which the stream is partitioned.
+	 */
+	public TypeInformation<KEY> getKeyType() {
+		return keyType;
+	}
+
+	@Override
+	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+		throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
+	}
+
+	// ------------------------------------------------------------------------
+	//  basic transformations
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
+			TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
+
+		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
+
+		// inject the key selector and key type
+		OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
+		transform.setStateKeySelector(keySelector);
+		transform.setStateKeyType(keyType);
+		
+		return returnStream;
+	}
+	
+	@Override
+	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
+		DataStreamSink<T> result = super.addSink(sinkFunction);
+		result.getTransformation().setStateKeySelector(keySelector);
+		result.getTransformation().setStateKeyType(keyType);
+		return result;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Windowing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Windows this {@code KeyedStream} into tumbling time windows.
+	 *
+	 * <p>
+	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
+	 * set using
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+	 *
+	 * @param size The size of the window.
+	 */
+	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
+		return window(TumblingTimeWindows.of(size));
+	}
+
+	/**
+	 * Windows this {@code KeyedStream} into sliding time windows.
+	 *
+	 * <p>
+	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
+	 * set using
+	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+	 *
+	 * @param size The size of the window.
+	 */
+	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
+		return window(SlidingTimeWindows.of(size, slide));
+	}
+
+	/**
+	 * Windows this {@code KeyedStream} into tumbling count windows.
+	 *
+	 * @param size The size of the windows in number of elements.
+	 */
+	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
+		return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
+	}
+
+	/**
+	 * Windows this {@code KeyedStream} into sliding count windows.
+	 *
+	 * @param size The size of the windows in number of elements.
+	 * @param slide The slide interval in number of elements.
+	 */
+	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
+		return window(GlobalWindows.create())
+				.evictor(CountEvictor.of(size))
+				.trigger(CountTrigger.of(slide));
+	}
+
+	/**
+	 * Windows this data stream to a {@code WindowedStream}, which evaluates windows
+	 * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
+	 * grouping of elements is done both by key and by window.
+	 *
+	 * <p>
+	 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
+	 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
+	 * that is used if a {@code Trigger} is not specified.
+	 *
+	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
+	 * @return The trigger windows data stream.
+	 */
+	public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
+		return new WindowedStream<>(this, assigner);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Non-Windowed aggregation operations
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies a reduce transformation on the grouped data stream grouped on by
+	 * the given key position. The {@link ReduceFunction} will receive input
+	 * values based on the key value. Only input values with the same key will
+	 * go to the same reducer.
+	 *
+	 * @param reducer
+	 *            The {@link ReduceFunction} that will be called for every
+	 *            element of the input values with the same key.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
+		return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
+				clean(reducer), getType().createSerializer(getExecutionConfig())));
+	}
+
+	/**
+	 * Applies a fold transformation on the grouped data stream grouped on by
+	 * the given key position. The {@link FoldFunction} will receive input
+	 * values based on the key value. Only input values with the same key will
+	 * go to the same folder.
+	 *
+	 * @param folder
+	 *            The {@link FoldFunction} that will be called for every element
+	 *            of the input values with the same key.
+	 * @param initialValue
+	 *            The initialValue passed to the folders for each key.
+	 * @return The transformed DataStream.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
+
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
+				clean(folder), getType(), Utils.getCallLocationName(), true);
+
+		return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder), initialValue));
+	}
+
+	/**
+	 * Applies an aggregation that gives a rolling sum of the data stream at the
+	 * given position grouped by the given key. An independent aggregate is kept
+	 * per key.
+	 *
+	 * @param positionToSum
+	 *            The position in the data point to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
+		return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current sum of the pojo data
+	 * stream at the given field expressionby the given key. An independent
+	 * aggregate is kept per key. A field expression is either the name of a
+	 * public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> sum(String field) {
+		return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current minimum of the data
+	 * stream at the given position by the given key. An independent aggregate
+	 * is kept per key.
+	 *
+	 * @param positionToMin
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
+		return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
+				getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current minimum of the pojo
+	 * data stream at the given field expression by the given key. An
+	 * independent aggregate is kept per key. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> min(String field) {
+		return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
+				false, getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the current maximum of the data stream
+	 * at the given position by the given key. An independent aggregate is kept
+	 * per key.
+	 *
+	 * @param positionToMax
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
+		return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
+				getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current maximum of the pojo
+	 * data stream at the given field expression by the given key. An
+	 * independent aggregate is kept per key. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> max(String field) {
+		return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
+				false, getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current minimum element of the
+	 * pojo data stream by the given field expression by the given key. An
+	 * independent aggregate is kept per key. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
+				first, getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current maximum element of the
+	 * pojo data stream by the given field expression by the given key. An
+	 * independent aggregate is kept per key. A field expression is either the
+	 * name of a public field or a getter method with parentheses of the
+	 * {@link DataStream}S underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
+				first, getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * minimum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the minimum value at the
+	 * given position, the operator returns the first one by default.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * minimum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the minimum value at the
+	 * given position, the operator returns the first one by default.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * minimum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the minimum value at the
+	 * given position, the operator returns either the first or last one,
+	 * depending on the parameter set.
+	 *
+	 * @param positionToMinBy
+	 *            The position in the data point to minimize
+	 * @param first
+	 *            If true, then the operator return the first element with the
+	 *            minimal value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
+		return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
+				getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * maximum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the maximum value at the
+	 * given position, the operator returns the first one by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * maximum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the maximum value at the
+	 * given position, the operator returns the first one by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that that gives the current element with the
+	 * maximum value at the given position by the given key. An independent
+	 * aggregate is kept per key. If more elements have the maximum value at the
+	 * given position, the operator returns either the first or last one,
+	 * depending on the parameter set.
+	 *
+	 * @param positionToMaxBy
+	 *            The position in the data point to maximize.
+	 * @param first
+	 *            If true, then the operator return the first element with the
+	 *            maximum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
+				getExecutionConfig()));
+	}
+
+	protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
+		StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
+				clean(aggregate), getType().createSerializer(getExecutionConfig()));
+		return transform("Keyed Aggregation", getType(), operator);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
new file mode 100644
index 0000000..33d5a3c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * The SingleOutputStreamOperator represents a user defined transformation
+ * applied on a {@link DataStream} with one predefined output type.
+ *
+ * @param <T> The type of the elements in this Stream
+ * @param <O> Type of the operator.
+ */
+public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {
+
+	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
+		super(environment, transformation);
+	}
+
+	/**
+	 * Gets the name of the current data stream. This name is
+	 * used by the visualization and logging during runtime.
+	 *
+	 * @return Name of the stream.
+	 */
+	public String getName() {
+		return transformation.getName();
+	}
+
+	/**
+	 * Sets the name of the current data stream. This name is
+	 * used by the visualization and logging during runtime.
+	 *
+	 * @return The named operator.
+	 */
+	public SingleOutputStreamOperator<T, O> name(String name){
+		transformation.setName(name);
+		return this;
+	}
+
+	/**
+	 * Sets the parallelism for this operator. The degree must be 1 or more.
+	 * 
+	 * @param parallelism
+	 *            The parallelism for this operator.
+	 * @return The operator with set parallelism.
+	 */
+	public SingleOutputStreamOperator<T, O> setParallelism(int parallelism) {
+		if (parallelism < 1) {
+			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
+		}
+
+		transformation.setParallelism(parallelism);
+
+		return this;
+	}
+
+	/**
+	 * Sets the maximum time frequency (ms) for the flushing of the output
+	 * buffer. By default the output buffers flush only when they are full.
+	 * 
+	 * @param timeoutMillis
+	 *            The maximum time between two output flushes.
+	 * @return The operator with buffer timeout set.
+	 */
+	public SingleOutputStreamOperator<T, O> setBufferTimeout(long timeoutMillis) {
+		transformation.setBufferTimeout(timeoutMillis);
+		return this;
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<T, O> broadcast() {
+		return (SingleOutputStreamOperator<T, O>) super.broadcast();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<T, O> shuffle() {
+		return (SingleOutputStreamOperator<T, O>) super.shuffle();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<T, O> forward() {
+		return (SingleOutputStreamOperator<T, O>) super.forward();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<T, O> rebalance() {
+		return (SingleOutputStreamOperator<T, O>) super.rebalance();
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<T, O> global() {
+		return (SingleOutputStreamOperator<T, O>) super.global();
+	}
+
+	/**
+	 * Sets the {@link ChainingStrategy} for the given operator affecting the
+	 * way operators will possibly be co-located on the same thread for
+	 * increased performance.
+	 * 
+	 * @param strategy
+	 *            The selected {@link ChainingStrategy}
+	 * @return The operator with the modified chaining strategy
+	 */
+	private SingleOutputStreamOperator<T, O> setChainingStrategy(ChainingStrategy strategy) {
+		this.transformation.setChainingStrategy(strategy);
+		return this;
+	}
+
+	/**
+	 * Turns off chaining for this operator so thread co-location will not be
+	 * used as an optimization. </p> Chaining can be turned off for the whole
+	 * job by {@link StreamExecutionEnvironment#disableOperatorChaining()}
+	 * however it is not advised for performance considerations.
+	 * 
+	 * @return The operator with chaining disabled
+	 */
+	public SingleOutputStreamOperator<T, O> disableChaining() {
+		return setChainingStrategy(ChainingStrategy.NEVER);
+	}
+
+	/**
+	 * Starts a new task chain beginning at this operator. This operator will
+	 * not be chained (thread co-located for increased performance) to any
+	 * previous tasks even if possible.
+	 * 
+	 * @return The operator with chaining set.
+	 */
+	public SingleOutputStreamOperator<T, O> startNewChain() {
+		return setChainingStrategy(ChainingStrategy.HEAD);
+	}
+
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes a type information string that will be parsed. A type information string can contain the following
+	 * types:
+	 *
+	 * <ul>
+	 * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
+	 * <li>Basic type arrays such as <code>Integer[]</code>,
+	 * <code>String[]</code>, etc.
+	 * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
+	 * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
+	 * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
+	 * <li>Generic types such as <code>java.lang.Class</code>, etc.
+	 * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
+	 * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
+	 * <li>Value types such as <code>DoubleValue</code>,
+	 * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
+	 * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], etc.</code></li>
+	 * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
+	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
+	 * </ul>
+	 *
+	 * Example:
+	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
+	 *
+	 * @param typeInfoString
+	 *            type information string to be parsed
+	 * @return This operator with a given return type hint.
+	 */
+	public O returns(String typeInfoString) {
+		if (typeInfoString == null) {
+			throw new IllegalArgumentException("Type information string must not be null.");
+		}
+		return returns(TypeInfoParser.<T>parse(typeInfoString));
+	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as:
+	 * 
+	 * <ul>
+	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
+	 * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
+	 * <li>etc.</li>
+	 * </ul>
+	 *
+	 * @param typeInfo
+	 *            type information as a return type hint
+	 * @return This operator with a given return type hint.
+	 */
+	public O returns(TypeInformation<T> typeInfo) {
+		if (typeInfo == null) {
+			throw new IllegalArgumentException("Type information must not be null.");
+		}
+		transformation.setOutputType(typeInfo);
+		@SuppressWarnings("unchecked")
+		O returnType = (O) this;
+		return returnType;
+	}
+	
+	/**
+	 * Adds a type information hint about the return type of this operator. 
+	 * 
+	 * <p>
+	 * Type hints are important in cases where the Java compiler
+	 * throws away generic type information necessary for efficient execution.
+	 * 
+	 * <p>
+	 * This method takes a class that will be analyzed by Flink's type extraction capabilities.
+	 * 
+	 * <p>
+	 * Examples for classes are:
+	 * <ul>
+	 * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li>
+	 * <li>POJOs such as <code>MyPojo.class</code></li>
+	 * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li>
+	 * <li>Arrays such as <code>String[].class</code>, etc.</li>
+	 * </ul>
+	 *
+	 * @param typeClass
+	 *            class as a return type hint
+	 * @return This operator with a given return type hint.
+	 */
+	@SuppressWarnings("unchecked")
+	public O returns(Class<T> typeClass) {
+		if (typeClass == null) {
+			throw new IllegalArgumentException("Type class must not be null.");
+		}
+		
+		try {
+			TypeInformation<T> ti = (TypeInformation<T>) TypeExtractor.createTypeInfo(typeClass);
+			return returns(ti);
+		}
+		catch (InvalidTypesException e) {
+			throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e);
+		}
+	}
+
+	@Override
+	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+		return new SingleOutputStreamOperator<T, O>(this.getExecutionEnvironment(), new PartitionTransformation<T>(this.getTransformation(), partitioner));
+	}
+
+	/**
+	 * By default all operators in a streaming job share the same resource
+	 * group. Each resource group takes as many task manager slots as the
+	 * maximum parallelism operator in that group. Task chaining is only
+	 * possible within one resource group. By calling this method, this
+	 * operators starts a new resource group and all subsequent operators will
+	 * be added to this group unless specified otherwise. </p> Please note that
+	 * local executions have by default as many available task slots as the
+	 * environment parallelism, so in order to start a new resource group the
+	 * degree of parallelism for the operators must be decreased from the
+	 * default.
+	 * 
+	 * @return The operator as a part of a new resource group.
+	 */
+	public SingleOutputStreamOperator<T, O> startNewResourceGroup() {
+		transformation.setResourceStrategy(ResourceStrategy.NEWGROUP);
+		return this;
+	}
+
+	/**
+	 * Isolates the operator in its own resource group. This will cause the
+	 * operator to grab as many task slots as its degree of parallelism. If
+	 * there are no free resources available, the job will fail to start. It
+	 * also disables chaining for this operator </p>All subsequent operators are
+	 * assigned to the default resource group.
+	 * 
+	 * @return The operator with isolated resource group.
+	 */
+	public SingleOutputStreamOperator<T, O> isolateResources() {
+		transformation.setResourceStrategy(ResourceStrategy.ISOLATE);
+		return this;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
new file mode 100644
index 0000000..11ee7f2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.transformations.SelectTransformation;
+import org.apache.flink.streaming.api.transformations.SplitTransformation;
+
+/**
+ * The SplitStream represents an operator that has been split using an
+ * {@link OutputSelector}. Named outputs can be selected using the
+ * {@link #select} function. To apply transformation on the whole output simply
+ * call the transformation on the SplitStream
+ *
+ * @param <OUT> The type of the elements in the Stream
+ */
+public class SplitStream<OUT> extends DataStream<OUT> {
+
+	protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
+		super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
+	}
+
+	/**
+	 * Sets the output names for which the next operator will receive values.
+	 * 
+	 * @param outputNames
+	 *            The output names for which the operator will receive the
+	 *            input.
+	 * @return Returns the selected DataStream
+	 */
+	public DataStream<OUT> select(String... outputNames) {
+		return selectOutput(outputNames);
+	}
+
+	private DataStream<OUT> selectOutput(String[] outputNames) {
+		for (String outName : outputNames) {
+			if (outName == null) {
+				throw new RuntimeException("Selected names must not be null");
+			}
+		}
+
+		SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
+		return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
new file mode 100644
index 0000000..149d7a8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.operators.StreamProject;
+
+import com.google.common.base.Preconditions;
+
+public class StreamProjection<IN> {
+
+	private DataStream<IN> dataStream;
+	private int[] fieldIndexes;
+
+	protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
+		if (!dataStream.getType().isTupleType()) {
+			throw new RuntimeException("Only Tuple DataStreams can be projected");
+		}
+		if(fieldIndexes.length == 0) {
+			throw new IllegalArgumentException("project() needs to select at least one (1) field.");
+		} else if(fieldIndexes.length > Tuple.MAX_ARITY - 1) {
+			throw new IllegalArgumentException(
+					"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
+		}
+
+		int maxFieldIndex = (dataStream.getType()).getArity();
+		for(int i = 0; i < fieldIndexes.length; i++) {
+			Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
+		}
+
+		this.dataStream = dataStream;
+		this.fieldIndexes = fieldIndexes;
+	}
+
+	/**
+	 * Chooses a projectTupleX according to the length of
+	 * {@link org.apache.flink.streaming.api.datastream.StreamProjection#fieldIndexes}
+	 *
+	 * @return The projected DataStream.
+	 * @see org.apache.flink.api.java.operators.ProjectOperator.Projection
+	 */
+	@SuppressWarnings("unchecked")
+	public <OUT extends Tuple> SingleOutputStreamOperator<OUT, ?> projectTupleX() {
+		SingleOutputStreamOperator<OUT, ?> projOperator = null;
+
+		switch (fieldIndexes.length) {
+			case 1: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple1(); break;
+			case 2: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple2(); break;
+			case 3: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple3(); break;
+			case 4: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple4(); break;
+			case 5: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple5(); break;
+			case 6: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple6(); break;
+			case 7: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple7(); break;
+			case 8: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple8(); break;
+			case 9: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple9(); break;
+			case 10: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple10(); break;
+			case 11: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple11(); break;
+			case 12: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple12(); break;
+			case 13: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple13(); break;
+			case 14: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple14(); break;
+			case 15: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple15(); break;
+			case 16: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple16(); break;
+			case 17: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple17(); break;
+			case 18: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple18(); break;
+			case 19: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple19(); break;
+			case 20: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple20(); break;
+			case 21: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple21(); break;
+			case 22: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple22(); break;
+			case 23: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple23(); break;
+			case 24: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple24(); break;
+			case 25: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple25(); break;
+			default:
+				throw new IllegalStateException("Excessive arity in tuple.");
+		}
+
+		return projOperator;
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0> SingleOutputStreamOperator<Tuple1<T0>, ?> projectTuple1() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple1<T0>>(
+				fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1> SingleOutputStreamOperator<Tuple2<T0, T1>, ?> projectTuple2() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple2<T0, T1>> tType = new TupleTypeInfo<Tuple2<T0, T1>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple2<T0, T1>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2> SingleOutputStreamOperator<Tuple3<T0, T1, T2>, ?> projectTuple3() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple3<T0, T1, T2>> tType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple3<T0, T1, T2>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3> SingleOutputStreamOperator<Tuple4<T0, T1, T2, T3>, ?> projectTuple4() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple4<T0, T1, T2, T3>> tType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4> SingleOutputStreamOperator<Tuple5<T0, T1, T2, T3, T4>, ?> projectTuple5() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> tType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5> SingleOutputStreamOperator<Tuple6<T0, T1, T2, T3, T4, T5>, ?> projectTuple6() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> tType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6> SingleOutputStreamOperator<Tuple7<T0, T1, T2, T3, T4, T5, T6>, ?> projectTuple7() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7> SingleOutputStreamOperator<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>, ?> projectTuple8() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8> SingleOutputStreamOperator<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>, ?> projectTuple9() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> SingleOutputStreamOperator<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>, ?> projectTuple10() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> SingleOutputStreamOperator<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>, ?> projectTuple11() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> SingleOutputStreamOperator<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>, ?> projectTuple12() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> SingleOutputStreamOperator<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>, ?> projectTuple13() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> SingleOutputStreamOperator<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>, ?> projectTuple14() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> SingleOutputStreamOperator<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>, ?> projectTuple15() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> SingleOutputStreamOperator<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>, ?> projectTuple16() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> SingleOutputStreamOperator<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>, ?> projectTuple17() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> SingleOutputStreamOperator<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>, ?> projectTuple18() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> SingleOutputStreamOperator<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>, ?> projectTuple19() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> SingleOutputStreamOperator<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>, ?> projectTuple20() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> SingleOutputStreamOperator<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>, ?> projectTuple21() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> SingleOutputStreamOperator<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>, ?> projectTuple22() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> SingleOutputStreamOperator<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>, ?> projectTuple23() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> SingleOutputStreamOperator<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>, ?> projectTuple24() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	/**
+	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+	 *
+	 * @return The projected DataStream.
+	 * @see Tuple
+	 * @see DataStream
+	 */
+	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> SingleOutputStreamOperator<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>, ?> projectTuple25() {
+		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+		TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes);
+
+		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+	}
+
+	public static TypeInformation<?>[] extractFieldTypes(int[] fields, TypeInformation<?> inType) {
+
+		TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType;
+		TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
+
+		for (int i = 0; i < fields.length; i++) {
+			fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
+		}
+
+		return fieldTypes;
+	}
+
+}