You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/24 14:03:15 UTC

flink git commit: [FLINK-6650] [table] Improve the error message for toAppendStream

Repository: flink
Updated Branches:
  refs/heads/master 36830adac -> 61914abff


[FLINK-6650] [table] Improve the error message for toAppendStream

This closes #3958.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61914abf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61914abf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61914abf

Branch: refs/heads/master
Commit: 61914abffa83a55d4f0a339dbcf64c540962a9cd
Parents: 36830ad
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon May 22 17:04:13 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Wed May 24 16:02:40 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/table/api/StreamTableEnvironment.scala      | 2 +-
 .../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala   | 4 ++--
 .../table/plan/nodes/datastream/DataStreamOverAggregate.scala    | 4 ++--
 .../org/apache/flink/table/runtime/aggregate/AggregateUtil.scala | 3 +++
 4 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c430b21..bc5038d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -630,7 +630,7 @@ abstract class StreamTableEnvironment(
     if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
       throw new TableException(
         "Table is not an append-only table. " +
-          "Output needs to handle update and delete changes.")
+        "Use the toRetractStream() in order to handle add and retract messages.")
     }
 
     // get CRow plan

http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 1ac013a..d860cbe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -126,9 +126,9 @@ class DataStreamGroupWindowAggregate(
     val physicalNamedProperties = namedProperties
       .filter(np => !FlinkTypeFactory.isTimeIndicatorType(np.property.resultType))
 
-    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+    val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input)
 
-    if (consumeRetraction) {
+    if (inputIsAccRetract) {
       throw new TableException(
         "Retraction on windowed GroupBy aggregation is not supported yet. " +
           "Note: Windowed GroupBy aggregation should not follow a " +

http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index a9fbf02..08f0356 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -116,9 +116,9 @@ class DataStreamOverAggregate(
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
-    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+    val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input)
 
-    if (consumeRetraction) {
+    if (inputIsAccRetract) {
       throw new TableException(
         "Retraction on Over window aggregation is not supported yet. " +
         "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")

http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 8073959..2907b99 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -152,6 +152,9 @@ object AggregateUtil {
     * @param inputRowType    Input row type
     * @param inputFieldTypes Types of the physical input fields
     * @param groupings       the position (in the input Row) of the grouping keys
+    * @param queryConfig     The configuration of the query to generate.
+    * @param generateRetraction It is a tag that indicates whether generate retract record.
+    * @param consumeRetraction It is a tag that indicates whether consume the retract record.
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
   private[flink] def createGroupAggregateFunction(