You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by al...@apache.org on 2018/12/11 12:45:01 UTC

[flink] branch master updated: [FLINK-11090][streaming api] Remove unused parameter in WindowedStream.aggregate()

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c6ff79  [FLINK-11090][streaming api] Remove unused parameter in WindowedStream.aggregate()
8c6ff79 is described below

commit 8c6ff79cbc52f18a8b54a95fa9910858d934658b
Author: hequn8128 <ch...@gmail.com>
AuthorDate: Tue Dec 11 14:40:37 2018 +0800

    [FLINK-11090][streaming api] Remove unused parameter in WindowedStream.aggregate()
---
 .../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala    | 8 ++++----
 .../org/apache/flink/table/runtime/aggregate/AggregateUtil.scala  | 7 ++-----
 .../apache/flink/streaming/api/datastream/AllWindowedStream.java  | 8 +++-----
 .../org/apache/flink/streaming/api/datastream/WindowedStream.java | 8 +++-----
 .../org/apache/flink/streaming/api/scala/AllWindowedStream.scala  | 8 ++------
 .../org/apache/flink/streaming/api/scala/WindowedStream.scala     | 8 ++------
 6 files changed, 16 insertions(+), 31 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 78a7273..e8bfda0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -199,7 +199,7 @@ class DataStreamGroupWindowAggregate(
         createKeyedWindowedStream(queryConfig, window, keyedStream)
           .asInstanceOf[WindowedStream[CRow, Row, DataStreamWindow]]
 
-      val (aggFunction, accumulatorRowType, aggResultRowType) =
+      val (aggFunction, accumulatorRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
           generator,
           namedAggregates,
@@ -211,7 +211,7 @@ class DataStreamGroupWindowAggregate(
           tableEnv.getConfig)
 
       windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)
         .name(keyedAggOpName)
     }
     // global / non-keyed aggregation
@@ -225,7 +225,7 @@ class DataStreamGroupWindowAggregate(
         createNonKeyedWindowedStream(queryConfig, window, timestampedInput)
           .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
 
-      val (aggFunction, accumulatorRowType, aggResultRowType) =
+      val (aggFunction, accumulatorRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
           generator,
           namedAggregates,
@@ -237,7 +237,7 @@ class DataStreamGroupWindowAggregate(
           tableEnv.getConfig)
 
       windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)
         .name(nonKeyedAggOpName)
     }
   }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 1e2df6e..4a50855 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1037,7 +1037,7 @@ object AggregateUtil {
       groupingKeys: Array[Int],
       needMerge: Boolean,
       tableConfig: TableConfig)
-    : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo, RowTypeInfo) = {
+    : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo) = {
 
     val needRetract = false
     val (aggFields, aggregates, isDistinctAggs, accTypes, _) =
@@ -1068,13 +1068,10 @@ object AggregateUtil {
       None
     )
 
-    val aggResultTypes = namedAggregates.map(a => FlinkTypeFactory.toTypeInfo(a.left.getType))
-
     val accumulatorRowType = new RowTypeInfo(accTypes: _*)
-    val aggResultRowType = new RowTypeInfo(aggResultTypes: _*)
     val aggFunction = new AggregateAggFunction(genFunction)
 
-    (aggFunction, accumulatorRowType, aggResultRowType)
+    (aggFunction, accumulatorRowType)
   }
 
   /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 3f935e3..3728844 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -473,7 +473,7 @@ public class AllWindowedStream<T, W extends Window> {
 		}
 
 		return aggregate(function, new PassThroughAllWindowFunction<W, R>(),
-				accumulatorType, resultType, resultType);
+				accumulatorType, resultType);
 	}
 
 	/**
@@ -510,7 +510,7 @@ public class AllWindowedStream<T, W extends Window> {
 
 		TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);
 
-		return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
+		return aggregate(aggFunction, windowFunction, accumulatorType, resultType);
 	}
 
 	private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
@@ -566,13 +566,11 @@ public class AllWindowedStream<T, W extends Window> {
 			AggregateFunction<T, ACC, V> aggregateFunction,
 			AllWindowFunction<V, R, W> windowFunction,
 			TypeInformation<ACC> accumulatorType,
-			TypeInformation<V> aggregateResultType,
 			TypeInformation<R> resultType) {
 
 		checkNotNull(aggregateFunction, "aggregateFunction");
 		checkNotNull(windowFunction, "windowFunction");
 		checkNotNull(accumulatorType, "accumulatorType");
-		checkNotNull(aggregateResultType, "aggregateResultType");
 		checkNotNull(resultType, "resultType");
 
 		if (aggregateFunction instanceof RichFunction) {
@@ -857,7 +855,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 *
-	 * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
+	 * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction, TypeInformation, TypeInformation)} instead
 	 */
 	@PublicEvolving
 	@Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 4f8243c..24e1d12 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -601,7 +601,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param windowResultType The process window function result type.
 	 * @return The data stream that is the result of applying the fold function to the window.
 	 *
