You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/09 16:50:56 UTC
[3/4] flink git commit: [FLINK-6486] [table] Pass RowTypeInfo to CodeGenerator instead of CRowTypeInfo.
[FLINK-6486] [table] Pass RowTypeInfo to CodeGenerator instead of CRowTypeInfo.
This closes #3850.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5ddbe5c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5ddbe5c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5ddbe5c
Branch: refs/heads/master
Commit: b5ddbe5c360003b210a1212e54e6c50b8af538fa
Parents: e2cb221
Author: Hequn Cheng <ch...@gmail.com>
Authored: Mon May 8 20:55:51 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 9 18:50:20 2017 +0200
----------------------------------------------------------------------
.../table/plan/nodes/datastream/DataStreamGroupAggregate.scala | 2 +-
.../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala | 2 +-
.../table/plan/nodes/datastream/DataStreamOverAggregate.scala | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b5ddbe5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 18f1fc8..506c0cb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -115,7 +115,7 @@ class DataStreamGroupAggregate(
val generator = new CodeGenerator(
tableEnv.getConfig,
false,
- inputDS.getType)
+ inputSchema.physicalTypeInfo)
val aggString = aggregationToString(
inputSchema.logicalType,
http://git-wip-us.apache.org/repos/asf/flink/blob/b5ddbe5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c38e5af..ef207b0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -142,7 +142,7 @@ class DataStreamGroupWindowAggregate(
val generator = new CodeGenerator(
tableEnv.getConfig,
false,
- inputDS.getType)
+ inputSchema.physicalTypeInfo)
val needMerge = window match {
case SessionGroupWindow(_, _, _) => true
http://git-wip-us.apache.org/repos/asf/flink/blob/b5ddbe5c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index e823cd6..4061242 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -116,7 +116,7 @@ class DataStreamOverAggregate(
val generator = new CodeGenerator(
tableEnv.getConfig,
false,
- inputDS.getType)
+ inputSchema.physicalTypeInfo)
val timeType = schema.logicalType
.getFieldList