You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/09 16:50:56 UTC

[3/4] flink git commit: [FLINK-6486] [table] Pass RowTypeInfo to CodeGenerator instead of CRowTypeInfo.

[FLINK-6486] [table] Pass RowTypeInfo to CodeGenerator instead of CRowTypeInfo.

This closes #3850.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5ddbe5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5ddbe5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5ddbe5c

Branch: refs/heads/master
Commit: b5ddbe5c360003b210a1212e54e6c50b8af538fa
Parents: e2cb221
Author: Hequn Cheng <ch...@gmail.com>
Authored: Mon May 8 20:55:51 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 9 18:50:20 2017 +0200

----------------------------------------------------------------------
 .../table/plan/nodes/datastream/DataStreamGroupAggregate.scala     | 2 +-
 .../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala     | 2 +-
 .../table/plan/nodes/datastream/DataStreamOverAggregate.scala      | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5ddbe5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 18f1fc8..506c0cb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -115,7 +115,7 @@ class DataStreamGroupAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)
 
     val aggString = aggregationToString(
       inputSchema.logicalType,

http://git-wip-us.apache.org/repos/asf/flink/blob/b5ddbe5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c38e5af..ef207b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -142,7 +142,7 @@ class DataStreamGroupWindowAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)
 
     val needMerge = window match {
       case SessionGroupWindow(_, _, _) => true

http://git-wip-us.apache.org/repos/asf/flink/blob/b5ddbe5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index e823cd6..4061242 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -116,7 +116,7 @@ class DataStreamOverAggregate(
     val generator = new CodeGenerator(
       tableEnv.getConfig,
       false,
-      inputDS.getType)
+      inputSchema.physicalTypeInfo)
 
     val timeType = schema.logicalType
       .getFieldList