-	 * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
+	 * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction, TypeInformation, TypeInformation)} instead
 	 */
 	@Deprecated
 	@Internal
@@ -728,7 +728,7 @@ public class WindowedStream<T, K, W extends Window> {
 		}
 
 		return aggregate(function, new PassThroughWindowFunction<K, W, R>(),
-				accumulatorType, resultType, resultType);
+			accumulatorType, resultType);
 	}
 
 	/**
@@ -765,7 +765,7 @@ public class WindowedStream<T, K, W extends Window> {
 
 		TypeInformation<R> resultType = getWindowFunctionReturnType(windowFunction, aggResultType);
 
-		return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
+		return aggregate(aggFunction, windowFunction, accumulatorType, resultType);
 	}
 
 	/**
@@ -793,13 +793,11 @@ public class WindowedStream<T, K, W extends Window> {
 			AggregateFunction<T, ACC, V> aggregateFunction,
 			WindowFunction<V, R, K, W> windowFunction,
 			TypeInformation<ACC> accumulatorType,
-			TypeInformation<V> aggregateResultType,
 			TypeInformation<R> resultType) {
 
 		checkNotNull(aggregateFunction, "aggregateFunction");
 		checkNotNull(windowFunction, "windowFunction");
 		checkNotNull(accumulatorType, "accumulatorType");
-		checkNotNull(aggregateResultType, "aggregateResultType");
 		checkNotNull(resultType, "resultType");
 
 		if (aggregateFunction instanceof RichFunction) {
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index f21c0fb..f237343 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -318,12 +318,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     val applyFunction = new ScalaAllWindowFunctionWrapper[V, R, W](cleanedWindowFunction)
 
     val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
-    val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
     
     asScalaStream(javaStream.aggregate(
-      cleanedPreAggregator, applyFunction,
-      accumulatorType, aggregationResultType, resultType))
+      cleanedPreAggregator, applyFunction, accumulatorType, resultType))
   }
 
   /**
@@ -384,12 +382,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     val applyFunction = new ScalaAllWindowFunction[V, R, W](cleanWindowFunction)
 
     val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
-    val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
 
     asScalaStream(javaStream.aggregate(
-      cleanPreAggregator, applyFunction,
-      accumulatorType, aggregationResultType, resultType))
+      cleanPreAggregator, applyFunction, accumulatorType, resultType))
   }
 
   // ----------------------------- fold() -------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 337e5ce..a440e29 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -303,12 +303,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     val applyFunction = new ScalaWindowFunctionWrapper[V, R, K, W](cleanedWindowFunction)
 
     val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
-    val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
 
     asScalaStream(javaStream.aggregate(
-      cleanedPreAggregator, applyFunction,
-      accumulatorType, aggregationResultType, resultType))
+      cleanedPreAggregator, applyFunction, accumulatorType, resultType))
   }
 
   /**
@@ -333,12 +331,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     val applyFunction = new ScalaWindowFunction[V, R, K, W](cleanedWindowFunction)
 
     val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
-    val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
     val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
 
     asScalaStream(javaStream.aggregate(
-      cleanedPreAggregator, applyFunction,
-      accumulatorType, aggregationResultType, resultType))
+      cleanedPreAggregator, applyFunction, accumulatorType, resultType))
   }
 
   /**