You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/03/21 12:59:50 UTC

flink git commit: [FLINK-6138] [table] Create the ListStateDescriptor with the aggregationStateType instead of a serializer.

Repository: flink
Updated Branches:
  refs/heads/master 17dd915e8 -> e14135518


[FLINK-6138] [table] Create the ListStateDescriptor with the aggregationStateType instead of a serializer.

this closes #3581


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1413551
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1413551
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1413551

Branch: refs/heads/master
Commit: e14135518d51e6b491f2cd512234b71f1cf1d716
Parents: 17dd915
Author: \u91d1\u7af9 <ji...@alibaba-inc.com>
Authored: Tue Mar 21 12:50:02 2017 +0800
Committer: Jark Wu <wu...@alibaba-inc.com>
Committed: Tue Mar 21 20:58:16 2017 +0800

----------------------------------------------------------------------
 .../UnboundedNonPartitionedProcessingOverProcessFunction.scala   | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1413551/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
index 51c8315..7750511 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
@@ -98,9 +98,7 @@ class UnboundedNonPartitionedProcessingOverProcessFunction(
   }
 
   override def initializeState(context: FunctionInitializationContext): Unit = {
-    val stateSerializer =
-      aggregationStateType.createSerializer(getRuntimeContext.getExecutionConfig)
-    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", stateSerializer)
+    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType)
     state = context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor)
   }
 }