You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:58 UTC
[42/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
deleted file mode 100644
index d4a3a77..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamGroupedFold;
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
- * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
- * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
- * partitioning methods such as shuffle, forward and keyBy.
- *
- * <p>
- * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
- * that have the same key.
- *
- * @param <T> The type of the elements in the Keyed Stream.
- * @param <KEY> The type of the key in the Keyed Stream.
- */
-public class KeyedStream<T, KEY> extends DataStream<T> {
-
- /** The key selector that can get the key by which the stream is partitioned from the elements */
- private final KeySelector<T, KEY> keySelector;
-
- /** The type of the key by which the stream is partitioned */
- private final TypeInformation<KEY> keyType;
-
- /**
- * Creates a new {@link KeyedStream} using the given {@link KeySelector}
- * to partition operator state by key.
- *
- * @param dataStream
- * Base stream of data
- * @param keySelector
- * Function for determining state partitions
- */
- public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
- this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
- }
-
- /**
- * Creates a new {@link KeyedStream} using the given {@link KeySelector}
- * to partition operator state by key.
- *
- * @param dataStream
- * Base stream of data
- * @param keySelector
- * Function for determining state partitions
- */
- public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
- super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
- dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
- this.keySelector = keySelector;
- this.keyType = keyType;
- }
-
- // ------------------------------------------------------------------------
- // properties
- // ------------------------------------------------------------------------
-
- /**
- * Gets the key selector that can get the key by which the stream is partitioned from the elements.
- * @return The key selector for the key.
- */
- public KeySelector<T, KEY> getKeySelector() {
- return this.keySelector;
- }
-
- /**
- * Gets the type of the key by which the stream is partitioned.
- * @return The type of the key by which the stream is partitioned.
- */
- public TypeInformation<KEY> getKeyType() {
- return keyType;
- }
-
- @Override
- protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
- throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
- }
-
- // ------------------------------------------------------------------------
- // basic transformations
- // ------------------------------------------------------------------------
-
- @Override
- public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
- TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
-
- SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
-
- // inject the key selector and key type
- OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
- transform.setStateKeySelector(keySelector);
- transform.setStateKeyType(keyType);
-
- return returnStream;
- }
-
- @Override
- public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
- DataStreamSink<T> result = super.addSink(sinkFunction);
- result.getTransformation().setStateKeySelector(keySelector);
- result.getTransformation().setStateKeyType(keyType);
- return result;
- }
-
- // ------------------------------------------------------------------------
- // Windowing
- // ------------------------------------------------------------------------
-
- /**
- * Windows this {@code KeyedStream} into tumbling time windows.
- *
- * <p>
- * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
- * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
- * set using
- * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
- *
- * @param size The size of the window.
- */
- public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
- return window(TumblingTimeWindows.of(size));
- }
-
- /**
- * Windows this {@code KeyedStream} into sliding time windows.
- *
- * <p>
- * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
- * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
- * set using
- * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
- *
- * @param size The size of the window.
- */
- public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
- return window(SlidingTimeWindows.of(size, slide));
- }
-
- /**
- * Windows this {@code KeyedStream} into tumbling count windows.
- *
- * @param size The size of the windows in number of elements.
- */
- public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
- return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
- }
-
- /**
- * Windows this {@code KeyedStream} into sliding count windows.
- *
- * @param size The size of the windows in number of elements.
- * @param slide The slide interval in number of elements.
- */
- public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
- return window(GlobalWindows.create())
- .evictor(CountEvictor.of(size))
- .trigger(CountTrigger.of(slide));
- }
-
- /**
- * Windows this data stream to a {@code WindowedStream}, which evaluates windows
- * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
- * grouping of elements is done both by key and by window.
- *
- * <p>
- * A {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} can be defined to specify
- * when windows are evaluated. However, {@code WindowAssigners} have a default {@code Trigger}
- * that is used if a {@code Trigger} is not specified.
- *
- * @param assigner The {@code WindowAssigner} that assigns elements to windows.
- * @return The trigger windows data stream.
- */
- public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
- return new WindowedStream<>(this, assigner);
- }
-
- // ------------------------------------------------------------------------
- // Non-Windowed aggregation operations
- // ------------------------------------------------------------------------
-
- /**
- * Applies a reduce transformation on the data stream, grouped by the key of
- * this {@code KeyedStream}. The {@link ReduceFunction} will receive input
- * values based on the key value. Only input values with the same key will
- * go to the same reducer.
- *
- * @param reducer
- * The {@link ReduceFunction} that will be called for every
- * element of the input values with the same key.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
- return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
- clean(reducer), getType().createSerializer(getExecutionConfig())));
- }
-
- /**
- * Applies a fold transformation on the data stream, grouped by the key of
- * this {@code KeyedStream}. The {@link FoldFunction} will receive input
- * values based on the key value. Only input values with the same key will
- * go to the same folder.
- *
- * @param folder
- * The {@link FoldFunction} that will be called for every element
- * of the input values with the same key.
- * @param initialValue
- * The initialValue passed to the folders for each key.
- * @return The transformed DataStream.
- */
- public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
-
- TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
- clean(folder), getType(), Utils.getCallLocationName(), true);
-
- return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder), initialValue));
- }
-
- /**
- * Applies an aggregation that gives a rolling sum of the data stream at the
- * given position grouped by the given key. An independent aggregate is kept
- * per key.
- *
- * @param positionToSum
- * The position in the data point to sum
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
- return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current sum of the pojo data
- * stream at the given field expression by the given key. An independent
- * aggregate is kept per key. A field expression is either the name of a
- * public field or a getter method with parentheses of the
- * {@link DataStream}'s underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> sum(String field) {
- return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current minimum of the data
- * stream at the given position by the given key. An independent aggregate
- * is kept per key.
- *
- * @param positionToMin
- * The position in the data point to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
- return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current minimum of the pojo
- * data stream at the given field expression by the given key. An
- * independent aggregate is kept per key. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}'s underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> min(String field) {
- return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
- false, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current maximum of the data stream
- * at the given position by the given key. An independent aggregate is kept
- * per key.
- *
- * @param positionToMax
- * The position in the data point to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
- return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current maximum of the pojo
- * data stream at the given field expression by the given key. An
- * independent aggregate is kept per key. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}'s underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> max(String field) {
- return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
- false, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current minimum element of the
- * pojo data stream by the given field expression by the given key. An
- * independent aggregate is kept per key. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}'s underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @param first
- * If True then in case of field equality the first object will
- * be returned
- * @return The transformed DataStream.
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
- return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
- first, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current maximum element of the
- * pojo data stream by the given field expression by the given key. An
- * independent aggregate is kept per key. A field expression is either the
- * name of a public field or a getter method with parentheses of the
- * {@link DataStream}'s underlying type. A dot can be used to drill down into
- * objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field
- * The field expression based on which the aggregation will be
- * applied.
- * @param first
- * If True then in case of field equality the first object will
- * be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
- first, getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current element with the
- * minimum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the minimum value at the
- * given position, the operator returns the first one by default.
- *
- * @param positionToMinBy
- * The position in the data point to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the current element with the
- * minimum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the minimum value at the
- * given position, the operator returns the first one by default.
- *
- * @param positionToMinBy
- * The position in the data point to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the current element with the
- * minimum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the minimum value at the
- * given position, the operator returns either the first or last one,
- * depending on the parameter set.
- *
- * @param positionToMinBy
- * The position in the data point to minimize
- * @param first
- * If true, then the operator returns the first element with the
- * minimal value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
- return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
- getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the current element with the
- * maximum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the maximum value at the
- * given position, the operator returns the first one by default.
- *
- * @param positionToMaxBy
- * The position in the data point to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the current element with the
- * maximum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the maximum value at the
- * given position, the operator returns the first one by default.
- *
- * @param positionToMaxBy
- * The position in the data point to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the current element with the
- * maximum value at the given position by the given key. An independent
- * aggregate is kept per key. If more elements have the maximum value at the
- * given position, the operator returns either the first or last one,
- * depending on the parameter set.
- *
- * @param positionToMaxBy
- * The position in the data point to maximize.
- * @param first
- * If true, then the operator returns the first element with the
- * maximum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
- return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
- getExecutionConfig()));
- }
-
- protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
- StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
- clean(aggregate), getType().createSerializer(getExecutionConfig()));
- return transform("Keyed Aggregation", getType(), operator);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
deleted file mode 100644
index 33d5a3c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * The SingleOutputStreamOperator represents a user defined transformation
- * applied on a {@link DataStream} with one predefined output type.
- *
- * @param <T> The type of the elements in this Stream
- * @param <O> Type of the operator.
- */
-public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {
-
- protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
- super(environment, transformation);
- }
-
- /**
- * Gets the name of the current data stream. This name is
- * used by the visualization and logging during runtime.
- *
- * @return Name of the stream.
- */
- public String getName() {
- return transformation.getName();
- }
-
- /**
- * Sets the name of the current data stream. This name is
- * used by the visualization and logging during runtime.
- *
- * @return The named operator.
- */
- public SingleOutputStreamOperator<T, O> name(String name){
- transformation.setName(name);
- return this;
- }
-
- /**
- * Sets the parallelism for this operator. The degree must be 1 or more.
- *
- * @param parallelism
- * The parallelism for this operator.
- * @return The operator with set parallelism.
- */
- public SingleOutputStreamOperator<T, O> setParallelism(int parallelism) {
- if (parallelism < 1) {
- throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
- }
-
- transformation.setParallelism(parallelism);
-
- return this;
- }
-
- /**
- * Sets the maximum time frequency (ms) for the flushing of the output
- * buffer. By default the output buffers flush only when they are full.
- *
- * @param timeoutMillis
- * The maximum time between two output flushes.
- * @return The operator with buffer timeout set.
- */
- public SingleOutputStreamOperator<T, O> setBufferTimeout(long timeoutMillis) {
- transformation.setBufferTimeout(timeoutMillis);
- return this;
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<T, O> broadcast() {
- return (SingleOutputStreamOperator<T, O>) super.broadcast();
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<T, O> shuffle() {
- return (SingleOutputStreamOperator<T, O>) super.shuffle();
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<T, O> forward() {
- return (SingleOutputStreamOperator<T, O>) super.forward();
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<T, O> rebalance() {
- return (SingleOutputStreamOperator<T, O>) super.rebalance();
- }
-
- @SuppressWarnings("unchecked")
- public SingleOutputStreamOperator<T, O> global() {
- return (SingleOutputStreamOperator<T, O>) super.global();
- }
-
- /**
- * Sets the {@link ChainingStrategy} for the given operator affecting the
- * way operators will possibly be co-located on the same thread for
- * increased performance.
- *
- * @param strategy
- * The selected {@link ChainingStrategy}
- * @return The operator with the modified chaining strategy
- */
- private SingleOutputStreamOperator<T, O> setChainingStrategy(ChainingStrategy strategy) {
- this.transformation.setChainingStrategy(strategy);
- return this;
- }
-
- /**
- * Turns off chaining for this operator so thread co-location will not be
- * used as an optimization. <p> Chaining can be turned off for the whole
- * job by {@link StreamExecutionEnvironment#disableOperatorChaining()}
- * however it is not advised for performance considerations.
- *
- * @return The operator with chaining disabled
- */
- public SingleOutputStreamOperator<T, O> disableChaining() {
- return setChainingStrategy(ChainingStrategy.NEVER);
- }
-
- /**
- * Starts a new task chain beginning at this operator. This operator will
- * not be chained (thread co-located for increased performance) to any
- * previous tasks even if possible.
- *
- * @return The operator with chaining set.
- */
- public SingleOutputStreamOperator<T, O> startNewChain() {
- return setChainingStrategy(ChainingStrategy.HEAD);
- }
-
- /**
- * Adds a type information hint about the return type of this operator.
- *
- * <p>
- * Type hints are important in cases where the Java compiler
- * throws away generic type information necessary for efficient execution.
- *
- * <p>
- * This method takes a type information string that will be parsed. A type information string can contain the following
- * types:
- *
- * <ul>
- * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
- * <li>Basic type arrays such as <code>Integer[]</code>,
- * <code>String[]</code>, etc.
- * <li>Tuple types such as <code>Tuple1&lt;TYPE0&gt;</code>,
- * <code>Tuple2&lt;TYPE0, TYPE1&gt;</code>, etc.</li>
- * <li>Pojo types such as <code>org.my.MyPojo&lt;myFieldName=TYPE0,myFieldName2=TYPE1&gt;</code>, etc.</li>
- * <li>Generic types such as <code>java.lang.Class</code>, etc.
- * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
- * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
- * <li>Value types such as <code>DoubleValue</code>,
- * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
- * <li>Tuple array types such as <code>Tuple2&lt;TYPE0,TYPE1&gt;[], etc.</code></li>
- * <li>Writable types such as <code>Writable&lt;org.my.CustomWritable&gt;</code></li>
- * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
- * </ul>
- *
- * Example:
- * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
- *
- * @param typeInfoString
- * type information string to be parsed
- * @return This operator with a given return type hint.
- */
- public O returns(String typeInfoString) {
- if (typeInfoString == null) {
- throw new IllegalArgumentException("Type information string must not be null.");
- }
- return returns(TypeInfoParser.<T>parse(typeInfoString));
- }
-
- /**
- * Adds a type information hint about the return type of this operator.
- *
- * <p>
- * Type hints are important in cases where the Java compiler
- * throws away generic type information necessary for efficient execution.
- *
- * <p>
- * This method takes an instance of {@link org.apache.flink.api.common.typeinfo.TypeInformation} such as:
- *
- * <ul>
- * <li>{@link org.apache.flink.api.common.typeinfo.BasicTypeInfo}</li>
- * <li>{@link org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo}</li>
- * <li>{@link org.apache.flink.api.java.typeutils.TupleTypeInfo}</li>
- * <li>{@link org.apache.flink.api.java.typeutils.PojoTypeInfo}</li>
- * <li>{@link org.apache.flink.api.java.typeutils.WritableTypeInfo}</li>
- * <li>{@link org.apache.flink.api.java.typeutils.ValueTypeInfo}</li>
- * <li>etc.</li>
- * </ul>
- *
- * @param typeInfo
- * type information as a return type hint
- * @return This operator with a given return type hint.
- */
- public O returns(TypeInformation<T> typeInfo) {
- if (typeInfo == null) {
- throw new IllegalArgumentException("Type information must not be null.");
- }
- transformation.setOutputType(typeInfo);
- @SuppressWarnings("unchecked")
- O returnType = (O) this;
- return returnType;
- }
-
- /**
- * Adds a type information hint about the return type of this operator.
- *
- * <p>
- * Type hints are important in cases where the Java compiler
- * throws away generic type information necessary for efficient execution.
- *
- * <p>
- * This method takes a class that will be analyzed by Flink's type extraction capabilities.
- *
- * <p>
- * Examples for classes are:
- * <ul>
- * <li>Basic types such as <code>Integer.class</code>, <code>String.class</code>, etc.</li>
- * <li>POJOs such as <code>MyPojo.class</code></li>
- * <li>Classes that <b>extend</b> tuples. Classes like <code>Tuple1.class</code>,<code>Tuple2.class</code>, etc. are <b>not</b> sufficient.</li>
- * <li>Arrays such as <code>String[].class</code>, etc.</li>
- * </ul>
- *
- * @param typeClass
- * class as a return type hint
- * @return This operator with a given return type hint.
- */
- @SuppressWarnings("unchecked")
- public O returns(Class<T> typeClass) {
- if (typeClass == null) {
- throw new IllegalArgumentException("Type class must not be null.");
- }
-
- try {
- TypeInformation<T> ti = (TypeInformation<T>) TypeExtractor.createTypeInfo(typeClass);
- return returns(ti);
- }
- catch (InvalidTypesException e) {
- throw new InvalidTypesException("The given class is not suited for providing necessary type information.", e);
- }
- }
-
- @Override
- protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
- return new SingleOutputStreamOperator<T, O>(this.getExecutionEnvironment(), new PartitionTransformation<T>(this.getTransformation(), partitioner));
- }
-
- /**
- * By default all operators in a streaming job share the same resource
- * group. Each resource group takes as many task manager slots as the
- * maximum parallelism operator in that group. Task chaining is only
- * possible within one resource group. By calling this method, this
- * operator starts a new resource group and all subsequent operators will
- * be added to this group unless specified otherwise. <p> Please note that
- * local executions have by default as many available task slots as the
- * environment parallelism, so in order to start a new resource group the
- * degree of parallelism for the operators must be decreased from the
- * default.
- *
- * @return The operator as a part of a new resource group.
- */
- public SingleOutputStreamOperator<T, O> startNewResourceGroup() {
- transformation.setResourceStrategy(ResourceStrategy.NEWGROUP);
- return this;
- }
-
- /**
- * Isolates the operator in its own resource group. This will cause the
- * operator to grab as many task slots as its degree of parallelism. If
- * there are no free resources available, the job will fail to start. It
- * also disables chaining for this operator. <p>All subsequent operators are
- * assigned to the default resource group.
- *
- * @return The operator with isolated resource group.
- */
- public SingleOutputStreamOperator<T, O> isolateResources() {
- transformation.setResourceStrategy(ResourceStrategy.ISOLATE);
- return this;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
deleted file mode 100644
index 11ee7f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitStream.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.transformations.SelectTransformation;
-import org.apache.flink.streaming.api.transformations.SplitTransformation;
-
-/**
- * A SplitStream is the result of splitting an operator's output with an
- * {@link OutputSelector}. Individual named outputs are picked with the
- * {@link #select} function. A transformation that should see the whole
- * output is simply called on the SplitStream itself.
- *
- * @param <OUT> The type of the elements in the Stream
- */
-public class SplitStream<OUT> extends DataStream<OUT> {
-
-    /**
-     * Wraps the given stream in a {@link SplitTransformation} driven by the
-     * given selector.
-     *
-     * @param dataStream
-     *            The stream whose output is split.
-     * @param outputSelector
-     *            The selector handed to the split transformation.
-     */
-    protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
-        super(
-                dataStream.getExecutionEnvironment(),
-                new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
-    }
-
-    /**
-     * Chooses the named outputs whose values the next operator will receive.
-     *
-     * @param outputNames
-     *            The output names for which the operator will receive the
-     *            input.
-     * @return Returns the selected DataStream
-     */
-    public DataStream<OUT> select(String... outputNames) {
-        return selectOutput(outputNames);
-    }
-
-    /**
-     * Rejects null names, then wraps this stream's transformation in a
-     * {@link SelectTransformation} restricted to the given names.
-     */
-    private DataStream<OUT> selectOutput(String[] outputNames) {
-        for (int i = 0; i < outputNames.length; i++) {
-            if (outputNames[i] == null) {
-                throw new RuntimeException("Selected names must not be null");
-            }
-        }
-
-        SelectTransformation<OUT> selected =
-                new SelectTransformation<OUT>(getTransformation(), Lists.newArrayList(outputNames));
-        return new DataStream<OUT>(getExecutionEnvironment(), selected);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
deleted file mode 100644
index 149d7a8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.operators.StreamProject;
-
-import com.google.common.base.Preconditions;
-
-public class StreamProjection<IN> {
-
- private DataStream<IN> dataStream;
- private int[] fieldIndexes;
-
- protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
- if (!dataStream.getType().isTupleType()) {
- throw new RuntimeException("Only Tuple DataStreams can be projected");
- }
- if(fieldIndexes.length == 0) {
- throw new IllegalArgumentException("project() needs to select at least one (1) field.");
- } else if(fieldIndexes.length > Tuple.MAX_ARITY - 1) {
- throw new IllegalArgumentException(
- "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
- }
-
- int maxFieldIndex = (dataStream.getType()).getArity();
- for(int i = 0; i < fieldIndexes.length; i++) {
- Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
- }
-
- this.dataStream = dataStream;
- this.fieldIndexes = fieldIndexes;
- }
-
- /**
-  * Dispatches to the projectTupleN method that matches the length of
-  * {@link org.apache.flink.streaming.api.datastream.StreamProjection#fieldIndexes}
-  *
-  * @return The projected DataStream.
-  * @see org.apache.flink.api.java.operators.ProjectOperator.Projection
-  */
- @SuppressWarnings("unchecked")
- public <OUT extends Tuple> SingleOutputStreamOperator<OUT, ?> projectTupleX() {
-     final int arity = fieldIndexes.length;
-
-     // Each branch returns directly; an unsupported arity falls through to the default.
-     switch (arity) {
-         case 1: return (SingleOutputStreamOperator<OUT, ?>) projectTuple1();
-         case 2: return (SingleOutputStreamOperator<OUT, ?>) projectTuple2();
-         case 3: return (SingleOutputStreamOperator<OUT, ?>) projectTuple3();
-         case 4: return (SingleOutputStreamOperator<OUT, ?>) projectTuple4();
-         case 5: return (SingleOutputStreamOperator<OUT, ?>) projectTuple5();
-         case 6: return (SingleOutputStreamOperator<OUT, ?>) projectTuple6();
-         case 7: return (SingleOutputStreamOperator<OUT, ?>) projectTuple7();
-         case 8: return (SingleOutputStreamOperator<OUT, ?>) projectTuple8();
-         case 9: return (SingleOutputStreamOperator<OUT, ?>) projectTuple9();
-         case 10: return (SingleOutputStreamOperator<OUT, ?>) projectTuple10();
-         case 11: return (SingleOutputStreamOperator<OUT, ?>) projectTuple11();
-         case 12: return (SingleOutputStreamOperator<OUT, ?>) projectTuple12();
-         case 13: return (SingleOutputStreamOperator<OUT, ?>) projectTuple13();
-         case 14: return (SingleOutputStreamOperator<OUT, ?>) projectTuple14();
-         case 15: return (SingleOutputStreamOperator<OUT, ?>) projectTuple15();
-         case 16: return (SingleOutputStreamOperator<OUT, ?>) projectTuple16();
-         case 17: return (SingleOutputStreamOperator<OUT, ?>) projectTuple17();
-         case 18: return (SingleOutputStreamOperator<OUT, ?>) projectTuple18();
-         case 19: return (SingleOutputStreamOperator<OUT, ?>) projectTuple19();
-         case 20: return (SingleOutputStreamOperator<OUT, ?>) projectTuple20();
-         case 21: return (SingleOutputStreamOperator<OUT, ?>) projectTuple21();
-         case 22: return (SingleOutputStreamOperator<OUT, ?>) projectTuple22();
-         case 23: return (SingleOutputStreamOperator<OUT, ?>) projectTuple23();
-         case 24: return (SingleOutputStreamOperator<OUT, ?>) projectTuple24();
-         case 25: return (SingleOutputStreamOperator<OUT, ?>) projectTuple25();
-         default:
-             throw new IllegalStateException("Excessive arity in tuple.");
-     }
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple1}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0> SingleOutputStreamOperator<Tuple1<T0>, ?> projectTuple1() {
-     TupleTypeInfo<Tuple1<T0>> projectedType = new TupleTypeInfo<Tuple1<T0>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple1<T0>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple2}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1> SingleOutputStreamOperator<Tuple2<T0, T1>, ?> projectTuple2() {
-     TupleTypeInfo<Tuple2<T0, T1>> projectedType = new TupleTypeInfo<Tuple2<T0, T1>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple2<T0, T1>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple3}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2> SingleOutputStreamOperator<Tuple3<T0, T1, T2>, ?> projectTuple3() {
-     TupleTypeInfo<Tuple3<T0, T1, T2>> projectedType = new TupleTypeInfo<Tuple3<T0, T1, T2>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple3<T0, T1, T2>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple4}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3> SingleOutputStreamOperator<Tuple4<T0, T1, T2, T3>, ?> projectTuple4() {
-     TupleTypeInfo<Tuple4<T0, T1, T2, T3>> projectedType = new TupleTypeInfo<Tuple4<T0, T1, T2, T3>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple4<T0, T1, T2, T3>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple5}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4> SingleOutputStreamOperator<Tuple5<T0, T1, T2, T3, T4>, ?> projectTuple5() {
-     TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>> projectedType = new TupleTypeInfo<Tuple5<T0, T1, T2, T3, T4>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple5<T0, T1, T2, T3, T4>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple6}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5> SingleOutputStreamOperator<Tuple6<T0, T1, T2, T3, T4, T5>, ?> projectTuple6() {
-     TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>> projectedType = new TupleTypeInfo<Tuple6<T0, T1, T2, T3, T4, T5>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple7}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6> SingleOutputStreamOperator<Tuple7<T0, T1, T2, T3, T4, T5, T6>, ?> projectTuple7() {
-     TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>> projectedType = new TupleTypeInfo<Tuple7<T0, T1, T2, T3, T4, T5, T6>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple8}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7> SingleOutputStreamOperator<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>, ?> projectTuple8() {
-     TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> projectedType = new TupleTypeInfo<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple9}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8> SingleOutputStreamOperator<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>, ?> projectTuple9() {
-     TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> projectedType = new TupleTypeInfo<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple10}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> SingleOutputStreamOperator<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>, ?> projectTuple10() {
-     TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> projectedType = new TupleTypeInfo<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple11}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> SingleOutputStreamOperator<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>, ?> projectTuple11() {
-     TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> projectedType = new TupleTypeInfo<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple12}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> SingleOutputStreamOperator<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>, ?> projectTuple12() {
-     TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> projectedType = new TupleTypeInfo<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple13}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> SingleOutputStreamOperator<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>, ?> projectTuple13() {
-     TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> projectedType = new TupleTypeInfo<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple14}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> SingleOutputStreamOperator<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>, ?> projectTuple14() {
-     TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> projectedType = new TupleTypeInfo<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple15}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> SingleOutputStreamOperator<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>, ?> projectTuple15() {
-     TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> projectedType = new TupleTypeInfo<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple16}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> SingleOutputStreamOperator<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>, ?> projectTuple16() {
-     TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> projectedType = new TupleTypeInfo<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple17}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> SingleOutputStreamOperator<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>, ?> projectTuple17() {
-     TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> projectedType = new TupleTypeInfo<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple18}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> SingleOutputStreamOperator<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>, ?> projectTuple18() {
-     TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> projectedType = new TupleTypeInfo<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple19}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> SingleOutputStreamOperator<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>, ?> projectTuple19() {
-     TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> projectedType = new TupleTypeInfo<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple20}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> SingleOutputStreamOperator<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>, ?> projectTuple20() {
-     TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> projectedType = new TupleTypeInfo<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple21}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> SingleOutputStreamOperator<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>, ?> projectTuple21() {
-     TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> projectedType = new TupleTypeInfo<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple22}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> SingleOutputStreamOperator<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>, ?> projectTuple22() {
-     TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> projectedType = new TupleTypeInfo<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple23}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> SingleOutputStreamOperator<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>, ?> projectTuple23() {
-     TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> projectedType = new TupleTypeInfo<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple24}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> SingleOutputStreamOperator<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>, ?> projectTuple24() {
-     TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> projectedType = new TupleTypeInfo<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- /**
-  * Projects the {@link Tuple} {@link DataStream} onto the previously selected
-  * fields and types the result as a {@link Tuple25}.
-  *
-  * @return The projected DataStream.
-  * @see Tuple
-  * @see DataStream
-  */
- public <T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> SingleOutputStreamOperator<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>, ?> projectTuple25() {
-     TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> projectedType = new TupleTypeInfo<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
-             extractFieldTypes(fieldIndexes, dataStream.getType()));
-
-     return dataStream.transform("Projection", projectedType, new StreamProject<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
-             fieldIndexes, projectedType.createSerializer(dataStream.getExecutionConfig())));
- }
-
- public static TypeInformation<?>[] extractFieldTypes(int[] fields, TypeInformation<?> inType) {
-
- TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType;
- TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
-
- for (int i = 0; i < fields.length; i++) {
- fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
- }
-
- return fieldTypes;
- }
-
-}