You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:18 UTC
[02/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
new file mode 100644
index 0000000..d4a3a77
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamGroupedFold;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
+ * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
+ * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
+ * partitioning methods such as shuffle, forward and keyBy.
+ *
+ * <p>
+ * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
+ * that have the same key.
+ *
+ * @param <T> The type of the elements in the Keyed Stream.
+ * @param <KEY> The type of the key in the Keyed Stream.
+ */
+public class KeyedStream<T, KEY> extends DataStream<T> {
+
+ /** The key selector that can get the key by which the stream if partitioned from the elements */
+ private final KeySelector<T, KEY> keySelector;
+
+ /** The type of the key by which the stream is partitioned */
+ private final TypeInformation<KEY> keyType;
+
+ /**
+ * Creates a new {@link KeyedStream} using the given {@link KeySelector}
+ * to partition operator state by key.
+ *
+ * @param dataStream
+ * Base stream of data
+ * @param keySelector
+ * Function for determining state partitions
+ */
+ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
+ this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
+ }
+
+ /**
+ * Creates a new {@link KeyedStream} using the given {@link KeySelector}
+ * to partition operator state by key.
+ *
+ * @param dataStream
+ * Base stream of data
+ * @param keySelector
+ * Function for determining state partitions
+ */
+ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
+ super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
+ dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
+ this.keySelector = keySelector;
+ this.keyType = keyType;
+ }
+
+ // ------------------------------------------------------------------------
+ // properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the key selector that can get the key by which the stream if partitioned from the elements.
+ * @return The key selector for the key.
+ */
+ public KeySelector<T, KEY> getKeySelector() {
+ return this.keySelector;
+ }
+
+ /**
+ * Gets the type of the key by which the stream is partitioned.
+ * @return The type of the key by which the stream is partitioned.
+ */
+ public TypeInformation<KEY> getKeyType() {
+ return keyType;
+ }
+
+ @Override
+ protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+ throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
+ }
+
+ // ------------------------------------------------------------------------
+ // basic transformations
+ // ------------------------------------------------------------------------
+
+ @Override
+ public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
+ TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
+
+ SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
+
+ // inject the key selector and key type
+ OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
+ transform.setStateKeySelector(keySelector);
+ transform.setStateKeyType(keyType);
+
+ return returnStream;
+ }
+
+ @Override
+ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
+ DataStreamSink<T> result = super.addSink(sinkFunction);
+ result.getTransformation().setStateKeySelector(keySelector);
+ result.getTransformation().setStateKeyType(keyType);
+ return result;
+ }
+
+ // ------------------------------------------------------------------------
+ // Windowing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Windows this {@code KeyedStream} into tumbling time windows.
+ *
+ * <p>
+ * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+ * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
+ * set using
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ *
+ * @param size The size of the window.
+ */
+ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
+ return window(TumblingTimeWindows.of(size));
+ }
+
+ /**
+ * Windows this {@code KeyedStream} into sliding time windows.
+ *
+ * <p>
+ * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+ * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
+ * set using
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
+ *
+ * @param size The size of the window.
+ */
+ public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
+ return window(SlidingTimeWindows.of(size, slide));
+ }
+
+ /**
+ * Windows this {@code KeyedStream} into tumbling count windows.
+ *
+ * @param size The size of the windows in number of elements.
+ */
+ public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
+ return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
+ }
+
+ /**
+ * Windows this {@code KeyedStream} into sliding count windows.
+ *
+ * @param size The size of the windows in number of elements.
+ * @param slide The slide interval in number of elements.
+ */
+ public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
+ return window(GlobalWindows.create())
+ .evictor(CountEvictor.of(size))
+ .trigger(CountTrigger.of(slide));
+ }
+
+ /**
+ * Windows this data stream to a {@code WindowedStream}, which evaluates windows
+ * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
+ * grouping of elements is done both by key and by window.
+ *
+ * <p>
+ * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
+ * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
+ * that is used if a {@code Trigger} is not specified.
+ *
+ * @param assigner The {@code WindowAssigner} that assigns elements to windows.
+ * @return The trigger windows data stream.
+ */
+ public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
+ return new WindowedStream<>(this, assigner);
+ }
+
+ // ------------------------------------------------------------------------
+ // Non-Windowed aggregation operations
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies a reduce transformation on the grouped data stream grouped on by
+ * the given key position. The {@link ReduceFunction} will receive input
+ * values based on the key value. Only input values with the same key will
+ * go to the same reducer.
+ *
+ * @param reducer
+ * The {@link ReduceFunction} that will be called for every
+ * element of the input values with the same key.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
+ return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
+ clean(reducer), getType().createSerializer(getExecutionConfig())));
+ }
+
+ /**
+ * Applies a fold transformation on the grouped data stream grouped on by
+ * the given key position. The {@link FoldFunction} will receive input
+ * values based on the key value. Only input values with the same key will
+ * go to the same folder.
+ *
+ * @param folder
+ * The {@link FoldFunction} that will be called for every element
+ * of the input values with the same key.
+ * @param initialValue
+ * The initialValue passed to the folders for each key.
+ * @return The transformed DataStream.
+ */
+ public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
+
+ TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
+ clean(folder), getType(), Utils.getCallLocationName(), true);
+
+ return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder), initialValue));
+ }
+
+ /**
+ * Applies an aggregation that gives a rolling sum of the data stream at the
+ * given position grouped by the given key. An independent aggregate is kept
+ * per key.
+ *
+ * @param positionToSum
+ * The position in the data point to sum
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
+ return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that that gives the current sum of the pojo data
+ * stream at the given field expressionby the given key. An independent
+ * aggregate is kept per key. A field expression is either the name of a
+ * public field or a getter method with parentheses of the
+ * {@link DataStream}S underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> sum(String field) {
+ return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that that gives the current minimum of the data
+ * stream at the given position by the given key. An independent aggregate
+ * is kept per key.
+ *
+ * @param positionToMin
+ * The position in the data point to minimize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
+ return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
+ getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that that gives the current minimum of the pojo
+ * data stream at the given field expression by the given key. An
+ * independent aggregate is kept per key. A field expression is either the
+ * name of a public field or a getter method with parentheses of the
+ * {@link DataStream}S underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> min(String field) {
+ return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
+ false, getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that gives the current maximum of the data stream
+ * at the given position by the given key. An independent aggregate is kept
+ * per key.
+ *
+ * @param positionToMax
+ * The position in the data point to maximize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
+ return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
+ getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that that gives the current maximum of the pojo
+ * data stream at the given field expression by the given key. An
+ * independent aggregate is kept per key. A field expression is either the
+ * name of a public field or a getter method with parentheses of the
+ * {@link DataStream}S underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> max(String field) {
+ return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
+ false, getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that that gives the current minimum element of the
+ * pojo data stream by the given field expression by the given key. An
+ * independent aggregate is kept per key. A field expression is either the
+ * name of a public field or a getter method with parentheses of the
+ * {@link DataStream}S underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @param first
+ * If True then in case of field equality the first object will
+ * be returned
+ * @return The transformed DataStream.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
+ first, getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that that gives the current maximum element of the
+ * pojo data stream by the given field expression by the given key. An
+ * independent aggregate is kept per key. A field expression is either the
+ * name of a public field or a getter method with parentheses of the
+ * {@link DataStream}S underlying type. A dot can be used to drill down into
+ * objects, as in {@code "field1.getInnerField2()" }.
+ *
+ * @param field
+ * The field expression based on which the aggregation will be
+ * applied.
+ * @param first
+ * If True then in case of field equality the first object will
+ * be returned
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
+ return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
+ first, getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that that gives the current element with the
+ * minimum value at the given position by the given key. An independent
+ * aggregate is kept per key. If more elements have the minimum value at the
+ * given position, the operator returns the first one by default.
+ *
+ * @param positionToMinBy
+ * The position in the data point to minimize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
+ return this.minBy(positionToMinBy, true);
+ }
+
+ /**
+ * Applies an aggregation that that gives the current element with the
+ * minimum value at the given position by the given key. An independent
+ * aggregate is kept per key. If more elements have the minimum value at the
+ * given position, the operator returns the first one by default.
+ *
+ * @param positionToMinBy
+ * The position in the data point to minimize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
+ return this.minBy(positionToMinBy, true);
+ }
+
+ /**
+ * Applies an aggregation that that gives the current element with the
+ * minimum value at the given position by the given key. An independent
+ * aggregate is kept per key. If more elements have the minimum value at the
+ * given position, the operator returns either the first or last one,
+ * depending on the parameter set.
+ *
+ * @param positionToMinBy
+ * The position in the data point to minimize
+ * @param first
+ * If true, then the operator return the first element with the
+ * minimal value, otherwise returns the last
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
+ return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
+ getExecutionConfig()));
+ }
+
+ /**
+ * Applies an aggregation that that gives the current element with the
+ * maximum value at the given position by the given key. An independent
+ * aggregate is kept per key. If more elements have the maximum value at the
+ * given position, the operator returns the first one by default.
+ *
+ * @param positionToMaxBy
+ * The position in the data point to maximize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
+ return this.maxBy(positionToMaxBy, true);
+ }
+
+ /**
+ * Applies an aggregation that that gives the current element with the
+ * maximum value at the given position by the given key. An independent
+ * aggregate is kept per key. If more elements have the maximum value at the
+ * given position, the operator returns the first one by default.
+ *
+ * @param positionToMaxBy
+ * The position in the data point to maximize
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
+ return this.maxBy(positionToMaxBy, true);
+ }
+
+ /**
+ * Applies an aggregation that that gives the current element with the
+ * maximum value at the given position by the given key. An independent
+ * aggregate is kept per key. If more elements have the maximum value at the
+ * given position, the operator returns either the first or last one,
+ * depending on the parameter set.
+ *
+ * @param positionToMaxBy
+ * The position in the data point to maximize.
+ * @param first
+ * If true, then the operator return the first element with the
+ * maximum value, otherwise returns the last
+ * @return The transformed DataStream.
+ */
+ public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
+ return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
+ getExecutionConfig()));
+ }
+
+ protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
+ StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
+ clean(aggregate), getType().createSerializer(getExecutionConfig()));
+ return transform("Keyed Aggregation", getType(), operator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
new file mode 100644
index 0000000..33d5a3c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * The SingleOutputStreamOperator represents a user defined transformation
+ * applied on a {@link DataStream} with one predefined output type.
+ *
+ * @param <T> The type of the elements in this Stream
+ * @param <O> Type of the operator.
+ */
+public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {
+
+    /**
+     * Creates a stream over the given transformation in the given environment.
+     *
+     * @param environment
+     *            The execution environment the stream belongs to.
+     * @param transformation
+     *            The transformation that produces this stream.
+     */
+    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
+        super(environment, transformation);
+    }
+
+    /**
+     * Gets the name of the current data stream. This name is
+     * used by the visualization and logging during runtime.
+     *
+     * @return Name of the stream.
+     */
+    public String getName() {
+        return transformation.getName();
+    }
+
+    /**
+     * Sets the name of the current data stream. This name is
+     * used by the visualization and logging during runtime.
+     *
+     * @param name
+     *            The name to set on the underlying transformation.
+     * @return The named operator.
+     */
+    public SingleOutputStreamOperator<T, O> name(String name){
+        transformation.setName(name);
+        return this;
+    }
+
+    /**
+     * Sets the parallelism for this operator. The degree must be 1 or more.
+     *
+     * @param parallelism
+     *            The parallelism for this operator.
+     * @return The operator with set parallelism.
+     * @throws IllegalArgumentException if {@code parallelism} is less than 1
+     */
+    public SingleOutputStreamOperator<T, O> setParallelism(int parallelism) {
+        if (parallelism < 1) {
+            throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
+        }
+
+        transformation.setParallelism(parallelism);
+
+        return this;
+    }
+
+    /**
+     * Sets the maximum time frequency (ms) for the flushing of the output
+     * buffer. By default the output buffers flush only when they are full.
+     *
+     * @param timeoutMillis
+     *            The maximum time between two output flushes.
+     * @return The operator with buffer timeout set.
+     */
+    public SingleOutputStreamOperator<T, O> setBufferTimeout(long timeoutMillis) {
+        transformation.setBufferTimeout(timeoutMillis);
+        return this;
+    }
+
+    /**
+     * Delegates to the superclass {@code broadcast()} and narrows the result to
+     * {@code SingleOutputStreamOperator}. The cast presumably relies on the
+     * superclass routing through {@link #setConnectionType}, which this class
+     * overrides to return a {@code SingleOutputStreamOperator} (TODO confirm).
+     *
+     * @return The stream with the partitioning applied.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<T, O> broadcast() {
+        return (SingleOutputStreamOperator<T, O>) super.broadcast();
+    }
+
+    /**
+     * Delegates to the superclass {@code shuffle()} and narrows the result to
+     * {@code SingleOutputStreamOperator}; see {@link #broadcast()} for the
+     * assumption behind the cast.
+     *
+     * @return The stream with the partitioning applied.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<T, O> shuffle() {
+        return (SingleOutputStreamOperator<T, O>) super.shuffle();
+    }
+
+    /**
+     * Delegates to the superclass {@code forward()} and narrows the result to
+     * {@code SingleOutputStreamOperator}; see {@link #broadcast()} for the
+     * assumption behind the cast.
+     *
+     * @return The stream with the partitioning applied.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<T, O> forward() {
+        return (SingleOutputStreamOperator<T, O>) super.forward();
+    }
+
+    /**
+     * Delegates to the superclass {@code rebalance()} and narrows the result to
+     * {@code SingleOutputStreamOperator}; see {@link #broadcast()} for the
+     * assumption behind the cast.
+     *
+     * @return The stream with the partitioning applied.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<T, O> rebalance() {
+        return (SingleOutputStreamOperator<T, O>) super.rebalance();
+    }
+
+    /**
+     * Delegates to the superclass {@code global()} and narrows the result to
+     * {@code SingleOutputStreamOperator}; see {@link #broadcast()} for the
+     * assumption behind the cast.
+     *
+     * @return The stream with the partitioning applied.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public SingleOutputStreamOperator<T, O> global() {
+        return (SingleOutputStreamOperator<T, O>) super.global();
+    }
+
+    /**
+     * Sets the {@link ChainingStrategy} for the given operator affecting the
+     * way operators will possibly be co-located on the same thread for
+     * increased performance.
+     *
+     * @param strategy
+     *            The selected {@link ChainingStrategy}
+     * @return The operator with the modified chaining strategy
+     */
+    private SingleOutputStreamOperator<T, O> setChainingStrategy(ChainingStrategy strategy) {
+        this.transformation.setChainingStrategy(strategy);
+        return this;
+    }
+
+    /**
+     * Turns off chaining for this operator so thread co-location will not be
+     * used as an optimization.
+     *
+     * <p>
+     * Chaining can be turned off for the whole
+     * job by {@link StreamExecutionEnvironment#disableOperatorChaining()}
+     * however it is not advised for performance considerations.
+     *
+     * @return The operator with chaining disabled
+     */
+    public SingleOutputStreamOperator<T, O> disableChaining() {
+        return setChainingStrategy(ChainingStrategy.NEVER);
+    }
+
+    /**
+     * Starts a new task chain beginning at this operator. This operator will
+     * not be chained (thread co-located for increased performance) to any
+     * previous tasks even if possible.
+     *
+     * @return The operator with chaining set.
+     */
+    public SingleOutputStreamOperator<T, O> startNewChain() {
+        return setChainingStrategy(ChainingStrategy.HEAD);
+    }
+
+    /**
+     * Adds a type information hint about the return type of this operator.
+     *
+     * <p>
+     * Type hints are important in cases where the Java compiler
+     * throws away generic type information necessary for efficient execution.
+     *
+     * <p>
+     * This method takes a type information string that will be parsed. A type information string can contain the following
+     * types:
+     *
+     * <ul>
+     * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.</li>
+     * <li>Basic type arrays such as <code>Integer[]</code>,
+     * <code>String[]</code>, etc.</li>
+     * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
+     * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
+     * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
+     * <li>Generic types such as <code>java.lang.Class</code>, etc.</li>
+     * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
+     * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.</li>
+     * <li>Value types such as <code>DoubleValue</code>,
+     * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
+     * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[]</code>, etc.</li>
+     * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
+     * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
+     * </ul>
+     *
+     * Example:
+     * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
+     *
+     * @param typeInfoString
+     *            type information string to be parsed
+     * @return This operator with a given return type hint.
+     * @throws IllegalArgumentException if {@code typeInfoString} is null
+     */
+    public O returns(String typeInfoString) {
+        if (typeInfoString == null) {
+            throw new IllegalArgumentException("Type information string must not be null.");
+        }
+        return returns(TypeInfoParser.<T>parse(typeInfoString));
+    }
+
+    /**
+     * Adds a type information hint about the return type of this operator.
+     *
+     * <p>
+     * Type hints are important in cases where the Java compiler
+     * throws away generic type information necessary for efficient execution.
+     *
+     * <p>
+     * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as:
+     *
+     * <ul>
+     * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
+     * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
+     * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
+     * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
+     * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
+     * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
+     * <li>etc.</li>
+     * </ul>
+     *
+     * @param typeInfo
+     *            type information as a return type hint
+     * @return This operator with a given return type hint.
+     * @throws IllegalArgumentException if {@code typeInfo} is null
+     */
+    public O returns(TypeInformation<T> typeInfo) {
+        if (typeInfo == null) {
+            throw new IllegalArgumentException("Type information must not be null.");
+        }
+        transformation.setOutputType(typeInfo);
+        // The self-type parameter O is expected to be the concrete type of this instance.
+        @SuppressWarnings("unchecked")
+        O returnType = (O) this;
+        return returnType;
+    }
+
+    /**
+     * Adds a type information hint about the return type of this operator.
+     *
+     * <p>
+     * Type hints are important in cases where the Java compiler
+     * throws away generic type information necessary for efficient execution.
+     *
+     * <p>
+     * This method takes a class that will be analyzed by Flink's type extraction capabilities.
+     *
+     * <p>
+     * Examples for classes are:
+     * <ul>
+     * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li>
+     * <li>POJOs such as <code>MyPojo.class</code></li>
+     * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li>
+     * <li>Arrays such as <code>String[].class</code>, etc.</li>
+     * </ul>
+     *
+     * @param typeClass
+     *            class as a return type hint
+     * @return This operator with a given return type hint.
+     * @throws IllegalArgumentException if {@code typeClass} is null
+     * @throws InvalidTypesException if no type information can be created for the class
+     */
+    @SuppressWarnings("unchecked")
+    public O returns(Class<T> typeClass) {
+        if (typeClass == null) {
+            throw new IllegalArgumentException("Type class must not be null.");
+        }
+
+        try {
+            TypeInformation<T> ti = (TypeInformation<T>) TypeExtractor.createTypeInfo(typeClass);
+            return returns(ti);
+        }
+        catch (InvalidTypesException e) {
+            // Rethrow with a message aimed at the caller of returns(); the original failure is kept as the cause.
+            throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e);
+        }
+    }
+
+    /**
+     * Creates a new {@code SingleOutputStreamOperator} whose transformation is a
+     * {@link PartitionTransformation} of this stream's transformation with the
+     * given partitioner.
+     *
+     * @param partitioner
+     *            The partitioner to apply.
+     * @return A new stream with the given partitioning.
+     */
+    @Override
+    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+        return new SingleOutputStreamOperator<T, O>(this.getExecutionEnvironment(), new PartitionTransformation<T>(this.getTransformation(), partitioner));
+    }
+
+    /**
+     * By default all operators in a streaming job share the same resource
+     * group. Each resource group takes as many task manager slots as the
+     * maximum parallelism operator in that group. Task chaining is only
+     * possible within one resource group. By calling this method, this
+     * operator starts a new resource group and all subsequent operators will
+     * be added to this group unless specified otherwise.
+     *
+     * <p>
+     * Please note that
+     * local executions have by default as many available task slots as the
+     * environment parallelism, so in order to start a new resource group the
+     * degree of parallelism for the operators must be decreased from the
+     * default.
+     *
+     * @return The operator as a part of a new resource group.
+     */
+    public SingleOutputStreamOperator<T, O> startNewResourceGroup() {
+        transformation.setResourceStrategy(ResourceStrategy.NEWGROUP);
+        return this;
+    }
+
+    /**
+     * Isolates the operator in its own resource group. This will cause the
+     * operator to grab as many task slots as its degree of parallelism. If
+     * there are no free resources available, the job will fail to start. It
+     * also disables chaining for this operator.
+     *
+     * <p>
+     * All subsequent operators are
+     * assigned to the default resource group.
+     *
+     * @return The operator with isolated resource group.
+     */
+    public SingleOutputStreamOperator<T, O> isolateResources() {
+        transformation.setResourceStrategy(ResourceStrategy.ISOLATE);
+        return this;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
new file mode 100644
index 0000000..11ee7f2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.transformations.SelectTransformation;
+import org.apache.flink.streaming.api.transformations.SplitTransformation;
+
+/**
+ * The SplitStream represents a stream that has been split using an
+ * {@link OutputSelector}. Individual named outputs are obtained through
+ * {@link #select}; a transformation called directly on the SplitStream is
+ * applied to the whole output.
+ *
+ * @param <OUT> The type of the elements in the Stream
+ */
+public class SplitStream<OUT> extends DataStream<OUT> {
+
+ /**
+ * Wraps the transformation of the given stream in a {@link SplitTransformation}
+ * that uses the given selector.
+ *
+ * @param dataStream The stream to split
+ * @param outputSelector The selector handed to the split transformation
+ */
+ protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
+ super(
+ dataStream.getExecutionEnvironment(),
+ new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
+ }
+
+ /**
+ * Sets the output names for which the next operator will receive values.
+ *
+ * @param outputNames
+ * The output names for which the operator will receive the
+ * input.
+ * @return Returns the selected DataStream
+ */
+ public DataStream<OUT> select(String... outputNames) {
+ return selectOutput(outputNames);
+ }
+
+ /**
+ * Rejects null output names, then builds a {@link SelectTransformation} over this
+ * stream's transformation for the given names and wraps it in a new DataStream.
+ */
+ private DataStream<OUT> selectOutput(String[] outputNames) {
+ for (int i = 0; i < outputNames.length; i++) {
+ if (outputNames[i] == null) {
+ throw new RuntimeException("Selected names must not be null");
+ }
+ }
+
+ SelectTransformation<OUT> selected =
+ new SelectTransformation<OUT>(getTransformation(), Lists.newArrayList(outputNames));
+ return new DataStream<OUT>(getExecutionEnvironment(), selected);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
new file mode 100644
index 0000000..149d7a8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.operators.StreamProject;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Projects a tuple-typed {@link DataStream} onto a subset of its fields. The
+ * constructor validates the selected field positions; the {@code projectTupleN()}
+ * methods then apply a {@link StreamProject} operator to the stream.
+ *
+ * @param <IN> The type of the elements in the input stream
+ */
+public class StreamProjection<IN> {
+
+ // The stream whose tuple elements are projected.
+ private DataStream<IN> dataStream;
+ // Positions of the selected tuple fields, in the order used for the projected tuple type.
+ private int[] fieldIndexes;
+
+ protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
+ if (!dataStream.getType().isTupleType()) {
+ throw new RuntimeException("Only Tuple DataStreams can be projected");
+ }
+ if(fieldIndexes.length == 0) {
+ throw new IllegalArgumentException("project() needs to select at least one (1) field.");
+ } else if(fieldIndexes.length > Tuple.MAX_ARITY - 1) {
+ throw new IllegalArgumentException(
+ "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
+ }
+
+ int maxFieldIndex = (dataStream.getType()).getArity();
+ for(int i = 0; i < fieldIndexes.length; i++) {
+ Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
+ }
+
+ this.dataStream = dataStream;
+ this.fieldIndexes = fieldIndexes;
+ }
+
+ /**
+ * Dispatches to the {@code projectTupleN()} method whose arity equals the length of
+ * {@link org.apache.flink.streaming.api.datastream.StreamProjection#fieldIndexes}.
+ *
+ * @return The projected DataStream.
+ * @throws IllegalStateException if the number of selected fields is not between 1 and 25
+ * @see org.apache.flink.api.java.operators.ProjectOperator.Projection
+ */
+ @SuppressWarnings("unchecked")
+ public <OUT extends Tuple> SingleOutputStreamOperator<OUT, ?> projectTupleX() {
+ switch (fieldIndexes.length) {
+ case 1: return (SingleOutputStreamOperator<OUT, ?>) projectTuple1();
+ case 2: return (SingleOutputStreamOperator<OUT, ?>) projectTuple2();
+ case 3: return (SingleOutputStreamOperator<OUT, ?>) projectTuple3();
+ case 4: return (SingleOutputStreamOperator<OUT, ?>) projectTuple4();
+ case 5: return (SingleOutputStreamOperator<OUT, ?>) projectTuple5();
+ case 6: return (SingleOutputStreamOperator<OUT, ?>) projectTuple6();
+ case 7: return (SingleOutputStreamOperator<OUT, ?>) projectTuple7();
+ case 8: return (SingleOutputStreamOperator<OUT, ?>) projectTuple8();
+ case 9: return (SingleOutputStreamOperator<OUT, ?>) projectTuple9();
+ case 10: return (SingleOutputStreamOperator<OUT, ?>) projectTuple10();
+ case 11: return (SingleOutputStreamOperator<OUT, ?>) projectTuple11();
+ case 12: return (SingleOutputStreamOperator<OUT, ?>) projectTuple12();
+ case 13: return (SingleOutputStreamOperator<OUT, ?>) projectTuple13();
+ case 14: return (SingleOutputStreamOperator<OUT, ?>) projectTuple14();
+ case 15: return (SingleOutputStreamOperator<OUT, ?>) projectTuple15();
+ case 16: return (SingleOutputStreamOperator<OUT, ?>) projectTuple16();
+ case 17: return (SingleOutputStreamOperator<OUT, ?>) projectTuple17();
+ case 18: return (SingleOutputStreamOperator<OUT, ?>) projectTuple18();
+ case 19: return (SingleOutputStreamOperator<OUT, ?>) projectTuple19();
+ case 20: return (SingleOutputStreamOperator<OUT, ?>) projectTuple20();
+ case 21: return (SingleOutputStreamOperator<OUT, ?>) projectTuple21();
+ case 22: return (SingleOutputStreamOperator<OUT, ?>) projectTuple22();
+ case 23: return (SingleOutputStreamOperator<OUT, ?>) projectTuple23();
+ case 24: return (SingleOutputStreamOperator<OUT, ?>) projectTuple24();
+ case 25: return (SingleOutputStreamOperator<OUT, ?>) projectTuple25();
+ default:
+ throw new IllegalStateException("Excessive arity in tuple.");
+ }
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple1}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0> SingleOutputStreamOperator<Tuple1<T0>, ?> projectTuple1() {
+ final TupleTypeInfo<Tuple1<T0>> projectedType = new TupleTypeInfo<Tuple1<T0>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple1<T0>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple2}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1> SingleOutputStreamOperator<Tuple2<T0, T1>, ?> projectTuple2() {
+ final TupleTypeInfo<Tuple2<T0, T1>> projectedType = new TupleTypeInfo<Tuple2<T0, T1>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple2<T0, T1>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple3}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2> SingleOutputStreamOperator<Tuple3<T0, T1, T2>, ?> projectTuple3() {
+ final TupleTypeInfo<Tuple3<T0, T1, T2>> projectedType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple3<T0, T1, T2>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple4}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3> SingleOutputStreamOperator<Tuple4<T0, T1, T2, T3>, ?> projectTuple4() {
+ final TupleTypeInfo<Tuple4<T0, T1, T2, T3>> projectedType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple5}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4> SingleOutputStreamOperator<Tuple5<T0, T1, T2, T3, T4>, ?> projectTuple5() {
+ final TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> projectedType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple6}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5> SingleOutputStreamOperator<Tuple6<T0, T1, T2, T3, T4, T5>, ?> projectTuple6() {
+ final TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> projectedType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple7}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6> SingleOutputStreamOperator<Tuple7<T0, T1, T2, T3, T4, T5, T6>, ?> projectTuple7() {
+ final TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> projectedType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple8}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7> SingleOutputStreamOperator<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>, ?> projectTuple8() {
+ final TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> projectedType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple9}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8> SingleOutputStreamOperator<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>, ?> projectTuple9() {
+ final TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> projectedType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple10}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> SingleOutputStreamOperator<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>, ?> projectTuple10() {
+ final TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> projectedType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple11}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> SingleOutputStreamOperator<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>, ?> projectTuple11() {
+ final TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> projectedType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple12}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> SingleOutputStreamOperator<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>, ?> projectTuple12() {
+ final TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> projectedType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple13}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> SingleOutputStreamOperator<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>, ?> projectTuple13() {
+ final TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> projectedType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple14}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> SingleOutputStreamOperator<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>, ?> projectTuple14() {
+ final TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> projectedType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple15}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> SingleOutputStreamOperator<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>, ?> projectTuple15() {
+ final TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> projectedType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple16}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> SingleOutputStreamOperator<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>, ?> projectTuple16() {
+ final TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> projectedType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple17}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> SingleOutputStreamOperator<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>, ?> projectTuple17() {
+ final TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> projectedType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple18}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> SingleOutputStreamOperator<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>, ?> projectTuple18() {
+ final TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> projectedType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple19}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> SingleOutputStreamOperator<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>, ?> projectTuple19() {
+ final TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> projectedType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Applies the projection to the wrapped {@link DataStream} and types the result as a
+ * stream of {@link Tuple20}. The number of selected fields is not checked here.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> SingleOutputStreamOperator<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>, ?> projectTuple20() {
+ final TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> projectedType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
+ extractFieldTypes(fieldIndexes, dataStream.getType()));
+
+ return dataStream.transform(
+ "Projection",
+ projectedType,
+ new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
+ fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> SingleOutputStreamOperator<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>, ?> projectTuple21() {
+ TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+ TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fTypes);
+
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> SingleOutputStreamOperator<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>, ?> projectTuple22() {
+ TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+ TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fTypes);
+
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> SingleOutputStreamOperator<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>, ?> projectTuple23() {
+ TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+ TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fTypes);
+
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> SingleOutputStreamOperator<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>, ?> projectTuple24() {
+ TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+ TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fTypes);
+
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ /**
+ * Projects a {@link Tuple} {@link DataStream} to the previously selected fields.
+ *
+ * @return The projected DataStream.
+ * @see Tuple
+ * @see DataStream
+ */
+ public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> SingleOutputStreamOperator<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>, ?> projectTuple25() {
+ TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
+ TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fTypes);
+
+ return dataStream.transform("Projection", tType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
+ }
+
+ public static TypeInformation<?>[] extractFieldTypes(int[] fields, TypeInformation<?> inType) {
+
+ TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType;
+ TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
+
+ for (int i = 0; i < fields.length; i++) {
+ fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
+ }
+
+ return fieldTypes;
+ }
+
+}