You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:58 UTC

[42/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
deleted file mode 100644
index d4a3a77..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamGroupedFold;
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
- * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
- * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
- * partitioning methods such as shuffle, forward and keyBy.
- *
- * <p>
- * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
- * that have the same key.
- *
- * @param <T> The type of the elements in the Keyed Stream.
- * @param <KEY> The type of the key in the Keyed Stream.
- */
-public class KeyedStream<T, KEY> extends DataStream<T> {
-
-	/** The key selector that can get the key by which the stream if partitioned from the elements */
-	private final KeySelector<T, KEY> keySelector;
-
-	/** The type of the key by which the stream is partitioned */
-	private final TypeInformation<KEY> keyType;
-	
-	/**
-	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
-	 * to partition operator state by key.
-	 * 
-	 * @param dataStream
-	 *            Base stream of data
-	 * @param keySelector
-	 *            Function for determining state partitions
-	 */
-	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
-		this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
-	}
-
-	/**
-	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
-	 * to partition operator state by key.
-	 *
-	 * @param dataStream
-	 *            Base stream of data
-	 * @param keySelector
-	 *            Function for determining state partitions
-	 */
-	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
-		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
-				dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
-		this.keySelector = keySelector;
-		this.keyType = keyType;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  properties
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the key selector that can get the key by which the stream if partitioned from the elements.
-	 * @return The key selector for the key.
-	 */
-	public KeySelector<T, KEY> getKeySelector() {
-		return this.keySelector;
-	}
-
-	/**
-	 * Gets the type of the key by which the stream is partitioned. 
-	 * @return The type of the key by which the stream is partitioned.
-	 */
-	public TypeInformation<KEY> getKeyType() {
-		return keyType;
-	}
-
-	@Override
-	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
-		throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
-	}
-
-	// ------------------------------------------------------------------------
-	//  basic transformations
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
-			TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
-
-		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
-
-		// inject the key selector and key type
-		OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
-		transform.setStateKeySelector(keySelector);
-		transform.setStateKeyType(keyType);
-		
-		return returnStream;
-	}
-	
-	@Override
-	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
-		DataStreamSink<T> result = super.addSink(sinkFunction);
-		result.getTransformation().setStateKeySelector(keySelector);
-		result.getTransformation().setStateKeyType(keyType);
-		return result;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Windowing
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Windows this {@code KeyedStream} into tumbling time windows.
-	 *
-	 * <p>
-	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
-	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
-	 * set using
-	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
-	 *
-	 * @param size The size of the window.
-	 */
-	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
-		return window(TumblingTimeWindows.of(size));
-	}
-
-	/**
-	 * Windows this {@code KeyedStream} into sliding time windows.
-	 *
-	 * <p>
-	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
-	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
-	 * set using
-	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
-	 *
-	 * @param size The size of the window.
-	 */
-	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
-		return window(SlidingTimeWindows.of(size, slide));
-	}
-
-	/**
-	 * Windows this {@code KeyedStream} into tumbling count windows.
-	 *
-	 * @param size The size of the windows in number of elements.
-	 */
-	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
-		return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
-	}
-
-	/**
-	 * Windows this {@code KeyedStream} into sliding count windows.
-	 *
-	 * @param size The size of the windows in number of elements.
-	 * @param slide The slide interval in number of elements.
-	 */
-	public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
-		return window(GlobalWindows.create())
-				.evictor(CountEvictor.of(size))
-				.trigger(CountTrigger.of(slide));
-	}
-
-	/**
-	 * Windows this data stream to a {@code WindowedStream}, which evaluates windows
-	 * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
-	 * grouping of elements is done both by key and by window.
-	 *
-	 * <p>
-	 * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
-	 * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
-	 * that is used if a {@code Trigger} is not specified.
-	 *
-	 * @param assigner The {@code WindowAssigner} that assigns elements to windows.
-	 * @return The trigger windows data stream.
-	 */
-	public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
-		return new WindowedStream<>(this, assigner);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Non-Windowed aggregation operations
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Applies a reduce transformation on the grouped data stream grouped on by
-	 * the given key position. The {@link ReduceFunction} will receive input
-	 * values based on the key value. Only input values with the same key will
-	 * go to the same reducer.
-	 *
-	 * @param reducer
-	 *            The {@link ReduceFunction} that will be called for every
-	 *            element of the input values with the same key.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
-		return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
-				clean(reducer), getType().createSerializer(getExecutionConfig())));
-	}
-
-	/**
-	 * Applies a fold transformation on the grouped data stream grouped on by
-	 * the given key position. The {@link FoldFunction} will receive input
-	 * values based on the key value. Only input values with the same key will
-	 * go to the same folder.
-	 *
-	 * @param folder
-	 *            The {@link FoldFunction} that will be called for every element
-	 *            of the input values with the same key.
-	 * @param initialValue
-	 *            The initialValue passed to the folders for each key.
-	 * @return The transformed DataStream.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
-
-		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
-				clean(folder), getType(), Utils.getCallLocationName(), true);
-
-		return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder), initialValue));
-	}
-
-	/**
-	 * Applies an aggregation that gives a rolling sum of the data stream at the
-	 * given position grouped by the given key. An independent aggregate is kept
-	 * per key.
-	 *
-	 * @param positionToSum
-	 *            The position in the data point to sum
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
-		return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current sum of the pojo data
-	 * stream at the given field expressionby the given key. An independent
-	 * aggregate is kept per key. A field expression is either the name of a
-	 * public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> sum(String field) {
-		return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current minimum of the data
-	 * stream at the given position by the given key. An independent aggregate
-	 * is kept per key.
-	 *
-	 * @param positionToMin
-	 *            The position in the data point to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
-		return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current minimum of the pojo
-	 * data stream at the given field expression by the given key. An
-	 * independent aggregate is kept per key. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> min(String field) {
-		return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
-				false, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the current maximum of the data stream
-	 * at the given position by the given key. An independent aggregate is kept
-	 * per key.
-	 *
-	 * @param positionToMax
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
-		return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current maximum of the pojo
-	 * data stream at the given field expression by the given key. An
-	 * independent aggregate is kept per key. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> max(String field) {
-		return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
-				false, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current minimum element of the
-	 * pojo data stream by the given field expression by the given key. An
-	 * independent aggregate is kept per key. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @param first
-	 *            If True then in case of field equality the first object will
-	 *            be returned
-	 * @return The transformed DataStream.
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
-				first, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current maximum element of the
-	 * pojo data stream by the given field expression by the given key. An
-	 * independent aggregate is kept per key. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @param first
-	 *            If True then in case of field equality the first object will
-	 *            be returned
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
-				first, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * minimum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the minimum value at the
-	 * given position, the operator returns the first one by default.
-	 *
-	 * @param positionToMinBy
-	 *            The position in the data point to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * minimum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the minimum value at the
-	 * given position, the operator returns the first one by default.
-	 *
-	 * @param positionToMinBy
-	 *            The position in the data point to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * minimum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the minimum value at the
-	 * given position, the operator returns either the first or last one,
-	 * depending on the parameter set.
-	 *
-	 * @param positionToMinBy
-	 *            The position in the data point to minimize
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            minimal value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
-		return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * maximum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the maximum value at the
-	 * given position, the operator returns the first one by default.
-	 *
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * maximum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the maximum value at the
-	 * given position, the operator returns the first one by default.
-	 *
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the current element with the
-	 * maximum value at the given position by the given key. An independent
-	 * aggregate is kept per key. If more elements have the maximum value at the
-	 * given position, the operator returns either the first or last one,
-	 * depending on the parameter set.
-	 *
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize.
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            maximum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
-		return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
-				getExecutionConfig()));
-	}
-
-	protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
-		StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
-				clean(aggregate), getType().createSerializer(getExecutionConfig()));
-		return transform("Keyed Aggregation", getType(), operator);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
deleted file mode 100644
index 33d5a3c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * The SingleOutputStreamOperator represents a user defined transformation
- * applied on a {@link DataStream} with one predefined output type.
- *
- * @param <T> The type of the elements in this Stream
- * @param <O> Type of the operator.
- */
-public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {
-
-	protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
-		super(environment, transformation);
-	}
-
-	/**
-	 * Gets the name of the current data stream. This name is
-	 * used by the visualization and logging during runtime.
-	 *
-	 * @return Name of the stream.
-	 */
-	public String getName() {
-		return transformation.getName();
-	}
-
-	/**
-	 * Sets the name of the current data stream. This name is
-	 * used by the visualization and logging during runtime.
-	 *
-	 * @return The named operator.
-	 */
-	public SingleOutputStreamOperator<T, O> name(String name){
-		transformation.setName(name);
-		return this;
-	}
-
-	/**
-	 * Sets the parallelism for this operator. The degree must be 1 or more.
-	 * 
-	 * @param parallelism
-	 *            The parallelism for this operator.
-	 * @return The operator with set parallelism.
-	 */
-	public SingleOutputStreamOperator<T, O> setParallelism(int parallelism) {
-		if (parallelism < 1) {
-			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
-		}
-
-		transformation.setParallelism(parallelism);
-
-		return this;
-	}
-
-	/**
-	 * Sets the maximum time frequency (ms) for the flushing of the output
-	 * buffer. By default the output buffers flush only when they are full.
-	 * 
-	 * @param timeoutMillis
-	 *            The maximum time between two output flushes.
-	 * @return The operator with buffer timeout set.
-	 */
-	public SingleOutputStreamOperator<T, O> setBufferTimeout(long timeoutMillis) {
-		transformation.setBufferTimeout(timeoutMillis);
-		return this;
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<T, O> broadcast() {
-		return (SingleOutputStreamOperator<T, O>) super.broadcast();
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<T, O> shuffle() {
-		return (SingleOutputStreamOperator<T, O>) super.shuffle();
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<T, O> forward() {
-		return (SingleOutputStreamOperator<T, O>) super.forward();
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<T, O> rebalance() {
-		return (SingleOutputStreamOperator<T, O>) super.rebalance();
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<T, O> global() {
-		return (SingleOutputStreamOperator<T, O>) super.global();
-	}
-
-	/**
-	 * Sets the {@link ChainingStrategy} for the given operator affecting the
-	 * way operators will possibly be co-located on the same thread for
-	 * increased performance.
-	 * 
-	 * @param strategy
-	 *            The selected {@link ChainingStrategy}
-	 * @return The operator with the modified chaining strategy
-	 */
-	private SingleOutputStreamOperator<T, O> setChainingStrategy(ChainingStrategy strategy) {
-		this.transformation.setChainingStrategy(strategy);
-		return this;
-	}
-
-	/**
-	 * Turns off chaining for this operator so thread co-location will not be
-	 * used as an optimization. </p> Chaining can be turned off for the whole
-	 * job by {@link StreamExecutionEnvironment#disableOperatorChaining()}
-	 * however it is not advised for performance considerations.
-	 * 
-	 * @return The operator with chaining disabled
-	 */
-	public SingleOutputStreamOperator<T, O> disableChaining() {
-		return setChainingStrategy(ChainingStrategy.NEVER);
-	}
-
-	/**
-	 * Starts a new task chain beginning at this operator. This operator will
-	 * not be chained (thread co-located for increased performance) to any
-	 * previous tasks even if possible.
-	 * 
-	 * @return The operator with chaining set.
-	 */
-	public SingleOutputStreamOperator<T, O> startNewChain() {
-		return setChainingStrategy(ChainingStrategy.HEAD);
-	}
-
-	/**
-	 * Adds a type information hint about the return type of this operator. 
-	 * 
-	 * <p>
-	 * Type hints are important in cases where the Java compiler
-	 * throws away generic type information necessary for efficient execution.
-	 * 
-	 * <p>
-	 * This method takes a type information string that will be parsed. A type information string can contain the following
-	 * types:
-	 *
-	 * <ul>
-	 * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
-	 * <li>Basic type arrays such as <code>Integer[]</code>,
-	 * <code>String[]</code>, etc.
-	 * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
-	 * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
-	 * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
-	 * <li>Generic types such as <code>java.lang.Class</code>, etc.
-	 * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
-	 * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
-	 * <li>Value types such as <code>DoubleValue</code>,
-	 * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
-	 * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], etc.</code></li>
-	 * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
-	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
-	 * </ul>
-	 *
-	 * Example:
-	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
-	 *
-	 * @param typeInfoString
-	 *            type information string to be parsed
-	 * @return This operator with a given return type hint.
-	 */
-	public O returns(String typeInfoString) {
-		if (typeInfoString == null) {
-			throw new IllegalArgumentException("Type information string must not be null.");
-		}
-		return returns(TypeInfoParser.<T>parse(typeInfoString));
-	}
-	
-	/**
-	 * Adds a type information hint about the return type of this operator. 
-	 * 
-	 * <p>
-	 * Type hints are important in cases where the Java compiler
-	 * throws away generic type information necessary for efficient execution.
-	 * 
-	 * <p>
-	 * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as:
-	 * 
-	 * <ul>
-	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
-	 * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
-	 * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
-	 * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
-	 * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
-	 * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
-	 * <li>etc.</li>
-	 * </ul>
-	 *
-	 * @param typeInfo
-	 *            type information as a return type hint
-	 * @return This operator with a given return type hint.
-	 */
-	public O returns(TypeInformation<T> typeInfo) {
-		if (typeInfo == null) {
-			throw new IllegalArgumentException("Type information must not be null.");
-		}
-		transformation.setOutputType(typeInfo);
-		@SuppressWarnings("unchecked")
-		O returnType = (O) this;
-		return returnType;
-	}
-	
-	/**
-	 * Adds a type information hint about the return type of this operator. 
-	 * 
-	 * <p>
-	 * Type hints are important in cases where the Java compiler
-	 * throws away generic type information necessary for efficient execution.
-	 * 
-	 * <p>
-	 * This method takes a class that will be analyzed by Flink's type extraction capabilities.
-	 * 
-	 * <p>
-	 * Examples for classes are:
-	 * <ul>
-	 * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li>
-	 * <li>POJOs such as <code>MyPojo.class</code></li>
-	 * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li>
-	 * <li>Arrays such as <code>String[].class</code>, etc.</li>
-	 * </ul>
-	 *
-	 * @param typeClass
-	 *            class as a return type hint
-	 * @return This operator with a given return type hint.
-	 */
-	@SuppressWarnings("unchecked")
-	public O returns(Class<T> typeClass) {
-		if (typeClass == null) {
-			throw new IllegalArgumentException("Type class must not be null.");
-		}
-		
-		try {
-			TypeInformation<T> ti = (TypeInformation<T>) TypeExtractor.createTypeInfo(typeClass);
-			return returns(ti);
-		}
-		catch (InvalidTypesException e) {
-			throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e);
-		}
-	}
-
-	@Override
-	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
-		return new SingleOutputStreamOperator<T, O>(this.getExecutionEnvironment(), new PartitionTransformation<T>(this.getTransformation(), partitioner));
-	}
-
-	/**
-	 * By default all operators in a streaming job share the same resource
-	 * group. Each resource group takes as many task manager slots as the
-	 * maximum parallelism operator in that group. Task chaining is only
-	 * possible within one resource group. By calling this method, this
-	 * operators starts a new resource group and all subsequent operators will
-	 * be added to this group unless specified otherwise. </p> Please note that
-	 * local executions have by default as many available task slots as the
-	 * environment parallelism, so in order to start a new resource group the
-	 * degree of parallelism for the operators must be decreased from the
-	 * default.
-	 * 
-	 * @return The operator as a part of a new resource group.
-	 */
-	public SingleOutputStreamOperator<T, O> startNewResourceGroup() {
-		transformation.setResourceStrategy(ResourceStrategy.NEWGROUP);
-		return this;
-	}
-
-	/**
-	 * Isolates the operator in its own resource group. This will cause the
-	 * operator to grab as many task slots as its degree of parallelism. If
-	 * there are no free resources available, the job will fail to start. It
-	 * also disables chaining for this operator </p>All subsequent operators are
-	 * assigned to the default resource group.
-	 * 
-	 * @return The operator with isolated resource group.
-	 */
-	public SingleOutputStreamOperator<T, O> isolateResources() {
-		transformation.setResourceStrategy(ResourceStrategy.ISOLATE);
-		return this;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
deleted file mode 100644
index 11ee7f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.transformations.SelectTransformation;
-import org.apache.flink.streaming.api.transformations.SplitTransformation;
-
-/**
- * The SplitStream represents an operator that has been split using an
- * {@link OutputSelector}. Named outputs can be selected using the
- * {@link #select} function. To apply transformation on the whole output simply
- * call the transformation on the SplitStream
- *
- * @param <OUT> The type of the elements in the Stream
- */
-public class SplitStream<OUT> extends DataStream<OUT> {
-
-	protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
-		super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
-	}
-
-	/**
-	 * Sets the output names for which the next operator will receive values.
-	 * 
-	 * @param outputNames
-	 *            The output names for which the operator will receive the
-	 *            input.
-	 * @return Returns the selected DataStream
-	 */
-	public DataStream<OUT> select(String... outputNames) {
-		return selectOutput(outputNames);
-	}
-
-	private DataStream<OUT> selectOutput(String[] outputNames) {
-		for (String outName : outputNames) {
-			if (outName == null) {
-				throw new RuntimeException("Selected names must not be null");
-			}
-		}
-
-		SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
-		return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
deleted file mode 100644
index 149d7a8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.operators.StreamProject;
-
-import com.google.common.base.Preconditions;
-
-public class StreamProjection<IN> {
-
-	private DataStream<IN> dataStream;
-	private int[] fieldIndexes;
-
-	protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
-		if (!dataStream.getType().isTupleType()) {
-			throw new RuntimeException("Only Tuple DataStreams can be projected");
-		}
-		if(fieldIndexes.length == 0) {
-			throw new IllegalArgumentException("project() needs to select at least one (1) field.");
-		} else if(fieldIndexes.length > Tuple.MAX_ARITY - 1) {
-			throw new IllegalArgumentException(
-					"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
-		}
-
-		int maxFieldIndex = (dataStream.getType()).getArity();
-		for(int i = 0; i < fieldIndexes.length; i++) {
-			Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
-		}
-
-		this.dataStream = dataStream;
-		this.fieldIndexes = fieldIndexes;
-	}
-
-	/**
-	 * Chooses a projectTupleX according to the length of
-	 * {@link org.apache.flink.streaming.api.datastream.StreamProjection#fieldIndexes}
-	 *
-	 * @return The projected DataStream.
-	 * @see org.apache.flink.api.java.operators.ProjectOperator.Projection
-	 */
-	@SuppressWarnings("unchecked")
-	public <OUT extends Tuple> SingleOutputStreamOperator<OUT, ?> projectTupleX() {
-		SingleOutputStreamOperator<OUT, ?> projOperator = null;
-
-		switch (fieldIndexes.length) {
-			case 1: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple1(); break;
-			case 2: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple2(); break;
-			case 3: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple3(); break;
-			case 4: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple4(); break;
-			case 5: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple5(); break;
-			case 6: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple6(); break;
-			case 7: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple7(); break;
-			case 8: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple8(); break;
-			case 9: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple9(); break;
-			case 10: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple10(); break;
-			case 11: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple11(); break;
-			case 12: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple12(); break;
-			case 13: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple13(); break;
-			case 14: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple14(); break;
-			case 15: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple15(); break;
-			case 16: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple16(); break;
-			case 17: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple17(); break;
-			case 18: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple18(); break;
-			case 19: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple19(); break;
-			case 20: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple20(); break;
-			case 21: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple21(); break;
-			case 22: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple22(); break;
-			case 23: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple23(); break;
-			case 24: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple24(); break;
-			case 25: projOperator = (SingleOutputStreamOperator<OUT, ?>) projectTuple25(); break;
-			default:
-				throw new IllegalStateException("Excessive arity in tuple.");
-		}
-
-		return projOperator;
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0> SingleOutputStreamOperator<Tuple1<T0>, ?> projectTuple1() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple1<T0>>(
-				fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1> SingleOutputStreamOperator<Tuple2<T0, T1>, ?> projectTuple2() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple2<T0, T1>> tType = new TupleTypeInfo<Tuple2<T0, T1>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple2<T0, T1>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2> SingleOutputStreamOperator<Tuple3<T0, T1, T2>, ?> projectTuple3() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple3<T0, T1, T2>> tType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple3<T0, T1, T2>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3> SingleOutputStreamOperator<Tuple4<T0, T1, T2, T3>, ?> projectTuple4() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple4<T0, T1, T2, T3>> tType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4> SingleOutputStreamOperator<Tuple5<T0, T1, T2, T3, T4>, ?> projectTuple5() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> tType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5> SingleOutputStreamOperator<Tuple6<T0, T1, T2, T3, T4, T5>, ?> projectTuple6() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> tType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6> SingleOutputStreamOperator<Tuple7<T0, T1, T2, T3, T4, T5, T6>, ?> projectTuple7() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7> SingleOutputStreamOperator<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>, ?> projectTuple8() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8> SingleOutputStreamOperator<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>, ?> projectTuple9() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> SingleOutputStreamOperator<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>, ?> projectTuple10() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> SingleOutputStreamOperator<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>, ?> projectTuple11() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> SingleOutputStreamOperator<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>, ?> projectTuple12() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> SingleOutputStreamOperator<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>, ?> projectTuple13() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> SingleOutputStreamOperator<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>, ?> projectTuple14() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> SingleOutputStreamOperator<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>, ?> projectTuple15() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> SingleOutputStreamOperator<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>, ?> projectTuple16() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> SingleOutputStreamOperator<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>, ?> projectTuple17() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> SingleOutputStreamOperator<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>, ?> projectTuple18() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> SingleOutputStreamOperator<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>, ?> projectTuple19() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> SingleOutputStreamOperator<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>, ?> projectTuple20() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> SingleOutputStreamOperator<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>, ?> projectTuple21() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> SingleOutputStreamOperator<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>, ?> projectTuple22() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> SingleOutputStreamOperator<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>, ?> projectTuple23() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> SingleOutputStreamOperator<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>, ?> projectTuple24() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	/**
-	 * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
-	 *
-	 * @return The projected DataStream.
-	 * @see Tuple
-	 * @see DataStream
-	 */
-	public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> SingleOutputStreamOperator<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>, ?> projectTuple25() {
-		TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
-		TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes);
-
-		return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
-	}
-
-	public static TypeInformation<?>[] extractFieldTypes(int[] fields, TypeInformation<?> inType) {
-
-		TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType;
-		TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
-
-		for (int i = 0; i < fields.length; i++) {
-			fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
-		}
-
-		return fieldTypes;
-	}
-
-}