You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:17 UTC

[7/8] flink git commit: [FLINK-4997] Add ProcessWindowFunction support for .aggregate()

[FLINK-4997] Add ProcessWindowFunction support for .aggregate()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe2a3016
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe2a3016
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe2a3016

Branch: refs/heads/master
Commit: fe2a3016f98e45d0c94a3fa1ed8c17b89a516859
Parents: 4f047e1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Feb 7 14:38:25 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedStream.java          | 128 +++++++++++++++++++
 .../InternalAggregateProcessWindowFunction.java |  84 ++++++++++++
 .../functions/InternalWindowFunctionTest.java   | 104 +++++++++++++++
 .../streaming/api/scala/WindowedStream.scala    |  30 +++++
 4 files changed, 346 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 45eaae5..6809df0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -64,6 +64,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
@@ -906,6 +907,133 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+	 * that the window function typically has only a single value to process when called.
+	 *
+	 * @param aggFunction The aggregate function that is used for incremental aggregation.
+	 * @param windowFunction The window function.
+	 *
+	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @param <ACC> The type of the AggregateFunction's accumulator
+	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+	 * @param <R> The type of the elements in the resulting stream, equal to the
+	 *            WindowFunction's result type
+	 */
+	@PublicEvolving
+	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+			AggregateFunction<T, ACC, V> aggFunction,
+			ProcessWindowFunction<V, R, K, W> windowFunction) {
+
+		checkNotNull(aggFunction, "aggFunction");
+		checkNotNull(windowFunction, "windowFunction");
+
+		TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
+				aggFunction, input.getType(), null, false);
+
+		TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
+				aggFunction, input.getType(), null, false);
+
+		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+				windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false);
+
+		return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+	 * that the window function typically has only a single value to process when called.
+	 *
+	 * @param aggregateFunction The aggregation function that is used for incremental aggregation.
+	 * @param windowFunction The window function.
+	 * @param accumulatorType Type information for the internal accumulator type of the aggregation function
+	 * @param resultType Type information for the result type of the window function
+	 *
+	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @param <ACC> The type of the AggregateFunction's accumulator
+	 * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+	 * @param <R> The type of the elements in the resulting stream, equal to the
+	 *            WindowFunction's result type
+	 */
+	@PublicEvolving
+	public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+			AggregateFunction<T, ACC, V> aggregateFunction,
+			ProcessWindowFunction<V, R, K, W> windowFunction,
+			TypeInformation<ACC> accumulatorType,
+			TypeInformation<V> aggregateResultType,
+			TypeInformation<R> resultType) {
+
+		checkNotNull(aggregateFunction, "aggregateFunction");
+		checkNotNull(windowFunction, "windowFunction");
+		checkNotNull(accumulatorType, "accumulatorType");
+		checkNotNull(aggregateResultType, "aggregateResultType");
+		checkNotNull(resultType, "resultType");
+
+		if (aggregateFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
+		}
+
+		//clean the closures
+		windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+		aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowedStream." + callLocation;
+
+		String opName;
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+					new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalAggregateProcessWindowFunction<>(aggregateFunction, windowFunction),
+					trigger,
+					evictor,
+					allowedLateness);
+
+		} else {
+			AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
+					aggregateFunction, accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalSingleValueProcessWindowFunction<>(windowFunction),
+					trigger,
+					allowedLateness);
+		}
+
+		return input.transform(opName, resultType, operator);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Window Function (apply)
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
new file mode 100644
index 0000000..433da9b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an
+ * {@code Iterable} and an {@link AggregateFunction}.
+ *
+ * @param <K> The key type
+ * @param <W> The window type
+ * @param <T> The type of the input to the AggregateFunction
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of the AggregateFunction's result, and the input to the WindowFunction
+ * @param <R> The result type of the WindowFunction
+ */
+public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W extends Window>
+		extends WrappingFunction<ProcessWindowFunction<V, R, K, W>>
+		implements InternalWindowFunction<Iterable<T>, R, K, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final AggregateFunction<T, ACC, V> aggFunction;
+
+	public InternalAggregateProcessWindowFunction(
+			AggregateFunction<T, ACC, V> aggFunction,
+			ProcessWindowFunction<V, R, K, W> windowFunction) {
+		super(windowFunction);
+		this.aggFunction = aggFunction;
+	}
+	
+	@Override
+	public void apply(K key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
+		ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+		ProcessWindowFunction<V, R, K, W>.Context context = wrappedFunction.new Context() {
+			@Override
+			public W window() {
+				return window;
+			}
+		};
+
+		final ACC acc = aggFunction.createAccumulator();
+
+		for (T val : input) {
+			aggFunction.add(val, acc);
+		}
+
+		wrappedFunction.process(key, context, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		throw new RuntimeException("This should never be called.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 3c73035..e49a496 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators.windowing.functions;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,6 +30,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
@@ -39,6 +41,17 @@ import org.apache.flink.util.Collector;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.hamcrest.core.AllOf.allOf;
 import static org.mockito.Mockito.*;
 
 public class InternalWindowFunctionTest {
@@ -288,6 +301,84 @@ public class InternalWindowFunctionTest {
 		verify(mock).close();
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testInternalAggregateProcessWindowFunction() throws Exception {
+
+		AggregateProcessWindowFunctionMock mock = mock(AggregateProcessWindowFunctionMock.class);
+
+		InternalAggregateProcessWindowFunction<Long, Set<Long>, Map<Long, Long>, String, Long, TimeWindow> windowFunction =
+				new InternalAggregateProcessWindowFunction<>(new AggregateFunction<Long, Set<Long>, Map<Long, Long>>() {
+					private static final long serialVersionUID = 1L;
+					
+					@Override
+					public Set<Long> createAccumulator() {
+						return new HashSet<>();
+					}
+
+					@Override
+					public void add(Long value, Set<Long> accumulator) {
+						accumulator.add(value);
+					}
+
+					@Override
+					public Map<Long, Long> getResult(Set<Long> accumulator) {
+						Map<Long, Long> result = new HashMap<>();
+						for (Long in : accumulator) {
+							result.put(in, in);
+						}
+						return result;
+					}
+
+					@Override
+					public Set<Long> merge(Set<Long> a, Set<Long> b) {
+						a.addAll(b);
+						return a;
+					}
+				}, mock);
+
+		// check setOutputType
+		TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+		ExecutionConfig execConf = new ExecutionConfig();
+		execConf.setParallelism(42);
+
+		StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+		verify(mock).setOutputType(stringType, execConf);
+
+		// check open
+		Configuration config = new Configuration();
+
+		windowFunction.open(config);
+		verify(mock).open(config);
+
+		// check setRuntimeContext
+		RuntimeContext rCtx = mock(RuntimeContext.class);
+
+		windowFunction.setRuntimeContext(rCtx);
+		verify(mock).setRuntimeContext(rCtx);
+
+		// check apply
+		TimeWindow w = mock(TimeWindow.class);
+		Collector<String> c = (Collector<String>) mock(Collector.class);
+
+		List<Long> args = new LinkedList<>();
+		args.add(23L);
+		args.add(24L);
+		
+		windowFunction.apply(42L, w, args, c);
+		verify(mock).process(
+				eq(42L),
+				(AggregateProcessWindowFunctionMock.Context) anyObject(),
+				(Iterable) argThat(containsInAnyOrder(allOf(
+						hasEntry(is(23L), is(23L)),
+						hasEntry(is(24L), is(24L))))),
+				eq(c));
+
+		// check close
+		windowFunction.close();
+		verify(mock).close();
+	}
+
 	public static class ProcessWindowFunctionMock
 		extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {
@@ -301,6 +392,19 @@ public class InternalWindowFunctionTest {
 		public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
 	}
 
+	public static class AggregateProcessWindowFunctionMock
+			extends RichProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
+			implements OutputTypeConfigurable<String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+		@Override
+		public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+	}
+
 	public static class WindowFunctionMock
 		extends RichWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe2a3016/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 96ff334..a5fbeb9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -326,6 +326,36 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
       accumulatorType, aggregationResultType, resultType))
   }
 
+  /**
+    * Applies the given window function to each window. The window function is called for each
+    * evaluation of the window for each key individually. The output of the window function is
+    * interpreted as a regular non-windowed stream.
+    *
+    * Arriving data is pre-aggregated using the given aggregation function.
+    *
+    * @param preAggregator The aggregation function that is used for pre-aggregation
+    * @param windowFunction The window function.
+    * @return The data stream that is the result of applying the window function to the window.
+    */
+  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
+  (preAggregator: AggregateFunction[T, ACC, V],
+   windowFunction: ProcessWindowFunction[V, R, K, W]): DataStream[R] = {
+
+    val cleanedPreAggregator = clean(preAggregator)
+    val cleanedWindowFunction = clean(windowFunction)
+
+    val applyFunction = new ScalaProcessWindowFunctionWrapper[V, R, K, W](cleanedWindowFunction)
+
+    val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+    val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
+    val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+
+    asScalaStream(javaStream.aggregate(
+      cleanedPreAggregator, applyFunction,
+      accumulatorType, aggregationResultType, resultType))
+  }
+
+
   // ---------------------------- fold() ------------------------------------
 
   /**