You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:15:04 UTC
[08/12] flink git commit: [FLINK-2753] [streaming] [api breaking] Add
first parts of new window API for key grouped windows
[FLINK-2753] [streaming] [api breaking] Add first parts of new window API for key grouped windows
This follows the API design outlined in https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
This is API breaking because it adds new generic type parameters to Java and Scala classes, breaking binary compatibility.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e20299c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e20299c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e20299c
Branch: refs/heads/master
Commit: 7e20299c4e2d9cc78c36f90bdf0acdbaf72062b0
Parents: 501a9b0
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 23 12:05:54 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 28 17:04:16 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/TimeCharacteristic.java | 81 +++++++++++
.../api/datastream/ConnectedDataStream.java | 4 +-
.../streaming/api/datastream/DataStream.java | 26 ++--
.../api/datastream/GroupedDataStream.java | 8 +-
.../api/datastream/KeyedDataStream.java | 51 ++++++-
.../api/datastream/KeyedWindowDataStream.java | 135 +++++++++++++++++++
.../api/datastream/WindowedDataStream.java | 4 +-
.../environment/StreamExecutionEnvironment.java | 47 ++++++-
.../functions/windows/KeyedWindowFunction.java | 6 +-
.../windowpolicy/AbstractTimePolicy.java | 109 +++++++++++++++
.../api/windowing/windowpolicy/EventTime.java | 64 +++++++++
.../windowing/windowpolicy/ProcessingTime.java | 65 +++++++++
.../api/windowing/windowpolicy/Time.java | 68 ++++++++++
.../windowing/windowpolicy/WindowPolicy.java | 57 ++++++++
.../windows/AccumulatingKeyedTimePanes.java | 8 +-
...ccumulatingProcessingTimeWindowOperator.java | 4 +-
.../operators/windows/PolicyToOperator.java | 82 +++++++++++
.../streaming/util/keys/KeySelectorUtil.java | 17 ++-
.../api/state/StatefulOperatorTest.java | 8 +-
.../GroupedProcessingTimeWindowExample.java | 79 +++--------
.../flink/streaming/api/scala/DataStream.scala | 17 +--
.../streaming/api/scala/GroupedDataStream.scala | 3 +-
.../api/scala/StreamExecutionEnvironment.scala | 25 +++-
.../flink/streaming/api/scala/package.scala | 4 +-
24 files changed, 848 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
new file mode 100644
index 0000000..1ad3c99
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+/**
+ * The time characteristic defines how the system determines time for time-dependent
+ * order and operations that depend on time (such as time windows).
+ */
+public enum TimeCharacteristic {
+
+ /**
+ * Processing time for operators means that the operator uses the system clock of the machine
+ * to determine the current time of the data stream. Processing-time windows trigger based
+ * on wall-clock time and include whatever elements happen to have arrived at the operator at
+ * that point in time.
+ * <p>
+ * Using processing time for window operations results in general in quite non-deterministic results,
+ * because the contents of the windows depend on the speed at which elements arrive. It is, however,
+ * the cheapest method of forming windows and the method that introduces the least latency.
+ */
+ ProcessingTime,
+
+ /**
+ * Ingestion time means that the time of each individual element in the stream is determined
+ * when the element enters the Flink streaming data flow. Operations like windows group the
+ * elements based on that time, meaning that processing speed within the streaming dataflow
+ * does not affect windowing, but only the speed at which sources receive elements.
+ * <p>
+ * Ingestion time is often a good compromise between processing time and event time.
+ * It does not need any special manual form of watermark generation, and events are typically
+ * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
+ * only be introduced by streaming shuffles or split/join/union operations. The fact that elements
+ * are not very much out-of-order means that the latency increase is moderate, compared to event
+ * time.
+ */
+ IngestionTime,
+
+ /**
+ * Event time means that the time of each individual element in the stream (also called event)
+ * is determined by the event's individual custom timestamp. These timestamps either exist in the
+ * elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
+ * The big implication of this is that elements arrive in the sources and in all operators generally
+ * out of order, meaning that elements with earlier timestamps may arrive after elements with
+ * later timestamps.
+ * <p>
+ * Operators that window or order data with respect to event time must buffer data until they can
+ * be sure that all timestamps for a certain time interval have been received. This is handled by
+ * the so called "time watermarks".
+ * <p>
+ * Operations based on event time are very predictable - the result of windowing operations
+ * is typically identical no matter when the window is executed and how fast the streams operate.
+ * At the same time, the buffering and tracking of event time is also costlier than operating
+ * with processing time, and typically also introduces more latency. The amount of extra
+ * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
+ * between the arrival of early and late elements is. With respect to the "time watermarks", this
+ * means that the cost typically depends on how early or late the watermarks can be generated
+ * for their timestamps.
+ * <p>
+ * In relation to {@link #IngestionTime}, the event time is similar, but refers to the event's
+ * original time, rather than the time assigned at the data source. Practically, that means that
+ * event time has generally more meaning, but also that it takes longer to determine that all
+ * elements for a certain time have arrived.
+ */
+ EventTime
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 8609a30..0406e35 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -67,8 +67,8 @@ public class ConnectedDataStream<IN1, IN2> {
if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
this.isGrouped = true;
- this.keySelector1 = ((GroupedDataStream<IN1>) input1).keySelector;
- this.keySelector2 = ((GroupedDataStream<IN2>) input2).keySelector;
+ this.keySelector1 = ((GroupedDataStream<IN1, ?>) input1).keySelector;
+ this.keySelector2 = ((GroupedDataStream<IN2, ?>) input2).keySelector;
} else {
this.isGrouped = false;
this.keySelector1 = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d92498c..5dfb1e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -229,8 +229,8 @@ public class DataStream<T> {
* The KeySelector to be used for extracting the key for partitioning
* @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
*/
- public KeyedDataStream<T> keyBy(KeySelector<T,?> key){
- return new KeyedDataStream<T>(this, clean(key));
+ public <K> KeyedDataStream<T, K> keyBy(KeySelector<T, K> key){
+ return new KeyedDataStream<T, K>(this, clean(key));
}
/**
@@ -241,7 +241,7 @@ public class DataStream<T> {
* will be grouped.
* @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
*/
- public KeyedDataStream<T> keyBy(int... fields) {
+ public KeyedDataStream<T, Tuple> keyBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
} else {
@@ -260,12 +260,12 @@ public class DataStream<T> {
* partitioned.
* @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
**/
- public KeyedDataStream<T> keyBy(String... fields) {
+ public KeyedDataStream<T, Tuple> keyBy(String... fields) {
return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
- private KeyedDataStream<T> keyBy(Keys<T> keys) {
- return new KeyedDataStream<T>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
+ private KeyedDataStream<T, Tuple> keyBy(Keys<T> keys) {
+ return new KeyedDataStream<T, Tuple>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), getExecutionConfig())));
}
@@ -279,7 +279,7 @@ public class DataStream<T> {
* will be partitioned.
* @return The {@link DataStream} with partitioned state (i.e. KeyedDataStream)
*/
- public GroupedDataStream<T> groupBy(int... fields) {
+ public GroupedDataStream<T, Tuple> groupBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return groupBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
} else {
@@ -304,7 +304,7 @@ public class DataStream<T> {
* grouped.
* @return The grouped {@link DataStream}
**/
- public GroupedDataStream<T> groupBy(String... fields) {
+ public GroupedDataStream<T, Tuple> groupBy(String... fields) {
return groupBy(new Keys.ExpressionKeys<T>(fields, getType()));
}
@@ -322,13 +322,13 @@ public class DataStream<T> {
* the values
* @return The grouped {@link DataStream}
*/
- public GroupedDataStream<T> groupBy(KeySelector<T, ?> keySelector) {
- return new GroupedDataStream<T>(this, clean(keySelector));
+ public <K> GroupedDataStream<T, K> groupBy(KeySelector<T, K> keySelector) {
+ return new GroupedDataStream<T, K>(this, clean(keySelector));
}
- private GroupedDataStream<T> groupBy(Keys<T> keys) {
- return new GroupedDataStream<T>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
- getType(), getExecutionConfig())));
+ private GroupedDataStream<T, Tuple> groupBy(Keys<T> keys) {
+ return new GroupedDataStream<T, Tuple>(this,
+ clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index a1106bc..50bf341 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
* @param <OUT>
* The output type of the {@link GroupedDataStream}.
*/
-public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
+public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
/**
* Creates a new {@link GroupedDataStream}, group inclusion is determined using
@@ -48,7 +48,7 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
* @param dataStream Base stream of data
* @param keySelector Function for determining group inclusion
*/
- public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, ?> keySelector) {
+ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySelector) {
super(dataStream, keySelector);
}
@@ -324,8 +324,6 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
StreamGroupedReduce<OUT> operator = new StreamGroupedReduce<OUT>(clean(aggregate), keySelector);
- SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation",
- getType(), operator);
- return returnStream;
+ return transform("Grouped Aggregation", getType(), operator);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
index 100e5de..a32cf53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedDataStream.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -32,11 +33,12 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
* are also possible on a KeyedDataStream, with the exception of partitioning methods such as shuffle, forward and groupBy.
*
*
- * @param <T> The type of the elements in the Keyed Stream
+ * @param <T> The type of the elements in the Keyed Stream.
+ * @param <K> The type of the key in the Keyed Stream.
*/
-public class KeyedDataStream<T> extends DataStream<T> {
+public class KeyedDataStream<T, K> extends DataStream<T> {
- protected final KeySelector<T, ?> keySelector;
+ protected final KeySelector<T, K> keySelector;
/**
* Creates a new {@link KeyedDataStream} using the given {@link KeySelector}
@@ -47,35 +49,70 @@ public class KeyedDataStream<T> extends DataStream<T> {
* @param keySelector
* Function for determining state partitions
*/
- public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, ?> keySelector) {
+ public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) {
super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
this.keySelector = keySelector;
}
- public KeySelector<T, ?> getKeySelector() {
+
+ public KeySelector<T, K> getKeySelector() {
return this.keySelector;
}
+
@Override
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
throw new UnsupportedOperationException("Cannot override partitioning for KeyedDataStream.");
}
+
@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
- ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(
- keySelector);
+ ((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
return returnStream;
}
+
+
@Override
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
DataStreamSink<T> result = super.addSink(sinkFunction);
result.getTransformation().setStateKeySelector(keySelector);
return result;
}
+
+ // ------------------------------------------------------------------------
+ // Windowing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
+ * grouped stream. The window is defined by a single policy.
+ * <p>
+ * For time windows, these single-policy windows result in tumbling time windows.
+ *
+ * @param policy The policy that defines the window.
+ * @return The windowed data stream.
+ */
+ public KeyedWindowDataStream<T, K> window(WindowPolicy policy) {
+ return new KeyedWindowDataStream<T, K>(this, policy);
+ }
+
+ /**
+ * Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key
+ * grouped stream. The window is defined by a window policy, plus a slide policy.
+ * <p>
+ * For time windows, these slide policy windows result in sliding time windows.
+ *
+ * @param window The policy that defines the window.
+ * @param slide The additional policy defining the slide of the window.
+ * @return The windowed data stream.
+ */
+ public KeyedWindowDataStream<T, K> window(WindowPolicy window, WindowPolicy slide) {
+ return new KeyedWindowDataStream<T, K>(this, window, slide);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
new file mode 100644
index 0000000..2ec175a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.runtime.operators.windows.PolicyToOperator;
+
+/**
+ * A KeyedWindowDataStream represents a data stream where elements are grouped by key, and
+ * for each key, the stream of elements is split into windows. The windows are conceptually
+ * evaluated for each key individually, meaning windows can trigger at different points
+ * for each key.
+ * <p>
+ * In many cases, however, the windows are "aligned", meaning they trigger at the
+ * same time for all keys. The most common example for that are the regular time windows.
+ * <p>
+ * Note that the KeyedWindowDataStream is purely an API construct; during runtime the
+ * KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation
+ * over the window into one single operation.
+ *
+ * @param <Type> The type of elements in the stream.
+ * @param <Key> The type of the key by which elements are grouped.
+ */
+public class KeyedWindowDataStream<Type, Key> {
+
+ /** The keyed data stream that is windowed by this stream */
+ private final KeyedDataStream<Type, Key> input;
+
+ /** The core window policy */
+ private final WindowPolicy windowPolicy;
+
+ /** The optional additional slide policy */
+ private final WindowPolicy slidePolicy;
+
+
+ public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, WindowPolicy windowPolicy) {
+ this(input, windowPolicy, null);
+ }
+
+ public KeyedWindowDataStream(KeyedDataStream<Type, Key> input,
+ WindowPolicy windowPolicy, WindowPolicy slidePolicy)
+ {
+ TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+ this.input = input;
+ this.windowPolicy = windowPolicy.makeSpecificBasedOnTimeCharacteristic(time);
+ this.slidePolicy = slidePolicy == null ? null : slidePolicy.makeSpecificBasedOnTimeCharacteristic(time);
+ }
+
+ // ------------------------------------------------------------------------
+ // Operations on the keyed windows
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies a reduce function to the window. The window function is called for each evaluation
+ * of the window for each key individually. The output of the reduce function is interpreted
+ * as a regular non-windowed stream.
+ * <p>
+ * This window will try and pre-aggregate data as much as the window policies permit. For example,
+ * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
+ * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
+ * so a few elements are stored per key (one per slide interval).
+ * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
+ * aggregation tree.
+ *
+ * @param function The reduce function.
+ * @return The data stream that is the result of applying the reduce function to the window.
+ */
+ public DataStream<Type> reduceWindow(ReduceFunction<Type> function) {
+ String callLocation = Utils.getCallLocationName();
+ return createWindowOperator(function, input.getType(), "Reduce at " + callLocation);
+ }
+
+ /**
+ * Applies a window function to the window. The window function is called for each evaluation
+ * of the window for each key individually. The output of the window function is interpreted
+ * as a regular non-windowed stream.
+ * <p>
+ * Note that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of pre-aggregation.
+ *
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, Result, Key> function) {
+ String callLocation = Utils.getCallLocationName();
+
+ TypeInformation<Type> inType = input.getType();
+ TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ function, KeyedWindowFunction.class, true, true, inType, null, false);
+
+ return createWindowOperator(function, resultType, "KeyedWindowFunction at " + callLocation);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private <Result> DataStream<Result> createWindowOperator(
+ Function function, TypeInformation<Result> resultType, String functionName) {
+
+ String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
+ KeySelector<Type, Key> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<Type, Result> operator =
+ PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);
+
+ return input.transform(opName, resultType, operator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index bf3a11a..1226adf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -103,7 +103,7 @@ public class WindowedDataStream<OUT> {
this.triggerHelper = policyHelper;
if (dataStream instanceof GroupedDataStream) {
- this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
+ this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
}
}
@@ -115,7 +115,7 @@ public class WindowedDataStream<OUT> {
this.userEvicter = evicter;
if (dataStream instanceof GroupedDataStream) {
- this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
+ this.discretizerKey = ((GroupedDataStream<OUT, ?>) dataStream).keySelector;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index d91afc9..a22a519 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.environment;
import com.esotericsoftware.kryo.Serializer;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
@@ -49,6 +49,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
@@ -72,10 +73,12 @@ import org.apache.flink.util.SplittableIterator;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
/**
* {@link org.apache.flink.api.java.ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -83,25 +86,33 @@ import java.util.List;
*/
public abstract class StreamExecutionEnvironment {
- public final static String DEFAULT_JOB_NAME = "Flink Streaming Job";
+ public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
-
+
+ /** The time characteristic that is used if none other is set */
+ private static TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
+
+ // ------------------------------------------------------------------------
+
private long bufferTimeout = 100;
- private ExecutionConfig config = new ExecutionConfig();
+ private final ExecutionConfig config = new ExecutionConfig();
- protected List<StreamTransformation<?>> transformations = Lists.newArrayList();
+ protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
protected boolean isChainingEnabled = true;
protected long checkpointInterval = -1; // disabled
- protected CheckpointingMode checkpointingMode = null;
+ protected CheckpointingMode checkpointingMode;
protected boolean forceCheckpointing = false;
protected StateHandleProvider<?> stateHandleProvider;
+
+ /** The time characteristic used by the data streams */
+ private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
/** The environment of the context (local by default, cluster if invoked through command line) */
private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
@@ -516,6 +527,30 @@ public abstract class StreamExecutionEnvironment {
}
// --------------------------------------------------------------------------------------------
+ // Time characteristic
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Sets the time characteristic for the stream, e.g., processing time, event time,
+ * or ingestion time.
+ *
+ * @param characteristic The time characteristic.
+ */
+ public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
+ this.timeCharacteristic = Objects.requireNonNull(characteristic);
+ }
+
+ /**
+ * Gets the time characteristic for the stream, e.g., processing time, event time,
+ * or ingestion time.
+ *
+ * @return The time characteristic.
+ */
+ public TimeCharacteristic getStreamTimeCharacteristic() {
+ return timeCharacteristic;
+ }
+
+ // --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
index d7ca0a1..b4e55e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windows/KeyedWindowFunction.java
@@ -25,12 +25,12 @@ import java.io.Serializable;
/**
* Base interface for functions that are evaluated over keyed (grouped) windows.
- *
- * @param <KEY> The type of the key.
+ *
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
*/
-public interface KeyedWindowFunction<KEY, IN, OUT> extends Function, Serializable {
+public interface KeyedWindowFunction<IN, OUT, KEY> extends Function, Serializable {
/**
*
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
new file mode 100644
index 0000000..9dc0dd0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/AbstractTimePolicy.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class AbstractTimePolicy extends WindowPolicy {
+
+ private static final long serialVersionUID = 6593098375698927728L;
+
+ /** the time unit for this policy's time interval */
+ private final TimeUnit unit;
+
+ /** the length of this policy's time interval */
+ private final long num;
+
+
+ protected AbstractTimePolicy(long num, TimeUnit unit) {
+ this.unit = checkNotNull(unit, "time unit may not be null");
+ this.num = num;
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the time unit for this policy's time interval.
+ * @return The time unit for this policy's time interval.
+ */
+ public TimeUnit getUnit() {
+ return unit;
+ }
+
+ /**
+ * Gets the length of this policy's time interval.
+ * @return The length of this policy's time interval.
+ */
+ public long getNum() {
+ return num;
+ }
+
+ /**
+ * Converts the time interval to milliseconds.
+ * @return The time interval in milliseconds.
+ */
+ public long toMilliseconds() {
+ return unit.toMillis(num);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString(WindowPolicy slidePolicy) {
+ if (slidePolicy == null) {
+ return "Tumbling Window (" + getClass().getSimpleName() + ") (" + num + ' ' + unit.name() + ')';
+ }
+ else if (slidePolicy.getClass() == getClass()) {
+ AbstractTimePolicy timeSlide = (AbstractTimePolicy) slidePolicy;
+
+ return "Sliding Window (" + getClass().getSimpleName() + ") (length="
+ + num + ' ' + unit.name() + ", slide=" + timeSlide.num + ' ' + timeSlide.unit.name() + ')';
+ }
+ else {
+ return super.toString(slidePolicy);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * (int) (num ^ (num >>> 32)) + unit.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj.getClass() == getClass()) {
+ AbstractTimePolicy that = (AbstractTimePolicy) obj;
+ return this.num == that.num && this.unit.equals(that.unit);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " (" + num + ' ' + unit.name() + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
new file mode 100644
index 0000000..8a671fc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/EventTime.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of an event time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
+ * of event time.
+ */
+public final class EventTime extends AbstractTimePolicy {
+
+ private static final long serialVersionUID = 8333566691833596747L;
+
+ /** Instantiation only via factory method */
+ private EventTime(long num, TimeUnit unit) {
+ super(num, unit);
+ }
+
+ @Override
+ public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+ if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
+ return this;
+ }
+ else {
+ throw new InvalidProgramException(
+ "Cannot use EventTime policy in a dataflow that runs on " + characteristic);
+ }
+ }
+ // ------------------------------------------------------------------------
+ // Factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates an event time policy describing an event time interval.
+ *
+ * @param num The length of the time interval.
+ * @param unit The unit (seconds, milliseconds) of the time interval.
+ * @return The event time policy.
+ */
+ public static EventTime of(long num, TimeUnit unit) {
+ return new EventTime(num, unit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
new file mode 100644
index 0000000..2ff13fa
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/ProcessingTime.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a processing time interval for windowing. See
+ * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
+ * of processing time.
+ */
+public final class ProcessingTime extends AbstractTimePolicy {
+
+ private static final long serialVersionUID = 7546166721132583007L;
+
+ /** Instantiation only via factory method */
+ private ProcessingTime(long num, TimeUnit unit) {
+ super(num, unit);
+ }
+
+ @Override
+ public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+ if (characteristic == TimeCharacteristic.ProcessingTime) {
+ return this;
+ }
+ else {
+ throw new InvalidProgramException(
+ "Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a processing time policy describing a processing time interval.
+ *
+ * @param num The length of the time interval.
+ * @param unit The unit (seconds, milliseconds) of the time interval.
+ * @return The processing time policy.
+ */
+ public static ProcessingTime of(long num, TimeUnit unit) {
+ return new ProcessingTime(num, unit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
new file mode 100644
index 0000000..0233e96
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/Time.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The definition of a time interval for windowing. The time characteristic referred
+ * to is the default time characteristic set on the execution environment.
+ */
+public final class Time extends AbstractTimePolicy {
+
+ private static final long serialVersionUID = 3197290738634320211L;
+
+ /** Instantiation only via factory method */
+ private Time(long num, TimeUnit unit) {
+ super(num, unit);
+ }
+
+ @Override
+ public AbstractTimePolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+ switch (timeCharacteristic) {
+ case ProcessingTime:
+ return ProcessingTime.of(getNum(), getUnit());
+ case IngestionTime:
+ case EventTime:
+ return EventTime.of(getNum(), getUnit());
+ default:
+ throw new IllegalArgumentException("Unknown time characteristic");
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a time policy describing a time interval. The policy refers to the
+ * time characteristic that is set on the dataflow via
+ * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
+ * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
+ *
+ * @param num The length of the time interval.
+ * @param unit The unit (seconds, milliseconds) of the time interval.
+ * @return The time policy.
+ */
+ public static Time of(long num, TimeUnit unit) {
+ return new Time(num, unit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
new file mode 100644
index 0000000..a82f892
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowpolicy/WindowPolicy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowpolicy;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+
+/**
+ * The base class of all window policies. Window policies define how windows
+ * are formed over the data stream.
+ */
+public abstract class WindowPolicy implements java.io.Serializable {
+
+ private static final long serialVersionUID = -8696529489282723113L;
+
+ /**
+ * If this window policy's concrete instantiation depends on the time characteristic of the
+ * dataflow (processing time, event time), then this method must be overridden to convert this
+ * policy to the respective specific instantiation.
+ * <p>
+ * The {@link Time} policy, for example, will convert itself to a {@link ProcessingTime} policy,
+ * if the time characteristic is set to {@link TimeCharacteristic#ProcessingTime}.
+ * <p>
+ * By default, this method does nothing and simply returns this object itself.
+ *
+ * @param characteristic The time characteristic of the dataflow.
+ * @return The specific instantiation of this policy, or the policy itself.
+ */
+ public WindowPolicy makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
+ return this;
+ }
+
+
+ public String toString(WindowPolicy slidePolicy) {
+ if (slidePolicy != null) {
+ return "Window [" + toString() + ", slide=" + slidePolicy + ']';
+ }
+ else {
+ return "Window [" + toString() + ']';
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
index e776106..1212123 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingKeyedTimePanes.java
@@ -32,13 +32,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
- private final KeyedWindowFunction<Key, Type, Result> function;
+ private final KeyedWindowFunction<Type, Result, Key> function;
private long evaluationPass;
// ------------------------------------------------------------------------
- public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Key, Type, Result> function) {
+ public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key> function) {
this.keySelector = keySelector;
this.function = function;
}
@@ -75,7 +75,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
- private final KeyedWindowFunction<Key, Type, Result> function;
+ private final KeyedWindowFunction<Type, Result, Key> function;
private final UnionIterator<Type> unionIterator;
@@ -83,7 +83,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private Key currentKey;
- WindowFunctionTraversal(KeyedWindowFunction<Key, Type, Result> function, Collector<Result> out) {
+ WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key> function, Collector<Result> out) {
this.function = function;
this.out = out;
this.unionIterator = new UnionIterator<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
index 16444c2..fb9d163 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingProcessingTimeWindowOperator.java
@@ -30,7 +30,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
public AccumulatingProcessingTimeWindowOperator(
- KeyedWindowFunction<KEY, IN, OUT> function,
+ KeyedWindowFunction<IN, OUT, KEY> function,
KeySelector<IN, KEY> keySelector,
long windowLength,
long windowSlide)
@@ -41,7 +41,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
@Override
protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
@SuppressWarnings("unchecked")
- KeyedWindowFunction<KEY, IN, OUT> windowFunction = (KeyedWindowFunction<KEY, IN, OUT>) function;
+ KeyedWindowFunction<IN, OUT, KEY> windowFunction = (KeyedWindowFunction<IN, OUT, KEY>) function;
return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
new file mode 100644
index 0000000..9d06ef5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windows/PolicyToOperator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windows;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
+import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+
+/**
+ * This class implements the conversion from window policies to concrete operator
+ * implementations.
+ */
+public class PolicyToOperator {
+
+ /**
+ * Entry point to create an operator for the given window policies and the window function.
+ */
+ public static <IN, OUT, KEY> OneInputStreamOperator<IN, OUT> createOperatorForPolicies(
+ WindowPolicy window, WindowPolicy slide, Function function, KeySelector<IN, KEY> keySelector)
+ {
+ if (window == null || function == null) {
+ throw new NullPointerException();
+ }
+
+ // -- case 1: both policies are processing time policies
+ if (window instanceof ProcessingTime && (slide == null || slide instanceof ProcessingTime)) {
+ final long windowLength = ((ProcessingTime) window).toMilliseconds();
+ final long windowSlide = slide == null ? windowLength : ((ProcessingTime) slide).toMilliseconds();
+
+ if (function instanceof ReduceFunction) {
+ @SuppressWarnings("unchecked")
+ ReduceFunction<IN> reducer = (ReduceFunction<IN>) function;
+
+ @SuppressWarnings("unchecked")
+ OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
+ new AggregatingProcessingTimeWindowOperator<KEY, IN>(
+ reducer, keySelector, windowLength, windowSlide);
+ return op;
+ }
+ else if (function instanceof KeyedWindowFunction) {
+ @SuppressWarnings("unchecked")
+ KeyedWindowFunction<IN, OUT, KEY> wf = (KeyedWindowFunction<IN, OUT, KEY>) function;
+
+ return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+ wf, keySelector, windowLength, windowSlide);
+ }
+ }
+
+ // -- case 2: both policies are event time policies
+ if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
+ // add event time implementation
+ }
+
+ throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Don't instantiate */
+ private PolicyToOperator() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 2e0fe66..f758147 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple;
public class KeySelectorUtil {
- public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
+ public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
if (!(typeInfo instanceof CompositeType)) {
throw new InvalidTypesException(
"This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
@@ -93,9 +93,15 @@ public class KeySelectorUtil {
comparator.extractKeys(value, keyArray, 0);
return (K) keyArray[0];
}
-
}
+ // ------------------------------------------------------------------------
+
+ /**
+ * A key selector for selecting key fields via a TypeComparator.
+ *
+ * @param <IN> The type from which the key is extracted.
+ */
public static class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
private static final long serialVersionUID = 1L;
@@ -126,6 +132,13 @@ public class KeySelectorUtil {
}
+ // ------------------------------------------------------------------------
+
+ /**
+ * A key selector for selecting individual array fields as keys and returns them as a Tuple.
+ *
+ * @param <IN> The type from which the key is extracted, i.e., the array type.
+ */
public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index b8b4c13..207b1b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -113,7 +113,9 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
- KeyedDataStream<Integer> keyedStream = env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4));
+ KeyedDataStream<Integer, Integer> keyedStream = env
+ .fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
+ .keyBy(new ModKey(4));
keyedStream.map(new StatefulMapper()).addSink(new SinkFunction<String>() {
private static final long serialVersionUID = 1L;
@@ -163,7 +165,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
@SuppressWarnings("unchecked")
private StreamMap<Integer, String> createOperatorWithContext(List<String> output,
- KeySelector<Integer, Serializable> partitioner, byte[] serializedState) throws Exception {
+ KeySelector<Integer, ? extends Serializable> partitioner, byte[] serializedState) throws Exception {
final List<String> outputList = output;
StreamingRuntimeContext context = new StreamingRuntimeContext(
@@ -355,7 +357,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
}
- public static class ModKey implements KeySelector<Integer, Serializable> {
+ public static class ModKey implements KeySelector<Integer, Integer> {
private static final long serialVersionUID = 4193026742083046736L;
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 7387a1e..e52c2cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -22,15 +22,16 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-import org.apache.flink.streaming.runtime.operators.windows.AggregatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Time;
import org.apache.flink.util.Collector;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
@SuppressWarnings("serial")
public class GroupedProcessingTimeWindowExample {
@@ -75,31 +76,20 @@ public class GroupedProcessingTimeWindowExample {
});
stream
- .groupBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
-// .window(Time.of(2500, TimeUnit.MILLISECONDS)).every(Time.of(500, TimeUnit.MILLISECONDS))
-// .reduceWindow(new SummingReducer())
-// .flatten()
-// .partitionByHash(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
-// .transform(
-// "Aligned time window",
-// TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"),
-// new AccumulatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>, Tuple2<Long, Long>>(
-// new SummingWindowFunction<Long>(),
-// new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
-// 2500, 500))
- .transform(
- "Aligned time window",
- TypeInfoParser.<Tuple2<Long, Long>>parse("Tuple2<Long, Long>"),
- new AggregatingProcessingTimeWindowOperator<Long, Tuple2<Long, Long>>(
- new SummingReducer(),
- new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>(),
- 2500, 500))
+ .keyBy(0)
+ .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+ .reduceWindow(new SummingReducer())
+
+ // alternative: use a mapWindow function which does not pre-aggregate
+// .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
+// .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+// .mapWindow(new SummingWindowFunction())
.addSink(new SinkFunction<Tuple2<Long, Long>>() {
- @Override
- public void invoke(Tuple2<Long, Long> value) {
- }
- });
+ @Override
+ public void invoke(Tuple2<Long, Long> value) {
+ }
+ });
env.execute();
}
@@ -113,47 +103,16 @@ public class GroupedProcessingTimeWindowExample {
}
}
- public static class IdentityKeyExtractor<T> implements KeySelector<T, T> {
-
- @Override
- public T getKey(T value) {
- return value;
- }
- }
-
- public static class IdentityWindowFunction<K, T> implements KeyedWindowFunction<K, T, T> {
-
- @Override
- public void evaluate(K k, Iterable<T> values, Collector<T> out) throws Exception {
- for (T v : values) {
- out.collect(v);
- }
- }
- }
-
- public static class CountingWindowFunction<K, T> implements KeyedWindowFunction<K, T, Long> {
-
- @Override
- public void evaluate(K k, Iterable<T> values, Collector<Long> out) throws Exception {
- long count = 0;
- for (T ignored : values) {
- count++;
- }
-
- out.collect(count);
- }
- }
-
- public static class SummingWindowFunction<K> implements KeyedWindowFunction<K, Tuple2<K, Long>, Tuple2<K, Long>> {
+ public static class SummingWindowFunction implements KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
@Override
- public void evaluate(K key, Iterable<Tuple2<K, Long>> values, Collector<Tuple2<K, Long>> out) throws Exception {
+ public void evaluate(Long key, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
long sum = 0L;
- for (Tuple2<K, Long> value : values) {
+ for (Tuple2<Long, Long> value : values) {
sum += value.f1;
}
- out.collect(new Tuple2<K, Long>(key, sum));
+ out.collect(new Tuple2<>(key, sum));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 2bb6a6a..2f4bd23 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.api.scala
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import org.apache.flink.api.common.functions.{ReduceFunction, FlatMapFunction, MapFunction,
- Partitioner, FoldFunction, FilterFunction}
+import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapFunction, Partitioner, FilterFunction}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
@@ -30,17 +30,12 @@ import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, SingleOutputStreamOperator}
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce}
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.OperatorState
import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
-import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.KeyedDataStream
import org.apache.flink.streaming.api.scala.function.StatefulFunction
@@ -244,20 +239,20 @@ class DataStream[T](javaStream: JavaStream[T]) {
* Groups the elements of a DataStream by the given key positions (for tuple/array types) to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
- def groupBy(fields: Int*): GroupedDataStream[T] = javaStream.groupBy(fields: _*)
+ def groupBy(fields: Int*): GroupedDataStream[T, JavaTuple] = javaStream.groupBy(fields: _*)
/**
* Groups the elements of a DataStream by the given field expressions to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
- def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T] =
+ def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] =
javaStream.groupBy(firstField +: otherFields.toArray: _*)
/**
* Groups the elements of a DataStream by the given K key to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
- def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T] = {
+ def groupBy[K: TypeInformation](fun: T => K): GroupedDataStream[T, K] = {
val cleanFun = clean(fun)
val keyExtractor = new KeySelector[T, K] {
@@ -605,7 +600,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
private[flink] def isStatePartitioned: Boolean = {
- javaStream.isInstanceOf[KeyedDataStream[T]]
+ javaStream.isInstanceOf[KeyedDataStream[_, _]]
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
index 34f0807..e1a963d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/GroupedDataStream.scala
@@ -29,7 +29,8 @@ import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.common.functions.ReduceFunction
-class GroupedDataStream[T](javaStream: GroupedJavaStream[T]) extends DataStream[T](javaStream){
+class GroupedDataStream[T, K](javaStream: GroupedJavaStream[T, K])
+ extends DataStream[T](javaStream) {
/**
* Creates a new [[DataStream]] by reducing the elements of this DataStream
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 5e02ec5..9d62bcb 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,13 +18,15 @@
package org.apache.flink.streaming.api.scala
+import java.util.Objects
+
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.runtime.state.StateHandleProvider
-import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -294,6 +296,27 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
}
// --------------------------------------------------------------------------------------------
+ // Time characteristic
+ // --------------------------------------------------------------------------------------------
+ /**
+ * Sets the time characteristic for the stream, e.g., processing time, event time,
+ * or ingestion time.
+ *
+ * @param characteristic The time characteristic.
+ */
+ def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
+ javaEnv.setStreamTimeCharacteristic(characteristic)
+ }
+
+ /**
+ * Gets the time characteristic for the stream, e.g., processing time, event time,
+ * or ingestion time.
+ *
+ * @return The time characteristic.
+ */
+ def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic()
+
+ // --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7e20299c/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 2eb4f9e..59843e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -38,8 +38,8 @@ package object scala {
implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
new DataStream[R](javaStream)
- implicit def javaToScalaGroupedStream[R](javaStream: GroupedJavaStream[R]):
- GroupedDataStream[R] = new GroupedDataStream[R](javaStream)
+ implicit def javaToScalaGroupedStream[R, K](javaStream: GroupedJavaStream[R, K]):
+ GroupedDataStream[R, K] = new GroupedDataStream[R, K](javaStream)
implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
new WindowedDataStream[R](javaWStream)