You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/12/11 12:45:01 UTC
[flink] branch master updated: [FLINK-11090][streaming api] Remove
unused parameter in WindowedStream.aggregate()
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8c6ff79 [FLINK-11090][streaming api] Remove unused parameter in WindowedStream.aggregate()
8c6ff79 is described below
commit 8c6ff79cbc52f18a8b54a95fa9910858d934658b
Author: hequn8128 <ch...@gmail.com>
AuthorDate: Tue Dec 11 14:40:37 2018 +0800
[FLINK-11090][streaming api] Remove unused parameter in WindowedStream.aggregate()
---
.../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala | 8 ++++----
.../org/apache/flink/table/runtime/aggregate/AggregateUtil.scala | 7 ++-----
.../apache/flink/streaming/api/datastream/AllWindowedStream.java | 8 +++-----
.../org/apache/flink/streaming/api/datastream/WindowedStream.java | 8 +++-----
.../org/apache/flink/streaming/api/scala/AllWindowedStream.scala | 8 ++------
.../org/apache/flink/streaming/api/scala/WindowedStream.scala | 8 ++------
6 files changed, 16 insertions(+), 31 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 78a7273..e8bfda0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -199,7 +199,7 @@ class DataStreamGroupWindowAggregate(
createKeyedWindowedStream(queryConfig, window, keyedStream)
.asInstanceOf[WindowedStream[CRow, Row, DataStreamWindow]]
- val (aggFunction, accumulatorRowType, aggResultRowType) =
+ val (aggFunction, accumulatorRowType) =
AggregateUtil.createDataStreamAggregateFunction(
generator,
namedAggregates,
@@ -211,7 +211,7 @@ class DataStreamGroupWindowAggregate(
tableEnv.getConfig)
windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)
.name(keyedAggOpName)
}
// global / non-keyed aggregation
@@ -225,7 +225,7 @@ class DataStreamGroupWindowAggregate(
createNonKeyedWindowedStream(queryConfig, window, timestampedInput)
.asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
- val (aggFunction, accumulatorRowType, aggResultRowType) =
+ val (aggFunction, accumulatorRowType) =
AggregateUtil.createDataStreamAggregateFunction(
generator,
namedAggregates,
@@ -237,7 +237,7 @@ class DataStreamGroupWindowAggregate(
tableEnv.getConfig)
windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)
.name(nonKeyedAggOpName)
}
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 1e2df6e..4a50855 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1037,7 +1037,7 @@ object AggregateUtil {
groupingKeys: Array[Int],
needMerge: Boolean,
tableConfig: TableConfig)
- : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo, RowTypeInfo) = {
+ : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo) = {
val needRetract = false
val (aggFields, aggregates, isDistinctAggs, accTypes, _) =
@@ -1068,13 +1068,10 @@ object AggregateUtil {
None
)
- val aggResultTypes = namedAggregates.map(a => FlinkTypeFactory.toTypeInfo(a.left.getType))
-
val accumulatorRowType = new RowTypeInfo(accTypes: _*)
- val aggResultRowType = new RowTypeInfo(aggResultTypes: _*)
val aggFunction = new AggregateAggFunction(genFunction)
- (aggFunction, accumulatorRowType, aggResultRowType)
+ (aggFunction, accumulatorRowType)
}
/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 3f935e3..3728844 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -473,7 +473,7 @@ public class AllWindowedStream<T, W extends Window> {
}
return aggregate(function, new PassThroughAllWindowFunction<W, R>(),
- accumulatorType, resultType, resultType);
+ accumulatorType, resultType);
}
/**
@@ -510,7 +510,7 @@ public class AllWindowedStream<T, W extends Window> {
TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);
- return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
+ return aggregate(aggFunction, windowFunction, accumulatorType, resultType);
}
private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
@@ -566,13 +566,11 @@ public class AllWindowedStream<T, W extends Window> {
AggregateFunction<T, ACC, V> aggregateFunction,
AllWindowFunction<V, R, W> windowFunction,
TypeInformation<ACC> accumulatorType,
- TypeInformation<V> aggregateResultType,
TypeInformation<R> resultType) {
checkNotNull(aggregateFunction, "aggregateFunction");
checkNotNull(windowFunction, "windowFunction");
checkNotNull(accumulatorType, "accumulatorType");
- checkNotNull(aggregateResultType, "aggregateResultType");
checkNotNull(resultType, "resultType");
if (aggregateFunction instanceof RichFunction) {
@@ -857,7 +855,7 @@ public class AllWindowedStream<T, W extends Window> {
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*
- * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
+ * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction, TypeInformation, TypeInformation)} instead
*/
@PublicEvolving
@Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 4f8243c..24e1d12 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -601,7 +601,7 @@ public class WindowedStream<T, K, W extends Window> {
* @param windowResultType The process window function result type.
* @return The data stream that is the result of applying the fold function to the window.
*
- * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
+ * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction, TypeInformation, TypeInformation)} instead
*/
@Deprecated
@Internal
@@ -728,7 +728,7 @@ public class WindowedStream<T, K, W extends Window> {
}
return aggregate(function, new PassThroughWindowFunction<K, W, R>(),
- accumulatorType, resultType, resultType);
+ accumulatorType, resultType);
}
/**
@@ -765,7 +765,7 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<R> resultType = getWindowFunctionReturnType(windowFunction, aggResultType);
- return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
+ return aggregate(aggFunction, windowFunction, accumulatorType, resultType);
}
/**
@@ -793,13 +793,11 @@ public class WindowedStream<T, K, W extends Window> {
AggregateFunction<T, ACC, V> aggregateFunction,
WindowFunction<V, R, K, W> windowFunction,
TypeInformation<ACC> accumulatorType,
- TypeInformation<V> aggregateResultType,
TypeInformation<R> resultType) {
checkNotNull(aggregateFunction, "aggregateFunction");
checkNotNull(windowFunction, "windowFunction");
checkNotNull(accumulatorType, "accumulatorType");
- checkNotNull(aggregateResultType, "aggregateResultType");
checkNotNull(resultType, "resultType");
if (aggregateFunction instanceof RichFunction) {
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index f21c0fb..f237343 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -318,12 +318,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
val applyFunction = new ScalaAllWindowFunctionWrapper[V, R, W](cleanedWindowFunction)
val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
- val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.aggregate(
- cleanedPreAggregator, applyFunction,
- accumulatorType, aggregationResultType, resultType))
+ cleanedPreAggregator, applyFunction, accumulatorType, resultType))
}
/**
@@ -384,12 +382,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
val applyFunction = new ScalaAllWindowFunction[V, R, W](cleanWindowFunction)
val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
- val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.aggregate(
- cleanPreAggregator, applyFunction,
- accumulatorType, aggregationResultType, resultType))
+ cleanPreAggregator, applyFunction, accumulatorType, resultType))
}
// ----------------------------- fold() -------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 337e5ce..a440e29 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -303,12 +303,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
val applyFunction = new ScalaWindowFunctionWrapper[V, R, K, W](cleanedWindowFunction)
val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
- val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.aggregate(
- cleanedPreAggregator, applyFunction,
- accumulatorType, aggregationResultType, resultType))
+ cleanedPreAggregator, applyFunction, accumulatorType, resultType))
}
/**
@@ -333,12 +331,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
val applyFunction = new ScalaWindowFunction[V, R, K, W](cleanedWindowFunction)
val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
- val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.aggregate(
- cleanedPreAggregator, applyFunction,
- accumulatorType, aggregationResultType, resultType))
+ cleanedPreAggregator, applyFunction, accumulatorType, resultType))
}
/**