You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/24 14:03:15 UTC
flink git commit: [FLINK-6650] [table] Improve the error message for toAppendStream
Repository: flink
Updated Branches:
refs/heads/master 36830adac -> 61914abff
[FLINK-6650] [table] Improve the error message for toAppendStream
This closes #3958.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61914abf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61914abf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61914abf
Branch: refs/heads/master
Commit: 61914abffa83a55d4f0a339dbcf64c540962a9cd
Parents: 36830ad
Author: sunjincheng121 <su...@gmail.com>
Authored: Mon May 22 17:04:13 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Wed May 24 16:02:40 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/table/api/StreamTableEnvironment.scala | 2 +-
.../plan/nodes/datastream/DataStreamGroupWindowAggregate.scala | 4 ++--
.../table/plan/nodes/datastream/DataStreamOverAggregate.scala | 4 ++--
.../org/apache/flink/table/runtime/aggregate/AggregateUtil.scala | 3 +++
4 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c430b21..bc5038d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -630,7 +630,7 @@ abstract class StreamTableEnvironment(
if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
throw new TableException(
"Table is not an append-only table. " +
- "Output needs to handle update and delete changes.")
+ "Use the toRetractStream() in order to handle add and retract messages.")
}
// get CRow plan
http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 1ac013a..d860cbe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -126,9 +126,9 @@ class DataStreamGroupWindowAggregate(
val physicalNamedProperties = namedProperties
.filter(np => !FlinkTypeFactory.isTimeIndicatorType(np.property.resultType))
- val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+ val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input)
- if (consumeRetraction) {
+ if (inputIsAccRetract) {
throw new TableException(
"Retraction on windowed GroupBy aggregation is not supported yet. " +
"Note: Windowed GroupBy aggregation should not follow a " +
http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index a9fbf02..08f0356 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -116,9 +116,9 @@ class DataStreamOverAggregate(
val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
- val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+ val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input)
- if (consumeRetraction) {
+ if (inputIsAccRetract) {
throw new TableException(
"Retraction on Over window aggregation is not supported yet. " +
"Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")
http://git-wip-us.apache.org/repos/asf/flink/blob/61914abf/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 8073959..2907b99 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -152,6 +152,9 @@ object AggregateUtil {
* @param inputRowType Input row type
* @param inputFieldTypes Types of the physical input fields
* @param groupings the position (in the input Row) of the grouping keys
+ * @param queryConfig The configuration of the query to generate.
+ * @param generateRetraction A flag that indicates whether to generate retraction records.
+ * @param consumeRetraction A flag that indicates whether to consume retraction records.
* @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
*/
private[flink] def createGroupAggregateFunction(