You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:20 UTC
[04/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
new file mode 100644
index 0000000..f25c995
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+/**
+ * Keeps the output collectors of a stream operator, each registered together with the
+ * {@link StreamEdge} it belongs to, and selects the collectors that an emitted record goes to.
+ *
+ * <p>Implementations are {@link Serializable}.
+ *
+ * @param <OUT> The type of the emitted records.
+ */
+public interface OutputSelectorWrapper<OUT> extends Serializable {
+
+	/**
+	 * Registers an output collector together with the stream edge it corresponds to.
+	 *
+	 * @param output The collector that stream records are handed to.
+	 * @param edge The stream edge associated with the collector.
+	 */
+	void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge);
+
+	/**
+	 * Returns the registered collectors that the given record should be emitted to.
+	 *
+	 * @param record The record that is about to be emitted.
+	 * @return The collectors selected for the record.
+	 */
+	Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
new file mode 100644
index 0000000..dca2ede
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.List;
+
+/**
+ * Creates the {@link OutputSelectorWrapper} that matches a list of output selectors.
+ */
+public class OutputSelectorWrapperFactory {
+
+	/**
+	 * Picks the output selector wrapper for the given output selectors.
+	 *
+	 * <p>If no output selector is given, a {@link BroadcastOutputSelectorWrapper} is returned;
+	 * otherwise a {@link DirectedOutputSelectorWrapper} built from the given selectors.
+	 *
+	 * @param outputSelectors The output selectors to wrap; may be empty.
+	 * @return The wrapper that selects the outputs for emitted records.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> outputSelectors) {
+		// The selectors' element type is only known as a wildcard here, so the wrappers are
+		// created as raw types.
+		if (outputSelectors.isEmpty()) {
+			return new BroadcastOutputSelectorWrapper();
+		}
+		return new DirectedOutputSelectorWrapper(outputSelectors);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
new file mode 100644
index 0000000..7191304
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+
+/**
+ * An {@code AllWindowedStream} represents a data stream where the stream of
+ * elements is split into windows based on a
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
+ * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * The windows are formed over the whole input stream rather than per key (all operations are
+ * translated to non-keyed window operators), and every resulting window operator runs with a
+ * parallelism of 1.
+ *
+ * <p>
+ * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified it will be
+ * used to evict elements from the window after
+ * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
+ * When using an evictor window performance will degrade significantly, since
+ * pre-aggregation of window results cannot be used.
+ *
+ * <p>
+ * Note that the {@code AllWindowedStream} is purely an API construct, during runtime
+ * the {@code AllWindowedStream} will be collapsed together with the
+ * operation over the window into one single operation.
+ *
+ * @param <T> The type of elements in the stream.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
+ */
+public class AllWindowedStream<T, W extends Window> {
+
+	/** The data stream that is windowed by this stream. */
+	private final DataStream<T> input;
+
+	/** The window assigner. */
+	private final WindowAssigner<? super T, W> windowAssigner;
+
+	/** The trigger that is used for window evaluation/emission. */
+	private Trigger<? super T, ? super W> trigger;
+
+	/** The evictor that is used for evicting elements before window evaluation; {@code null} if none is set. */
+	private Evictor<? super T, ? super W> evictor;
+
+	/**
+	 * Creates a windowed view of the given stream. The trigger is initialized with the default
+	 * trigger of the window assigner; no evictor is set.
+	 *
+	 * @param input The data stream to window.
+	 * @param windowAssigner The assigner that assigns the elements to windows.
+	 */
+	public AllWindowedStream(DataStream<T> input,
+			WindowAssigner<? super T, W> windowAssigner) {
+		this.input = input;
+		this.windowAssigner = windowAssigner;
+		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
+	}
+
+	/**
+	 * Sets the {@code Trigger} that should be used to trigger window emission.
+	 *
+	 * @param trigger The trigger to use instead of the window assigner's default trigger.
+	 * @return This {@code AllWindowedStream}, for call chaining.
+	 */
+	public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
+		this.trigger = trigger;
+		return this;
+	}
+
+	/**
+	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+	 *
+	 * <p>
+	 * Note: When using an evictor window performance will degrade significantly, since
+	 * pre-aggregation of window results cannot be used.
+	 *
+	 * @param evictor The evictor to use; {@code null} means no evictor.
+	 * @return This {@code AllWindowedStream}, for call chaining.
+	 */
+	public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
+		this.evictor = evictor;
+		return this;
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Operations on the windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies a reduce function to the window. The reduce function is called for each evaluation
+	 * of the window. The output of the reduce function is interpreted
+	 * as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * This window will try and pre-aggregate data as much as the window policies permit. For example,
+	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+	 * window is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+	 * so a few elements are stored (one per slide interval).
+	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+	 * aggregation tree. If an evictor is set, no pre-aggregation takes place and all elements
+	 * are buffered until the window is evaluated.
+	 *
+	 * @param function The reduce function.
+	 * @return The data stream that is the result of applying the reduce function to the window.
+	 */
+	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "Reduce at " + callLocation;
+
+		SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
+		if (result != null) {
+			return result;
+		}
+
+		String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, T> operator;
+
+		boolean setProcessingTime = isProcessingTime();
+
+		if (evictor != null) {
+			// with an evictor all elements are kept in a plain heap buffer (no pre-aggregation)
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					new ReduceAllWindowFunction<W, T>(function),
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			// The pre-aggregating buffer gets its own copy of the reduce function, so it does not
+			// share an instance with the ReduceAllWindowFunction below.
+			@SuppressWarnings("unchecked")
+			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
+
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+					new ReduceAllWindowFunction<W, T>(function),
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, input.getType(), operator).setParallelism(1);
+	}
+
+	/**
+	 * Applies the given fold function to each window. The fold function is called for each
+	 * evaluation of the window. The output of the fold function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * The fold is executed as a window function, so the elements of a window are buffered
+	 * until the window is evaluated (no pre-aggregation).
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param function The fold function.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
+		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
+				Utils.getCallLocationName(), true);
+
+		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
+	}
+
+	/**
+	 * Applies the given fold function to each window. The fold function is called for each
+	 * evaluation of the window. The output of the fold function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * The fold is executed as a window function, so the elements of a window are buffered
+	 * until the window is evaluated (no pre-aggregation).
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param function The fold function.
+	 * @param resultType Type information for the result type of the fold function.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
+	}
+
+	/**
+	 * Applies a window function to the window. The window function is called for each evaluation
+	 * of the window. The output of the window function is interpreted
+	 * as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Note that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 *
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, AllWindowFunction.class, true, true, inType, null, false);
+
+		return apply(function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each evaluation
+	 * of the window. The output of the window function is interpreted
+	 * as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Note that this function requires that all data in the windows is buffered until the window
+	 * is evaluated, as the function provides no means of pre-aggregation.
+	 *
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowApply at " + callLocation;
+
+		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
+		if (result != null) {
+			return result;
+		}
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		NonKeyedWindowOperator<T, R, W> operator;
+
+		boolean setProcessingTime = isProcessingTime();
+
+		if (evictor != null) {
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, resultType, operator).setParallelism(1);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer. If an evictor is
+	 * set, the pre-aggregator is not used and the window function receives the buffered
+	 * elements of the window directly.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
+		TypeInformation<T> inType = input.getType();
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				function, AllWindowFunction.class, true, true, inType, null, false);
+
+		return apply(preAggregator, function, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>
+	 * Arriving data is pre-aggregated using the given pre-aggregation reducer. If an evictor is
+	 * set, the pre-aggregator is not used and the window function receives the buffered
+	 * elements of the window directly.
+	 *
+	 * @param preAggregator The reduce function that is used for pre-aggregation
+	 * @param function The window function.
+	 * @param resultType Type information for the result type of the window function
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+		//clean the closures
+		function = input.getExecutionEnvironment().clean(function);
+		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowApply at " + callLocation;
+
+		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
+
+		OneInputStreamOperator<T, R> operator;
+
+		boolean setProcessingTime = isProcessingTime();
+
+		if (evictor != null) {
+			// NOTE(review): the preAggregator is not applied on this path, so the window function
+			// sees raw elements here but pre-aggregated ones without an evictor; confirm intended.
+			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new HeapWindowBuffer.Factory<T>(),
+					function,
+					trigger,
+					evictor).enableSetProcessingTime(setProcessingTime);
+
+		} else {
+			operator = new NonKeyedWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
+					function,
+					trigger).enableSetProcessingTime(setProcessingTime);
+		}
+
+		return input.transform(opName, resultType, operator).setParallelism(1);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Aggregations on the windows
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies an aggregation that sums every window of the data stream at the
+	 * given position.
+	 *
+	 * @param positionToSum The position in the tuple/array to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
+		return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that sums every window of the pojo data stream at
+	 * the given field for every window.
+	 *
+	 * <p>
+	 * A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * stream's underlying type. A dot can be used to drill down into objects,
+	 * as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> sum(String field) {
+		return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum value of every window
+	 * of the data stream at the given position.
+	 *
+	 * @param positionToMin The position to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
+		return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum value of the pojo data
+	 * stream at the given field expression for every window.
+	 *
+	 * <p>
+	 * A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> min(String field) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns the first element by default.
+	 *
+	 * @param positionToMinBy
+	 *            The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given field expression. If more elements have the same
+	 * minimum value the operator returns the first element by default.
+	 *
+	 * @param positionToMinBy The field expression to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 *
+	 * @param positionToMinBy The position to minimize
+	 * @param first If true, then the operator return the first element with the minimum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @param first If True then in case of field equality the first object will be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum value of every window of
+	 * the data stream at the given position.
+	 *
+	 * @param positionToMax The position to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
+		return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum value of the pojo data
+	 * stream at the given field expression for every window. A field expression
+	 * is either the name of a public field or a getter method with parentheses
+	 * of the {@link DataStream DataStream's} underlying type. A dot can be used to drill
+	 * down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> max(String field) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns the first by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given field expression. If more elements have the same
+	 * maximum value the operator returns the first by default.
+	 *
+	 * @param positionToMaxBy
+	 *            The field expression to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 *
+	 * @param positionToMaxBy The position to maximize by
+	 * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
+		return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field The field expression based on which the aggregation will be applied.
+	 * @param first If True then in case of field equality the first object will be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
+		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
+	}
+
+	/**
+	 * Runs a built-in aggregation as a window reduce, so it benefits from the same
+	 * pre-aggregation as {@link #reduce(ReduceFunction)}.
+	 */
+	private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
+		return reduce(aggregator);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Hook for a specialized operator for aligned time windows. Currently always returns
+	 * {@code null}, so the generic window operators are always used.
+	 */
+	private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
+			Function function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		// TODO: add once non-parallel fast aligned time windows operator is ready
+		return null;
+	}
+
+	/**
+	 * Checks whether the execution environment of the input stream uses the processing time
+	 * characteristic; the result is passed to the window operators' enableSetProcessingTime.
+	 */
+	private boolean isProcessingTime() {
+		return input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+	}
+
+	/**
+	 * Returns the execution environment of the windowed input stream.
+	 */
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return input.getExecutionEnvironment();
+	}
+
+	/**
+	 * Returns the type information of the elements of the windowed input stream.
+	 */
+	public TypeInformation<T> getInputType() {
+		return input.getType();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
new file mode 100644
index 0000000..d1da783
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped.
+ * A streaming co-group operation is evaluated over elements in a window.
+ *
+ * <p>
+ * To finalize co-group operation you also need to specify a {@link KeySelector} for
+ * both the first and second input and a {@link WindowAssigner}.
+ *
+ * <p>
+ * Note: Right now, the groups are being built in memory so you need to ensure that they don't
+ * get too big. Otherwise the JVM might run out of memory.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> one = ...;
+ * DataStream<Tuple2<String, Integer>> two = ...;
+ *
+ * DataStream<T> result = one.coGroup(two)
+ *     .where(new MyFirstKeySelector())
+ *     .equalTo(new MySecondKeySelector())
+ *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .apply(new MyCoGroupFunction());
+ * } </pre>
+ *
+ * @param <T1> Type of the elements from the first input
+ * @param <T2> Type of the elements from the second input
+ */
+public class CoGroupedStreams<T1, T2> {
+
+	/** The first input stream */
+	private final DataStream<T1> input1;
+
+	/** The second input stream */
+	private final DataStream<T2> input2;
+
+	/**
+	 * Creates new CoGrouped data streams, which are the first step towards building a streaming co-group.
+	 *
+	 * @param input1 The first data stream.
+	 * @param input2 The second data stream.
+	 * @throws NullPointerException If either input is {@code null}.
+	 */
+	public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
+		this.input1 = requireNonNull(input1);
+		this.input2 = requireNonNull(input2);
+	}
+
+	/**
+	 * Specifies a {@link KeySelector} for elements from the first input.
+	 *
+	 * @param keySelector The key selector for the first input.
+	 * @param <KEY> The type of the key.
+	 * @return The co-group builder with the key of the first input defined.
+	 */
+	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
+		TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return new Where<>(input1.clean(keySelector), keyType);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * CoGrouped streams that have the key for one side defined.
+	 *
+	 * @param <KEY> The type of the key.
+	 */
+	public class Where<KEY> {
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final TypeInformation<KEY> keyType;
+
+		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
+			this.keySelector1 = keySelector1;
+			this.keyType = keyType;
+		}
+
+		/**
+		 * Specifies a {@link KeySelector} for elements from the second input.
+		 *
+		 * @param keySelector The key selector for the second input.
+		 * @return The co-group builder with keys defined for both inputs.
+		 * @throws IllegalArgumentException If the key type of the second input differs from the
+		 *                                  key type of the first input.
+		 */
+		public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
+			TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			if (!otherKey.equals(this.keyType)) {
+				throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
+						"first key = " + this.keyType + " , second key = " + otherKey);
+			}
+
+			return new EqualTo(input2.clean(keySelector));
+		}
+
+		// --------------------------------------------------------------------
+
+		/**
+		 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
+		 */
+		public class EqualTo {
+
+			private final KeySelector<T2, KEY> keySelector2;
+
+			EqualTo(KeySelector<T2, KEY> keySelector2) {
+				this.keySelector2 = requireNonNull(keySelector2);
+			}
+
+			/**
+			 * Specifies the window on which the co-group operation works.
+			 *
+			 * @param assigner The window assigner applied to the tagged elements of both inputs.
+			 * @param <W> The type of the window.
+			 * @return The co-group operation with keys and window assigner defined.
+			 */
+			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
+	 * well as a {@link WindowAssigner}.
+	 *
+	 * @param <T1> Type of the elements from the first input
+	 * @param <T2> Type of the elements from the second input
+	 * @param <KEY> Type of the key. This must be the same for both inputs
+	 * @param <W> Type of {@link Window} on which the co-group operation works.
+	 */
+	public static class WithWindow<T1, T2, KEY, W extends Window> {
+		private final DataStream<T1> input1;
+		private final DataStream<T2> input2;
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final KeySelector<T2, KEY> keySelector2;
+
+		private final TypeInformation<KEY> keyType;
+
+		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
+
+		/** Optional trigger; {@code null} means the default of the window assigner is kept. */
+		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
+
+		/** Optional evictor; {@code null} means no evictor is set. */
+		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
+
+		protected WithWindow(DataStream<T1> input1,
+				DataStream<T2> input2,
+				KeySelector<T1, KEY> keySelector1,
+				KeySelector<T2, KEY> keySelector2,
+				TypeInformation<KEY> keyType,
+				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
+				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
+				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+			this.input1 = input1;
+			this.input2 = input2;
+
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+			this.keyType = keyType;
+
+			this.windowAssigner = windowAssigner;
+			this.trigger = trigger;
+			this.evictor = evictor;
+		}
+
+		/**
+		 * Sets the {@code Trigger} that should be used to trigger window emission.
+		 *
+		 * @param newTrigger The trigger to use.
+		 * @return A new co-group operation with the given trigger; this instance is unchanged.
+		 */
+		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+					windowAssigner, newTrigger, evictor);
+		}
+
+		/**
+		 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
+		 *
+		 * <p>
+		 * Note: When using an evictor window performance will degrade significantly, since
+		 * pre-aggregation of window results cannot be used.
+		 *
+		 * @param newEvictor The evictor to use.
+		 * @return A new co-group operation with the given evictor; this instance is unchanged.
+		 */
+		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+					windowAssigner, trigger, newEvictor);
+		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups. The result type is extracted from the function.
+		 *
+		 * @param function The co-group function applied to each window and key.
+		 * @param <T> The result type.
+		 * @return The resulting data stream.
+		 */
+		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
+
+			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
+					function,
+					CoGroupFunction.class,
+					true,
+					true,
+					input1.getType(),
+					input2.getType(),
+					"CoGroup",
+					false);
+
+			return apply(function, resultType);
+		}
+
+		/**
+		 * Completes the co-group operation with the user function that is executed
+		 * for windowed groups.
+		 *
+		 * <p>Both inputs are tagged into a {@link TaggedUnion}, unioned into a single stream,
+		 * keyed by the respective key selector and windowed; each window is then split back
+		 * into the two sides and handed to the user function.
+		 *
+		 * @param function The co-group function applied to each window and key.
+		 * @param resultType The type information of the result.
+		 * @param <T> The result type.
+		 * @return The resulting data stream.
+		 */
+		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
+			//clean the closure
+			function = input1.getExecutionEnvironment().clean(function);
+
+			UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
+			UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
+
+			DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
+					.map(new Input1Tagger<T1, T2>())
+					.returns(unionType);
+			DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
+					.map(new Input2Tagger<T1, T2>())
+					.returns(unionType);
+
+			DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
+
+			// we explicitly create the keyed stream to manually pass the key type information in
+			WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp =
+					new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
+					.window(windowAssigner);
+
+			// keep the returned stream instead of relying on trigger()/evictor() mutating windowOp
+			if (trigger != null) {
+				windowOp = windowOp.trigger(trigger);
+			}
+			if (evictor != null) {
+				windowOp = windowOp.evictor(evictor);
+			}
+
+			return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Data type and type information for Tagged Union
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Internal class for implementing tagged union co-group.
+	 *
+	 * <p>Instances are created via {@link #one(Object)} or {@link #two(Object)}, which set one
+	 * side and leave the other {@code null}.
+	 */
+	public static class TaggedUnion<T1, T2> {
+		private final T1 one;
+		private final T2 two;
+
+		private TaggedUnion(T1 one, T2 two) {
+			this.one = one;
+			this.two = two;
+		}
+
+		public boolean isOne() {
+			return one != null;
+		}
+
+		public boolean isTwo() {
+			return two != null;
+		}
+
+		public T1 getOne() {
+			return one;
+		}
+
+		public T2 getTwo() {
+			return two;
+		}
+
+		public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
+			return new TaggedUnion<>(one, null);
+		}
+
+		public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
+			return new TaggedUnion<>(null, two);
+		}
+	}
+
+	/**
+	 * Type information for {@link TaggedUnion}, composed of the type information of both sides.
+	 */
+	private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		private final TypeInformation<T1> oneType;
+		private final TypeInformation<T2> twoType;
+
+		public UnionTypeInfo(TypeInformation<T1> oneType,
+				TypeInformation<T2> twoType) {
+			this.oneType = oneType;
+			this.twoType = twoType;
+		}
+
+		@Override
+		public boolean isBasicType() {
+			return false;
+		}
+
+		@Override
+		public boolean isTupleType() {
+			return false;
+		}
+
+		@Override
+		public int getArity() {
+			return 2;
+		}
+
+		@Override
+		public int getTotalFields() {
+			return 2;
+		}
+
+		@Override
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		public Class<TaggedUnion<T1, T2>> getTypeClass() {
+			return (Class) TaggedUnion.class;
+		}
+
+		@Override
+		public boolean isKeyType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<TaggedUnion<T1, T2>> createSerializer(ExecutionConfig config) {
+			return new UnionSerializer<>(oneType.createSerializer(config), twoType.createSerializer(config));
+		}
+
+		@Override
+		public String toString() {
+			return "TaggedUnion<" + oneType + ", " + twoType + ">";
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof UnionTypeInfo) {
+				@SuppressWarnings("unchecked")
+				UnionTypeInfo<T1, T2> unionTypeInfo = (UnionTypeInfo<T1, T2>) obj;
+
+				return unionTypeInfo.canEqual(this) && oneType.equals(unionTypeInfo.oneType) && twoType.equals(unionTypeInfo.twoType);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * oneType.hashCode() + twoType.hashCode();
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof UnionTypeInfo;
+		}
+	}
+
+	/**
+	 * Serializer for {@link TaggedUnion}. Writes a one-byte tag ({@code 1} for the first side,
+	 * {@code 2} for the second) followed by the value serialized with that side's serializer.
+	 */
+	private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		private final TypeSerializer<T1> oneSerializer;
+		private final TypeSerializer<T2> twoSerializer;
+
+		public UnionSerializer(TypeSerializer<T1> oneSerializer,
+				TypeSerializer<T2> twoSerializer) {
+			this.oneSerializer = oneSerializer;
+			this.twoSerializer = twoSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
+			TypeSerializer<T1> duplicateOne = oneSerializer.duplicate();
+			TypeSerializer<T2> duplicateTwo = twoSerializer.duplicate();
+
+			// Sharing this instance is only safe if both nested serializers returned themselves,
+			// i.e. they are stateless. Otherwise the duplicate must wrap the fresh copies.
+			if (duplicateOne == oneSerializer && duplicateTwo == twoSerializer) {
+				return this;
+			}
+			return new UnionSerializer<>(duplicateOne, duplicateTwo);
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> createInstance() {
+			return null;
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from) {
+			if (from.isOne()) {
+				return TaggedUnion.one(oneSerializer.copy(from.getOne()));
+			} else {
+				return TaggedUnion.two(twoSerializer.copy(from.getTwo()));
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from, TaggedUnion<T1, T2> reuse) {
+			// TaggedUnion is immutable, so the reuse object cannot be filled in
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(TaggedUnion<T1, T2> record, DataOutputView target) throws IOException {
+			if (record.isOne()) {
+				target.writeByte(1);
+				oneSerializer.serialize(record.getOne(), target);
+			} else {
+				target.writeByte(2);
+				twoSerializer.serialize(record.getTwo(), target);
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> deserialize(DataInputView source) throws IOException {
+			byte tag = source.readByte();
+			if (tag == 1) {
+				return TaggedUnion.one(oneSerializer.deserialize(source));
+			} else {
+				return TaggedUnion.two(twoSerializer.deserialize(source));
+			}
+		}
+
+		@Override
+		public TaggedUnion<T1, T2> deserialize(TaggedUnion<T1, T2> reuse,
+				DataInputView source) throws IOException {
+			// TaggedUnion is immutable, so the reuse object cannot be filled in
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			byte tag = source.readByte();
+			target.writeByte(tag);
+			if (tag == 1) {
+				oneSerializer.copy(source, target);
+			} else {
+				twoSerializer.copy(source, target);
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * oneSerializer.hashCode() + twoSerializer.hashCode();
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public boolean equals(Object obj) {
+			if (obj instanceof UnionSerializer) {
+				UnionSerializer<T1, T2> other = (UnionSerializer<T1, T2>) obj;
+
+				return other.canEqual(this) && oneSerializer.equals(other.oneSerializer) && twoSerializer.equals(other.twoSerializer);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof UnionSerializer;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utility functions that implement the CoGroup logic based on the tagged
+	//  union window reduce
+	// ------------------------------------------------------------------------
+
+	/** Wraps elements of the first input as the first side of a {@link TaggedUnion}. */
+	private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public TaggedUnion<T1, T2> map(T1 value) throws Exception {
+			return TaggedUnion.one(value);
+		}
+	}
+
+	/** Wraps elements of the second input as the second side of a {@link TaggedUnion}. */
+	private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public TaggedUnion<T1, T2> map(T2 value) throws Exception {
+			return TaggedUnion.two(value);
+		}
+	}
+
+	/** Extracts the key of a {@link TaggedUnion} with the key selector of whichever side is set. */
+	private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
+		private static final long serialVersionUID = 1L;
+
+		private final KeySelector<T1, KEY> keySelector1;
+		private final KeySelector<T2, KEY> keySelector2;
+
+		public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
+				KeySelector<T2, KEY> keySelector2) {
+			this.keySelector1 = keySelector1;
+			this.keySelector2 = keySelector2;
+		}
+
+		@Override
+		public KEY getKey(TaggedUnion<T1, T2> value) throws Exception {
+			if (value.isOne()) {
+				return keySelector1.getKey(value.getOne());
+			} else {
+				return keySelector2.getKey(value.getTwo());
+			}
+		}
+	}
+
+	/**
+	 * Window function that splits the tagged elements of a window back into the two inputs
+	 * (buffered in memory) and invokes the wrapped {@link CoGroupFunction} on them.
+	 */
+	private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
+			extends WrappingFunction<CoGroupFunction<T1, T2, T>>
+			implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
+
+		private static final long serialVersionUID = 1L;
+
+		public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
+			super(userFunction);
+		}
+
+		@Override
+		public void apply(KEY key,
+				W window,
+				Iterable<TaggedUnion<T1, T2>> values,
+				Collector<T> out) throws Exception {
+
+			List<T1> oneValues = new ArrayList<>();
+			List<T2> twoValues = new ArrayList<>();
+
+			for (TaggedUnion<T1, T2> val: values) {
+				if (val.isOne()) {
+					oneValues.add(val.getOne());
+				} else {
+					twoValues.add(val.getTwo());
+				}
+			}
+			wrappedFunction.coGroup(oneValues, twoValues, out);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
new file mode 100644
index 0000000..4074a1d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+
+/**
+ * {@code ConnectedStreams} represents two connected streams of (possibly) different data types. It
+ * can be used to apply transformations such as {@link CoMapFunction} on two
+ * {@link DataStream DataStreams}.
+ *
+ * @param <IN1> Type of the first input data stream.
+ * @param <IN2> Type of the second input data stream.
+ */
+public class ConnectedStreams<IN1, IN2> {
+
+	protected StreamExecutionEnvironment environment;
+	protected DataStream<IN1> inputStream1;
+	protected DataStream<IN2> inputStream2;
+
+	/**
+	 * Creates connected streams from the two given inputs.
+	 *
+	 * @param env The execution environment the connected streams belong to.
+	 * @param input1 The first input stream.
+	 * @param input2 The second input stream.
+	 */
+	protected ConnectedStreams(StreamExecutionEnvironment env,
+			DataStream<IN1> input1,
+			DataStream<IN2> input2) {
+		this.environment = env;
+		// a null input simply leaves the corresponding field null, as before
+		this.inputStream1 = input1;
+		this.inputStream2 = input2;
+	}
+
+	/**
+	 * Returns the execution environment these connected streams belong to.
+	 *
+	 * @return The {@link StreamExecutionEnvironment}.
+	 */
+	public StreamExecutionEnvironment getExecutionEnvironment() {
+		return environment;
+	}
+
+	/**
+	 * Returns the first {@link DataStream}.
+	 *
+	 * @return The first DataStream.
+	 */
+	public DataStream<IN1> getFirstInput() {
+		return inputStream1;
+	}
+
+	/**
+	 * Returns the second {@link DataStream}.
+	 *
+	 * @return The second DataStream.
+	 */
+	public DataStream<IN2> getSecondInput() {
+		return inputStream2;
+	}
+
+	/**
+	 * Gets the type of the first input.
+	 *
+	 * @return The type of the first input
+	 */
+	public TypeInformation<IN1> getType1() {
+		return inputStream1.getType();
+	}
+
+	/**
+	 * Gets the type of the second input.
+	 *
+	 * @return The type of the second input
+	 */
+	public TypeInformation<IN2> getType2() {
+		return inputStream2.getType();
+	}
+
+	/**
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2.
+	 *
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) {
+		return new ConnectedStreams<>(environment, inputStream1.keyBy(keyPosition1),
+				inputStream2.keyBy(keyPosition2));
+	}
+
+	/**
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
+	 * input1 and input2 according to keyPositions1 and keyPositions2.
+	 *
+	 * @param keyPositions1
+	 *            The fields used to group the first input stream.
+	 * @param keyPositions2
+	 *            The fields used to group the second input stream.
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedStreams<>(environment, inputStream1.keyBy(keyPositions1),
+				inputStream2.keyBy(keyPositions2));
+	}
+
+	/**
+	 * KeyBy operation for connected data stream using key expressions. Assigns keys to
+	 * the elements of input1 and input2 according to field1 and field2. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field1
+	 *            The grouping expression for the first input
+	 * @param field2
+	 *            The grouping expression for the second input
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) {
+		return new ConnectedStreams<>(environment, inputStream1.keyBy(field1),
+				inputStream2.keyBy(field2));
+	}
+
+	/**
+	 * KeyBy operation for connected data stream using key expressions. Assigns keys to
+	 * the elements of input1 and input2 according to fields1 and fields2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param fields1
+	 *            The grouping expressions for the first input
+	 * @param fields2
+	 *            The grouping expressions for the second input
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) {
+		return new ConnectedStreams<>(environment, inputStream1.keyBy(fields1),
+				inputStream2.keyBy(fields2));
+	}
+
+	/**
+	 * KeyBy operation for connected data stream. Assigns keys to the elements of
+	 * input1 and input2 using keySelector1 and keySelector2.
+	 *
+	 * @param keySelector1
+	 *            The {@link KeySelector} used for grouping the first input
+	 * @param keySelector2
+	 *            The {@link KeySelector} used for grouping the second input
+	 * @return The grouped {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+		return new ConnectedStreams<>(environment, inputStream1.keyBy(keySelector1),
+				inputStream2.keyBy(keySelector2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPosition1 and keyPosition2.
+	 *
+	 * @param keyPosition1
+	 *            The field used to compute the hashcode of the elements in the
+	 *            first input stream.
+	 * @param keyPosition2
+	 *            The field used to compute the hashcode of the elements in the
+	 *            second input stream.
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(int keyPosition1, int keyPosition2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPosition1),
+				inputStream2.partitionByHash(keyPosition2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 according to keyPositions1 and keyPositions2.
+	 *
+	 * @param keyPositions1
+	 *            The fields used to partition the first input stream.
+	 * @param keyPositions2
+	 *            The fields used to partition the second input stream.
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(int[] keyPositions1, int[] keyPositions2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPositions1),
+				inputStream2.partitionByHash(keyPositions2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to field1 and field2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param field1
+	 *            The partitioning expression for the first input
+	 * @param field2
+	 *            The partitioning expression for the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(String field1, String field2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(field1),
+				inputStream2.partitionByHash(field2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream using key expressions. Partitions
+	 * the elements of input1 and input2 according to fields1 and fields2. A
+	 * field expression is either the name of a public field or a getter method
+	 * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+	 * used to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 *
+	 * @param fields1
+	 *            The partitioning expressions for the first input
+	 * @param fields2
+	 *            The partitioning expressions for the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(String[] fields1, String[] fields2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(fields1),
+				inputStream2.partitionByHash(fields2));
+	}
+
+	/**
+	 * PartitionBy operation for connected data stream. Partitions the elements of
+	 * input1 and input2 using keySelector1 and keySelector2.
+	 *
+	 * @param keySelector1
+	 *            The {@link KeySelector} used for partitioning the first input
+	 * @param keySelector2
+	 *            The {@link KeySelector} used for partitioning the second input
+	 * @return The partitioned {@link ConnectedStreams}
+	 */
+	public ConnectedStreams<IN1, IN2> partitionByHash(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+		return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keySelector1),
+				inputStream2.partitionByHash(keySelector2));
+	}
+
+	/**
+	 * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
+	 * the output to a common type. The transformation calls a
+	 * {@link CoMapFunction#map1} for each element of the first input and
+	 * {@link CoMapFunction#map2} for each element of the second input. Each
+	 * CoMapFunction call returns exactly one element.
+	 *
+	 * @param coMapper The CoMapFunction used to jointly transform the two input DataStreams
+	 * @param <OUT> The output type.
+	 * @return The transformed {@link DataStream}
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
+				CoMapFunction.class, false, true, getType1(), getType2(),
+				Utils.getCallLocationName(), true);
+
+		return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));
+	}
+
+	/**
+	 * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
+	 * maps the output to a common type. The transformation calls a
+	 * {@link CoFlatMapFunction#flatMap1} for each element of the first input
+	 * and {@link CoFlatMapFunction#flatMap2} for each element of the second
+	 * input. Each CoFlatMapFunction call returns any number of elements
+	 * including none.
+	 *
+	 * @param coFlatMapper
+	 *            The CoFlatMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @param <OUT> The output type.
+	 * @return The transformed {@link DataStream}
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
+			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
+				CoFlatMapFunction.class, false, true, getType1(), getType2(),
+				Utils.getCallLocationName(), true);
+
+		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
+	}
+
+	/**
+	 * Creates a {@link TwoInputTransformation} over both inputs with the given operator,
+	 * registers it with the execution environment and returns the resulting stream. The
+	 * transformation uses the environment's parallelism.
+	 *
+	 * @param functionName The name of the operator.
+	 * @param outTypeInfo The type information of the operator's output.
+	 * @param operator The two-input operator to apply.
+	 * @param <OUT> The output type.
+	 * @return The transformed {@link DataStream}
+	 */
+	public <OUT> SingleOutputStreamOperator<OUT, ?> transform(String functionName,
+			TypeInformation<OUT> outTypeInfo,
+			TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+
+		// read the output type of the input Transforms to coax out errors about MissingTypeInfo
+		inputStream1.getType();
+		inputStream2.getType();
+
+		TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(
+				inputStream1.getTransformation(),
+				inputStream2.getTransformation(),
+				functionName,
+				operator,
+				outTypeInfo,
+				environment.getParallelism());
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(environment, transform);
+
+		getExecutionEnvironment().addOperator(transform);
+
+		return returnStream;
+	}
+}