You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:38 UTC
[04/13] flink git commit: [FLINK-2550] Simplify Stream Java API Class Names
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
new file mode 100644
index 0000000..d78e2c3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+public abstract class RichAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction implements AllWindowFunction<IN, OUT, W> {
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
deleted file mode 100644
index 90ccb40..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichKeyedWindowFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-public abstract class RichKeyedWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements KeyedWindowFunction<IN, OUT, KEY, W> {
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
index b40b74a..0d40bbd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
@@ -20,6 +20,6 @@ package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
-public abstract class RichWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, W> {
+public abstract class RichWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, KEY, W> {
private static final long serialVersionUID = 1L;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
index 1a4304e..eda12c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
@@ -25,19 +25,23 @@ import org.apache.flink.util.Collector;
import java.io.Serializable;
/**
- * Base interface for functions that are evaluated over non-keyed windows.
+ * Base interface for functions that are evaluated over keyed (grouped) windows.
*
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
*/
-public interface WindowFunction<IN, OUT, W extends Window> extends Function, Serializable {
+public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
- *
- * @param values
- * @param out
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param key The key for which this window is evaluated.
+ * @param window The window that is being evaluated.
+ * @param values The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
- void evaluate(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
+ void apply(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 496da6b..55c1be0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.util.UnionIterator;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -34,13 +34,13 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
- private final KeyedWindowFunction<Type, Result, Key, Window> function;
+ private final WindowFunction<Type, Result, Key, Window> function;
private long evaluationPass;
// ------------------------------------------------------------------------
- public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, KeyedWindowFunction<Type, Result, Key, Window> function) {
+ public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, WindowFunction<Type, Result, Key, Window> function) {
this.keySelector = keySelector;
this.function = function;
}
@@ -59,7 +59,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
if (previousPanes.isEmpty()) {
// optimized path for single pane case (tumbling window)
for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
- function.evaluate(entry.getKey(), window, entry.getValue(), out);
+ function.apply(entry.getKey(), window, entry.getValue(), out);
}
}
else {
@@ -77,7 +77,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
- private final KeyedWindowFunction<Type, Result, Key, Window> function;
+ private final WindowFunction<Type, Result, Key, Window> function;
private final UnionIterator<Type> unionIterator;
@@ -87,7 +87,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private TimeWindow window;
- WindowFunctionTraversal(KeyedWindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out) {
+ WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out) {
this.function = function;
this.out = out;
this.unionIterator = new UnionIterator<>();
@@ -108,7 +108,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
@Override
public void keyDone() throws Exception {
- function.evaluate(currentKey, window, unionIterator, out);
+ function.apply(currentKey, window, unionIterator, out);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index ace3823..3bcffbc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -20,19 +20,19 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
- extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, KeyedWindowFunction<IN, OUT, KEY, TimeWindow>> {
+ extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, WindowFunction<IN, OUT, KEY, TimeWindow>> {
private static final long serialVersionUID = 7305948082830843475L;
public AccumulatingProcessingTimeWindowOperator(
- KeyedWindowFunction<IN, OUT, KEY, TimeWindow> function,
+ WindowFunction<IN, OUT, KEY, TimeWindow> function,
KeySelector<IN, KEY> keySelector,
long windowLength,
long windowSlide)
@@ -43,7 +43,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
@Override
protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
@SuppressWarnings("unchecked")
- KeyedWindowFunction<IN, OUT, KEY, Window> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, Window>) function;
+ WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index d5ed6cb..53df838 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
@@ -41,7 +41,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
public EvictingNonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
- WindowFunction<IN, OUT, W> windowFunction,
+ AllWindowFunction<IN, OUT, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
Evictor<? super IN, ? super W> evictor) {
super(windowAssigner, windowBufferFactory, windowFunction, trigger);
@@ -77,7 +77,7 @@ public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends N
windowBuffer.removeElements(toEvict);
- userFunction.evaluate(
+ userFunction.apply(
window,
bufferAndTrigger.f0.getUnpackedElements(),
timestampedCollector);
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index cddcc42..334eb54 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
@@ -44,7 +44,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
KeySelector<IN, K> keySelector,
WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
- KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+ WindowFunction<IN, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
Evictor<? super IN, ? super W> evictor) {
super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
@@ -87,7 +87,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
windowBuffer.removeElements(toEvict);
- userFunction.evaluate(key,
+ userFunction.apply(key,
window,
bufferAndTrigger.f0.getUnpackedElements(),
timestampedCollector);
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 3a85759..d48643d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -46,7 +46,7 @@ import java.util.Map;
import java.util.Set;
public class NonKeyedWindowOperator<IN, OUT, W extends Window>
- extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, W>>
+ extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
private static final long serialVersionUID = 1L;
@@ -72,7 +72,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
public NonKeyedWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
- WindowFunction<IN, OUT, W> windowFunction,
+ AllWindowFunction<IN, OUT, W> windowFunction,
Trigger<? super IN, ? super W> trigger) {
super(windowFunction);
@@ -157,7 +157,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
}
- userFunction.evaluate(
+ userFunction.apply(
window,
bufferAndTrigger.f0.getUnpackedElements(),
timestampedCollector);
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index cda4481..2d4635f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -47,7 +47,7 @@ import java.util.Map;
import java.util.Set;
public class WindowOperator<K, IN, OUT, W extends Window>
- extends AbstractUdfStreamOperator<OUT, KeyedWindowFunction<IN, OUT, K, W>>
+ extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
private static final long serialVersionUID = 1L;
@@ -75,7 +75,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
KeySelector<IN, K> keySelector,
WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
- KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+ WindowFunction<IN, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger) {
super(windowFunction);
@@ -181,7 +181,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
}
- userFunction.evaluate(key,
+ userFunction.apply(key,
window,
bufferAndTrigger.f0.getUnpackedElements(),
timestampedCollector);
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 0ac352b..a8c4b49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -157,7 +157,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
return new PartitionedStreamOperatorState(provider, statePartitioner, getUserCodeClassLoader());
} else {
throw new RuntimeException(
- "Partitioned state can only be used with KeyedDataStreams.");
+ "Partitioned state can only be used with KeyedStreams.");
}
} else {
return new StreamOperatorState(provider);
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 207b1b1..ead3af8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.KeyedDataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.Output;
@@ -113,7 +113,7 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
- KeyedDataStream<Integer, Integer> keyedStream = env
+ KeyedStream<Integer, Integer> keyedStream = env
.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
.keyBy(new ModKey(4));
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 99a2e14..89672df 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
@@ -49,7 +49,7 @@ import static org.junit.Assert.*;
public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@SuppressWarnings("unchecked")
- private final KeyedWindowFunction<String, String, String, TimeWindow> mockFunction = mock(KeyedWindowFunction.class);
+ private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class);
@SuppressWarnings("unchecked")
private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -61,11 +61,14 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
};
- private final KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
- new KeyedWindowFunction<Integer, Integer, Integer, TimeWindow>()
+ private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+ new WindowFunction<Integer, Integer, Integer, TimeWindow>()
{
@Override
- public void evaluate(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
+ public void apply(Integer key,
+ TimeWindow window,
+ Iterable<Integer> values,
+ Collector<Integer> out) {
for (Integer val : values) {
assertEquals(key, val);
out.collect(val);
@@ -472,7 +475,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
when(mockContext.getTaskName()).thenReturn("Test task name");
- KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
+ WindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
// the operator has a window time that is so long that it will not fire in this test
final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
@@ -523,7 +526,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> {
+ private static class FailingFunction implements WindowFunction<Integer, Integer, Integer, TimeWindow> {
private final int failAfterElements;
@@ -534,7 +537,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
@Override
- public void evaluate(Integer integer, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+ public void apply(Integer integer,
+ TimeWindow window,
+ Iterable<Integer> values,
+ Collector<Integer> out) throws Exception {
for (Integer i : values) {
out.collect(i);
numElements++;
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
new file mode 100644
index 0000000..ee8c6d6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * These tests verify that the API calls on
+ * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
+ * the correct window operator.
+ */
+public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
+
+ /**
+ * This test ensures that the fast aligned time windows operator is used if the
+ * conditions are right.
+ *
+ * TODO: update once the fast aligned time windows operator is in
+ */
+ @Ignore
+ @Test
+ public void testFastTimeWindows() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .reduceWindow(reducer);
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+
+ DataStream<Tuple2<String, Integer>> window2 = source
+ .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+ Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testNonEvicting() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .trigger(CountTrigger.of(100))
+ .reduceWindow(reducer);
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
+ NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
+ Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+ Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+ DataStream<Tuple2<String, Integer>> window2 = source
+ .windowAll(TumblingProcessingTimeWindows.of(1000))
+ .trigger(CountTrigger.of(100))
+ .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+ Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
+ NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
+ Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testEvicting() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .evictor(CountEvictor.of(100))
+ .reduceWindow(reducer);
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
+ EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
+ Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+ Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+
+ DataStream<Tuple2<String, Integer>> window2 = source
+ .windowAll(TumblingProcessingTimeWindows.of(1000))
+ .trigger(CountTrigger.of(100))
+ .evictor(TimeEvictor.of(100))
+ .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+ Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
+ EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
+ Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
+ Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+ }
+
+ // ------------------------------------------------------------------------
+ // UDFs
+ // ------------------------------------------------------------------------
+
+ public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+ return value1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
index 0dfceab..3139941 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperatorTest.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
@@ -56,7 +55,7 @@ public class EvictingNonKeyedWindowOperatorTest {
EvictingNonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingNonKeyedWindowOperator<>(
GlobalWindows.create(),
new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
- new ReduceWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+ new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
CountTrigger.of(WINDOW_SLIDE),
CountEvictor.of(WINDOW_SIZE));
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 01381f9..3d9605e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -56,7 +56,7 @@ public class EvictingWindowOperatorTest {
GlobalWindows.create(),
new TupleKeySelector(),
new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(),
- new ReduceKeyedWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
+ new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)),
CountTrigger.of(WINDOW_SLIDE),
CountEvictor.of(WINDOW_SIZE));
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index b74b3ea..9b0bcc4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
@@ -76,7 +75,7 @@ public class NonKeyedWindowOperatorTest {
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE),
windowBufferFactory,
- new ReduceWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+ new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -156,7 +155,7 @@ public class NonKeyedWindowOperatorTest {
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
TumblingTimeWindows.of(WINDOW_SIZE),
windowBufferFactory,
- new ReduceWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+ new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -234,7 +233,7 @@ public class NonKeyedWindowOperatorTest {
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
GlobalWindows.create(),
windowBufferFactory,
- new ReduceWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+ new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
ContinuousWatermarkTrigger.of(WINDOW_SIZE));
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -312,7 +311,7 @@ public class NonKeyedWindowOperatorTest {
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
GlobalWindows.create(),
windowBufferFactory,
- new ReduceWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+ new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java
deleted file mode 100644
index 4babee1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonParallelWindowDataStreamTranslationTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate
- * the correct window operator.
- */
-public class NonParallelWindowDataStreamTranslationTest extends StreamingMultipleProgramsTestBase {
-
- /**
- * These tests ensure that the fast aligned time windows operator is used if the
- * conditions are right.
- *
- * TODO: update once fast aligned time windows operator is in
- */
- @Ignore
- @Test
- public void testFastTimeWindows() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
- DummyReducer reducer = new DummyReducer();
-
- DataStream<Tuple2<String, Integer>> window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
- .reduceWindow(reducer);
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
-
- DataStream<Tuple2<String, Integer>> window2 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
- .mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void evaluate(
- TimeWindow window,
- Iterable<Tuple2<String, Integer>> values,
- Collector<Tuple2<String, Integer>> out) throws Exception {
-
- }
- });
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
- }
-
- @Test
- @SuppressWarnings("rawtypes")
- public void testNonEvicting() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
- DummyReducer reducer = new DummyReducer();
-
- DataStream<Tuple2<String, Integer>> window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
- .trigger(CountTrigger.of(100))
- .reduceWindow(reducer);
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
- NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
- Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
- DataStream<Tuple2<String, Integer>> window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(1000))
- .trigger(CountTrigger.of(100))
- .mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void evaluate(
- TimeWindow window,
- Iterable<Tuple2<String, Integer>> values,
- Collector<Tuple2<String, Integer>> out) throws Exception {
-
- }
- });
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
- NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
- Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
- Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
- }
-
- @Test
- @SuppressWarnings("rawtypes")
- public void testEvicting() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
- DummyReducer reducer = new DummyReducer();
-
- DataStream<Tuple2<String, Integer>> window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
- .evictor(CountEvictor.of(100))
- .reduceWindow(reducer);
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
- EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
- Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
- Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-
- DataStream<Tuple2<String, Integer>> window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(1000))
- .trigger(CountTrigger.of(100))
- .evictor(TimeEvictor.of(100))
- .mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void evaluate(
- TimeWindow window,
- Iterable<Tuple2<String, Integer>> values,
- Collector<Tuple2<String, Integer>> out) throws Exception {
-
- }
- });
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
- EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
- Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
- Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
- Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
- }
-
- // ------------------------------------------------------------------------
- // UDFs
- // ------------------------------------------------------------------------
-
- public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
- return value1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index aaf21e0..76d7bfe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -21,9 +21,10 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -38,7 +39,7 @@ import java.util.concurrent.TimeUnit;
/**
* These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate
+ * {@link WindowedStream} instantiate
* the correct window operator.
*/
public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
@@ -67,11 +68,11 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
- .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+ .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
- public void evaluate(Tuple tuple,
+ public void apply(Tuple tuple,
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
@@ -110,11 +111,11 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<String, Integer>> window2 = source
.timeWindowAll(Time.of(1000, TimeUnit.MILLISECONDS))
- .mapWindow(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
- public void evaluate(
+ public void apply(
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 3107d51..1bfd1d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.api.functions.windowing.ReduceKeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -76,7 +76,7 @@ public class WindowOperatorTest {
SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE),
new TupleKeySelector(),
windowBufferFactory,
- new ReduceKeyedWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+ new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -163,7 +163,7 @@ public class WindowOperatorTest {
TumblingTimeWindows.of(WINDOW_SIZE),
new TupleKeySelector(),
windowBufferFactory,
- new ReduceKeyedWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
+ new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -246,7 +246,7 @@ public class WindowOperatorTest {
GlobalWindows.create(),
new TupleKeySelector(),
windowBufferFactory,
- new ReduceKeyedWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+ new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
ContinuousWatermarkTrigger.of(WINDOW_SIZE));
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -331,7 +331,7 @@ public class WindowOperatorTest {
GlobalWindows.create(),
new TupleKeySelector(),
windowBufferFactory,
- new ReduceKeyedWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
+ new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 43e7715..a3e6085 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -21,8 +21,9 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
@@ -41,7 +42,7 @@ import org.junit.Test;
/**
* These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate
+ * {@link WindowedStream} instantiate
* the correct window operator.
*/
public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
@@ -70,11 +71,11 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
.window(SlidingProcessingTimeWindows.of(1000, 100))
- .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+ .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
- public void evaluate(Tuple tuple,
+ public void apply(Tuple tuple,
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
@@ -114,11 +115,11 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(1000))
.trigger(CountTrigger.of(100))
- .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+ .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
- public void evaluate(Tuple tuple,
+ public void apply(Tuple tuple,
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
@@ -164,11 +165,11 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
.window(TumblingProcessingTimeWindows.of(1000))
.trigger(CountTrigger.of(100))
.evictor(TimeEvictor.of(100))
- .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+ .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@Override
- public void evaluate(Tuple tuple,
+ public void apply(Tuple tuple,
TimeWindow window,
Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index a5f1e89..5d32b8e 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -81,10 +81,10 @@ public class GroupedProcessingTimeWindowExample {
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
.reduceWindow(new SummingReducer())
- // alternative: use a mapWindow function which does not pre-aggregate
+ // alternative: use an apply function which does not pre-aggregate
// .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
// .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
-// .mapWindow(new SummingWindowFunction())
+// .apply(new SummingWindowFunction())
.addSink(new SinkFunction<Tuple2<Long, Long>>() {
@Override
@@ -104,10 +104,10 @@ public class GroupedProcessingTimeWindowExample {
}
}
- public static class SummingWindowFunction implements KeyedWindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
+ public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
@Override
- public void evaluate(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+ public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
long sum = 0L;
for (Tuple2<Long, Long> value : values) {
sum += value.f1;
http://git-wip-us.apache.org/repos/asf/flink/blob/9e6e0aec/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 9f4f52a..ca5fc48 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.{RichMapFunction, RichFlatMapFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala.function.StatefulFunction
+import org.apache.flink.streaming.api.datastream.{KeyedStream => JavaKeyedStream}
class DataStream[T](javaStream: JavaStream[T]) {
@@ -246,7 +247,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* Groups the elements of a DataStream by the given field expressions to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
- def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] =
+ def groupBy(firstField: String, otherFields: String*): GroupedDataStream[T, JavaTuple] =
javaStream.groupBy(firstField +: otherFields.toArray: _*)
/**
@@ -601,7 +602,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
private[flink] def isStatePartitioned: Boolean = {
- javaStream.isInstanceOf[KeyedDataStream[_, _]]
+ javaStream.isInstanceOf[JavaKeyedStream[_, _]]
}
/**