You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/12 06:42:51 UTC
[3/5] flink git commit: [FLINK-6491] [table] Add QueryConfig and
state clean up for non-windowed aggregates.
[FLINK-6491] [table] Add QueryConfig and state clean up for non-windowed aggregates.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/003f81a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/003f81a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/003f81a7
Branch: refs/heads/release-1.3
Commit: 003f81a73fe38edcd17f10f8e6afb50ba23e3c28
Parents: f3ce088
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon May 8 18:41:37 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 12 08:33:44 2017 +0200
----------------------------------------------------------------------
.../flink/table/api/BatchTableEnvironment.scala | 17 ++-
.../apache/flink/table/api/QueryConfig.scala | 102 ++++++++++++++++
.../table/api/StreamTableEnvironment.scala | 51 ++++++--
.../flink/table/api/TableEnvironment.scala | 2 +-
.../table/api/java/StreamTableEnvironment.scala | 115 +++++++++++++++++--
.../api/scala/StreamTableEnvironment.scala | 46 +++++++-
.../table/api/scala/TableConversions.scala | 40 ++++++-
.../org/apache/flink/table/api/table.scala | 26 ++++-
.../plan/nodes/datastream/DataStreamCalc.scala | 8 +-
.../nodes/datastream/DataStreamCorrelate.scala | 8 +-
.../datastream/DataStreamGroupAggregate.scala | 20 +++-
.../DataStreamGroupWindowAggregate.scala | 8 +-
.../datastream/DataStreamOverAggregate.scala | 9 +-
.../plan/nodes/datastream/DataStreamRel.scala | 7 +-
.../plan/nodes/datastream/DataStreamScan.scala | 7 +-
.../plan/nodes/datastream/DataStreamUnion.scala | 10 +-
.../nodes/datastream/DataStreamValues.scala | 6 +-
.../datastream/StreamTableSourceScan.scala | 7 +-
.../table/runtime/aggregate/AggregateUtil.scala | 6 +-
.../aggregate/GroupAggProcessFunction.scala | 54 ++++++++-
.../table/utils/MockTableEnvironment.scala | 9 +-
21 files changed, 494 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 2a3cedf..f33c187 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute}
+import org.apache.flink.table.expressions.{Expression, TimeAttribute}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -113,9 +113,20 @@ abstract class BatchTableEnvironment(
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
+ * @param qConfig The configuration for the query to generate.
* @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
*/
- override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+ override private[flink] def writeToSink[T](
+ table: Table,
+ sink: TableSink[T],
+ qConfig: QueryConfig): Unit = {
+
+ // We do not pass the configuration on, because there is nothing to configure for batch queries.
+ val bQConfig = qConfig match {
+ case batchConfig: BatchQueryConfig => batchConfig
+ case _ =>
+ throw new TableException("BatchQueryConfig required to configure batch query.")
+ }
sink match {
case batchSink: BatchTableSink[T] =>
@@ -125,7 +136,7 @@ abstract class BatchTableEnvironment(
// Give the DataSet to the TableSink to emit it.
batchSink.emitDataSet(result)
case _ =>
- throw new TableException("BatchTableSink required to emit batch Table")
+ throw new TableException("BatchTableSink required to emit batch Table.")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
new file mode 100644
index 0000000..8e8b5ac
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/QueryConfig.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.io.Serializable
+import org.apache.flink.api.common.time.Time
+
+/**
+ * The [[QueryConfig]] is the common base class of query configurations, i.e.,
+ * [[BatchQueryConfig]] and [[StreamQueryConfig]]. Its constructor is `private[table]`, so
+ * instances can only be created from within the `org.apache.flink.table` package.
+ */
+class QueryConfig private[table] extends Serializable {}
+
+/**
+ * The [[BatchQueryConfig]] holds parameters to configure the behavior of batch queries.
+ * It currently declares no parameters of its own.
+ */
+class BatchQueryConfig private[table] extends QueryConfig
+
+/**
+ * The [[StreamQueryConfig]] holds parameters to configure the behavior of streaming queries.
+ *
+ * An empty [[StreamQueryConfig]] can be generated using the [[StreamTableEnvironment.qConf]]
+ * method.
+ */
+class StreamQueryConfig private[table] extends QueryConfig {
+
+ /**
+ * The minimum time in milliseconds for which idle state, i.e., state which was not updated,
+ * is retained. State might be cleared and removed if it was not updated for this period of
+ * time. Defaults to 0 (zero), the documented value for "never clean-up the state".
+ */
+ private var minIdleStateRetentionTime: Long = 0L
+
+ /**
+ * The maximum time in milliseconds for which idle state, i.e., state which was not updated,
+ * is retained. State will be cleared and removed if it was not updated for this period of
+ * time. Defaults to 0 (zero), the documented value for "never clean-up the state".
+ */
+ private var maxIdleStateRetentionTime: Long = 0L
+
+ /**
+ * Specifies the time interval for how long idle state, i.e., state which was not updated, will
+ * be retained. When state was not updated for the specified interval of time, it will be cleared
+ * and removed.
+ *
+ * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+ * was the first data. This can result in previous results being overwritten.
+ *
+ * Note: [[setIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and
+ * maximum time for state to be retained. Choosing a maximum time larger than the minimum time is
+ * more efficient, because the system has to do less bookkeeping to identify when to clear state.
+ *
+ * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
+ * clean-up the state.
+ * @return This [[StreamQueryConfig]], to allow chaining of calls.
+ */
+ def setIdleStateRetentionTime(time: Time): StreamQueryConfig = {
+ setIdleStateRetentionTime(time, time)
+ }
+
+ /**
+ * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+ * was not updated, will be retained.
+ * State will never be cleared until it was idle for at least the minimum time and will never
+ * be kept if it was idle for more than the maximum time.
+ *
+ * When new data arrives for previously cleaned-up state, the new data will be handled as if it
+ * was the first data. This can result in previous results being overwritten.
+ *
+ * Set to 0 (zero) to never clean-up the state.
+ *
+ * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
+ * never clean-up the state.
+ * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
+ * than minTime. Set to 0 (zero) to never clean-up the state.
+ * @return This [[StreamQueryConfig]], to allow chaining of calls.
+ * @throws IllegalArgumentException if maxTime is smaller than minTime.
+ */
+ def setIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
+ if (maxTime.toMilliseconds < minTime.toMilliseconds) {
+ throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
+ }
+ minIdleStateRetentionTime = minTime.toMilliseconds
+ maxIdleStateRetentionTime = maxTime.toMilliseconds
+ this
+ }
+
+ /** Returns the minimum idle state retention time in milliseconds (0 unless configured). */
+ def getMinIdleStateRetentionTime: Long = {
+ minIdleStateRetentionTime
+ }
+
+ /** Returns the maximum idle state retention time in milliseconds (0 unless configured). */
+ def getMaxIdleStateRetentionTime: Long = {
+ maxIdleStateRetentionTime
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index aef2b1b..c594d4c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -81,6 +81,8 @@ abstract class StreamTableEnvironment(
// the naming pattern for internally registered tables.
private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
+ /**
+ * Returns a new, empty [[StreamQueryConfig]]. A fresh instance is created on every call, so
+ * modifying a returned config does not affect configs returned by later calls.
+ */
+ def qConf: StreamQueryConfig = new StreamQueryConfig
+
/**
* Checks if the chosen table name is valid.
*
@@ -126,9 +128,20 @@ abstract class StreamTableEnvironment(
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
+ * @param qConfig The configuration for the query to generate.
* @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
*/
- override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+ override private[flink] def writeToSink[T](
+ table: Table,
+ sink: TableSink[T],
+ qConfig: QueryConfig): Unit = {
+
+ // Check query configuration
+ val sQConf = qConfig match {
+ case streamConfig: StreamQueryConfig => streamConfig
+ case _ =>
+ throw new TableException("StreamQueryConfig required to configure stream query.")
+ }
sink match {
@@ -137,7 +150,7 @@ abstract class StreamTableEnvironment(
val outputType = sink.getOutputType
// translate the Table into a DataStream and provide the type that the TableSink expects.
val result: DataStream[T] =
- translate(table, updatesAsRetraction = true, withChangeFlag = true)(outputType)
+ translate(table, sQConf, updatesAsRetraction = true, withChangeFlag = true)(outputType)
// Give the DataStream to the TableSink to emit it.
retractSink.asInstanceOf[RetractStreamTableSink[Any]]
.emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
@@ -160,7 +173,11 @@ abstract class StreamTableEnvironment(
val outputType = sink.getOutputType
// translate the Table into a DataStream and provide the type that the TableSink expects.
val result: DataStream[T] =
- translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = true)(outputType)
+ translate(
+ optimizedPlan,
+ table.getRelNode.getRowType,
+ sQConf,
+ withChangeFlag = true)(outputType)
// Give the DataStream to the TableSink to emit it.
upsertSink.asInstanceOf[UpsertStreamTableSink[Any]]
.emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
@@ -176,7 +193,11 @@ abstract class StreamTableEnvironment(
val outputType = sink.getOutputType
// translate the Table into a DataStream and provide the type that the TableSink expects.
val result: DataStream[T] =
- translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = false)(outputType)
+ translate(
+ optimizedPlan,
+ table.getRelNode.getRowType,
+ sQConf,
+ withChangeFlag = false)(outputType)
// Give the DataStream to the TableSink to emit it.
appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result)
@@ -545,17 +566,21 @@ abstract class StreamTableEnvironment(
* Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
*
* @param table The root node of the relational expression tree.
+ * @param qConfig The configuration for the query to generate.
* @param updatesAsRetraction Set to true to encode updates as retraction messages.
* @param withChangeFlag Set to true to emit records with change flags.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
- protected def translate[A](table: Table, updatesAsRetraction: Boolean, withChangeFlag: Boolean)
- (implicit tpe: TypeInformation[A]): DataStream[A] = {
+ protected def translate[A](
+ table: Table,
+ qConfig: StreamQueryConfig,
+ updatesAsRetraction: Boolean,
+ withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
val relNode = table.getRelNode
val dataStreamPlan = optimize(relNode, updatesAsRetraction)
- translate(dataStreamPlan, relNode.getRowType, withChangeFlag)
+ translate(dataStreamPlan, relNode.getRowType, qConfig, withChangeFlag)
}
/**
@@ -564,6 +589,7 @@ abstract class StreamTableEnvironment(
* @param logicalPlan The root node of the relational expression tree.
* @param logicalType The row type of the result. Since the logicalPlan can lose the
* field naming during optimization we pass the row type separately.
+ * @param qConfig The configuration for the query to generate.
* @param withChangeFlag Set to true to emit records with change flags.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
@@ -572,6 +598,7 @@ abstract class StreamTableEnvironment(
protected def translate[A](
logicalPlan: RelNode,
logicalType: RelDataType,
+ qConfig: StreamQueryConfig,
withChangeFlag: Boolean)
(implicit tpe: TypeInformation[A]): DataStream[A] = {
@@ -583,7 +610,7 @@ abstract class StreamTableEnvironment(
}
// get CRow plan
- val plan: DataStream[CRow] = translateToCRow(logicalPlan)
+ val plan: DataStream[CRow] = translateToCRow(logicalPlan, qConfig)
// convert CRow to output type
val conversion = if (withChangeFlag) {
@@ -615,14 +642,16 @@ abstract class StreamTableEnvironment(
* Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]].
*
* @param logicalPlan The logical plan to translate.
+ * @param qConfig The configuration for the query to generate.
* @return The [[DataStream]] of type [[CRow]].
*/
protected def translateToCRow(
- logicalPlan: RelNode): DataStream[CRow] = {
+ logicalPlan: RelNode,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
logicalPlan match {
case node: DataStreamRel =>
- node.translateToPlan(this)
+ node.translateToPlan(this, qConfig)
case _ =>
throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
@@ -638,7 +667,7 @@ abstract class StreamTableEnvironment(
def explain(table: Table): String = {
val ast = table.getRelNode
val optimizedPlan = optimize(ast, updatesAsRetraction = false)
- val dataStream = translateToCRow(optimizedPlan)
+ val dataStream = translateToCRow(optimizedPlan, qConf)
val env = dataStream.getExecutionEnvironment
val jsonSqlPlan = env.getExecutionPlan
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bf4a8e0..9f50f0c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -510,7 +510,7 @@ abstract class TableEnvironment(val config: TableConfig) {
* @param sink The [[TableSink]] to write the [[Table]] to.
* @tparam T The data type that the [[TableSink]] expects.
*/
- private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
+ private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
/**
* Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index a70bcca..c3b5951 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -150,9 +150,50 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+ toDataStream(table, clazz, qConf)
+ }
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+ toDataStream(table, typeInfo, qConf)
+ }
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param clazz The class of the type of the resulting [[DataStream]].
+ * @param qConfig The configuration of the query to generate.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toDataStream[T](table: Table, clazz: Class[T], qConfig: StreamQueryConfig): DataStream[T] = {
val typeInfo = TypeExtractor.createTypeInfo(clazz)
TableEnvironment.validateType(typeInfo)
- translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+ translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
}
/**
@@ -168,12 +209,64 @@ class StreamTableEnvironment(
*
* @param table The [[Table]] to convert.
* @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+ * @param qConfig The configuration of the query to generate.
* @tparam T The type of the resulting [[DataStream]].
* @return The converted [[DataStream]].
*/
- def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+ def toDataStream[T](
+ table: Table,
+ typeInfo: TypeInformation[T],
+ qConfig: StreamQueryConfig): DataStream[T] = {
TableEnvironment.validateType(typeInfo)
- translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+ translate[T](table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+ *
+ * The fields of the [[Table]] are mapped to the requested type as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param clazz The class of the requested record type.
+ * @tparam T The type of the requested record type.
+ * @return The converted [[DataStream]].
+ */
+ def toRetractStream[T](
+ table: Table,
+ clazz: Class[T]): DataStream[JTuple2[JBool, T]] = {
+
+ toRetractStream(table, clazz, qConf)
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+ *
+ * The fields of the [[Table]] are mapped to the requested type as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param typeInfo The [[TypeInformation]] of the requested record type.
+ * @tparam T The type of the requested record type.
+ * @return The converted [[DataStream]].
+ */
+ def toRetractStream[T](
+ table: Table,
+ typeInfo: TypeInformation[T]): DataStream[JTuple2[JBool, T]] = {
+
+ toRetractStream(table, typeInfo, qConf)
}
/**
@@ -190,17 +283,21 @@ class StreamTableEnvironment(
*
* @param table The [[Table]] to convert.
* @param clazz The class of the requested record type.
+ * @param qConfig The configuration of the query to generate.
* @tparam T The type of the requested record type.
* @return The converted [[DataStream]].
*/
- def toRetractStream[T](table: Table, clazz: Class[T]):
- DataStream[JTuple2[JBool, T]] = {
+ def toRetractStream[T](
+ table: Table,
+ clazz: Class[T],
+ qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
val typeInfo = TypeExtractor.createTypeInfo(clazz)
TableEnvironment.validateType(typeInfo)
val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
translate[JTuple2[JBool, T]](
table,
+ qConfig,
updatesAsRetraction = true,
withChangeFlag = true)(resultType)
}
@@ -219,11 +316,14 @@ class StreamTableEnvironment(
*
* @param table The [[Table]] to convert.
* @param typeInfo The [[TypeInformation]] of the requested record type.
+ * @param qConfig The configuration of the query to generate.
* @tparam T The type of the requested record type.
* @return The converted [[DataStream]].
*/
- def toRetractStream[T](table: Table, typeInfo: TypeInformation[T]):
- DataStream[JTuple2[JBool, T]] = {
+ def toRetractStream[T](
+ table: Table,
+ typeInfo: TypeInformation[T],
+ qConfig: StreamQueryConfig): DataStream[JTuple2[JBool, T]] = {
TableEnvironment.validateType(typeInfo)
val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
@@ -232,6 +332,7 @@ class StreamTableEnvironment(
)
translate[JTuple2[JBool, T]](
table,
+ qConfig,
updatesAsRetraction = true,
withChangeFlag = true)(resultTypeInfo)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index e5ad6c2..56f7d55 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.api.scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, Table, TableConfig, TableEnvironment}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -143,8 +143,29 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
+ toDataStream(table, qConf)
+ }
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param qConfig The configuration of the query to generate.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toDataStream[T: TypeInformation](table: Table, qConfig: StreamQueryConfig): DataStream[T] = {
val returnType = createTypeInformation[T]
- asScalaStream(translate(table, updatesAsRetraction = false, withChangeFlag = false)(returnType))
+ asScalaStream(
+ translate(table, qConfig, updatesAsRetraction = false, withChangeFlag = false)(returnType))
}
/**
@@ -159,8 +180,27 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
+ toRetractStream(table, qConf)
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+ *
+ * @param table The [[Table]] to convert.
+ * @param qConfig The configuration of the query to generate.
+ * @tparam T The type of the requested data type.
+ * @return The converted [[DataStream]].
+ */
+ def toRetractStream[T: TypeInformation](
+ table: Table,
+ qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
val returnType = createTypeInformation[(Boolean, T)]
- asScalaStream(translate(table, updatesAsRetraction = true, withChangeFlag = true)(returnType))
+ asScalaStream(
+ translate(table, qConfig, updatesAsRetraction = true, withChangeFlag = true)(returnType))
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 5efff62..966b42f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.table.api.{Table, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, Table, TableException}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
@@ -57,6 +57,21 @@ class TableConversions(table: Table) {
}
}
+ /** Converts the [[Table]] to a [[DataStream]] of the specified type.
+ *
+ * @param qConfig The configuration for the generated query.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ * @throws TableException if the [[Table]] was not created by a Scala
+ * [[ScalaStreamTableEnv]].
+ */
+ def toDataStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[T] = {
+ table.tableEnv match {
+ case tEnv: ScalaStreamTableEnv =>
+ tEnv.toDataStream(table, qConfig)
+ case _ =>
+ throw new TableException(
+ "Only tables that originate from Scala DataStreams " +
+ "can be converted to Scala DataStreams.")
+ }
+ }
+
/** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
* The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
* the second field holds the record of the specified type [[T]].
@@ -76,5 +91,28 @@ class TableConversions(table: Table) {
}
}
+ /** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+ *
+ * @param qConfig The configuration for the generated query.
+ * @tparam T The type of the requested record.
+ * @return The converted [[DataStream]] of (flag, record) pairs.
+ * @throws TableException if the [[Table]] was not created by a Scala
+ * [[ScalaStreamTableEnv]].
+ */
+ def toRetractStream[T: TypeInformation](qConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
+ table.tableEnv match {
+ case tEnv: ScalaStreamTableEnv =>
+ tEnv.toRetractStream(table, qConfig)
+ case _ =>
+ throw new TableException(
+ "Only tables that originate from Scala DataStreams " +
+ "can be converted to Scala DataStreams.")
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 310a75f..5a2eb1c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -763,6 +763,30 @@ class Table(
* @tparam T The data type that the [[TableSink]] expects.
*/
def writeToSink[T](sink: TableSink[T]): Unit = {
+
+   // Default query configuration derived from the table environment:
+   // streaming tables use the environment's own StreamQueryConfig (qConf),
+   // batch tables get a fresh BatchQueryConfig.
+   // NOTE(review): any other environment yields null, which is forwarded unchecked to
+   // writeToSink(sink, conf) — confirm that no such environment can reach this point.
+   val queryConfig: QueryConfig = this.tableEnv match {
+     case streamEnv: StreamTableEnvironment => streamEnv.qConf
+     case _: BatchTableEnvironment => new BatchQueryConfig
+     case _ => null
+   }
+
+   writeToSink(sink, queryConfig)
+ }
+
+ /**
+ * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
+ *
+ * A batch [[Table]] can only be written to a
+ * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+ * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+ * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+ * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+ *
+ * @param sink The [[TableSink]] to which the [[Table]] is written.
+ * @param conf The configuration for the query that writes to the sink.
+ * @tparam T The data type that the [[TableSink]] expects.
+ */
+ def writeToSink[T](sink: TableSink[T], conf: QueryConfig): Unit = {
// get schema information of table
val rowType = getRelNode.getRowType
val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
@@ -773,7 +797,7 @@ class Table(
val configuredSink = sink.configure(fieldNames, fieldTypes)
// emit the table to the configured table sink
- tableEnv.writeToSink(this, configuredSink)
+ tableEnv.writeToSink(this, configuredSink, conf)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index ce0f966..0e377b5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexProgram
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.plan.nodes.CommonCalc
@@ -83,11 +83,13 @@ class DataStreamCalc(
estimateRowCount(calcProgram, rowCnt)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
val config = tableEnv.getConfig
- val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
val generator = new CodeGenerator(config, false, inputRowType)
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 19ad89b..cbd818a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.sql.SemiJoinType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.CommonCorrelate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
@@ -82,12 +82,14 @@ class DataStreamCorrelate(
.itemIf("condition", condition.orNull, condition.isDefined)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
val config = tableEnv.getConfig
// we do not need to specify input type
- val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 506c0cb..f01b24a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -21,9 +21,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.time.Time
import org.apache.flink.api.java.functions.NullByteKeySelector
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -31,6 +32,7 @@ import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.slf4j.LoggerFactory
/**
*
@@ -59,6 +61,8 @@ class DataStreamGroupAggregate(
with CommonAggregate
with DataStreamRel {
+ private val LOG = LoggerFactory.getLogger(this.getClass)
+
override def deriveRowType() = schema.logicalType
override def needsUpdatesAsRetraction = true
@@ -100,9 +104,18 @@ class DataStreamGroupAggregate(
inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil))
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
+
+ // A negative minimum or maximum retention time is treated as "no retention interval
+ // configured": only a warning is logged and translation continues unchanged.
+ // NOTE(review): GroupAggProcessFunction only enables clean-up when both retention times
+ // are > 1, so values of 0 or 1 also disable clean-up but do not trigger this warning — confirm.
+ if (qConfig.getMinIdleStateRetentionTime < 0 || qConfig.getMaxIdleStateRetentionTime < 0) {
+ LOG.warn(
+ "No state retention interval configured for a query which accumulates state. " +
+ "Please provide a query configuration with valid retention interval to prevent excessive " +
+ "state size. You may specify a retention time of 0 to not clean up the state.")
+ }
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
new CalcitePair[AggregateCall, String](
@@ -136,6 +149,7 @@ class DataStreamGroupAggregate(
inputSchema.logicalType,
inputSchema.physicalFieldTypeInfo,
groupings,
+ qConfig,
DataStreamRetractionRules.isAccRetract(this),
DataStreamRetractionRules.isAccRetract(getInput))
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index ef207b0..d2aaad0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.expressions.ExpressionUtils._
@@ -107,9 +107,11 @@ class DataStreamGroupWindowAggregate(
namedProperties))
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
val physicalNamedAggregates = namedAggregates.map { namedAggregate =>
new CalcitePair[AggregateCall, String](
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 4061242..8e97884 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.rel.core.{AggregateCall, Window}
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.OverAggregate
import org.apache.flink.table.plan.schema.RowSchema
@@ -88,7 +88,10 @@ class DataStreamOverAggregate(
namedAggregates))
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
+
if (logicWindow.groups.size > 1) {
throw new TableException(
"Unsupported use of OVER windows. All aggregates must be computed on the same window.")
@@ -109,7 +112,7 @@ class DataStreamOverAggregate(
"Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
}
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 9754de4..6f6edf7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.plan.nodes.FlinkRelNode
import org.apache.flink.table.runtime.types.CRow
@@ -29,9 +29,12 @@ trait DataStreamRel extends FlinkRelNode {
* Translates the FlinkRelNode into a Flink operator.
*
* @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
+ * @param qConfig The configuration for the query to generate.
* @return DataStream of type [[CRow]]
*/
- def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow]
+ def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow]
/**
* Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index c613646..e64bf0f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.plan.schema.DataStreamTable
import org.apache.flink.table.runtime.types.CRow
@@ -54,7 +54,10 @@ class DataStreamScan(
)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
+
val config = tableEnv.getConfig
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
convertToInternalRow(schema, inputDataStream, dataStreamTable, config)
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index 654c259..6cc7396 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.types.CRow
@@ -58,10 +58,12 @@ class DataStreamUnion(
s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))"
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
+ // Both inputs are translated with the same query configuration and then merged with a
+ // DataStream union, matching the "Union All" semantics of this node (no de-duplication).
+ // NOTE(review): despite their names, leftDataSet / rightDataSet hold DataStreams.
- val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
+ val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, qConfig)
leftDataSet.union(rightDataSet)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index 32c9aaf..ba6b025 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.io.CRowValuesInputFormat
@@ -56,7 +56,9 @@ class DataStreamValues(
)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
val config = tableEnv.getConfig
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index b2d7019..225f23f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.RowSchema
@@ -98,7 +98,10 @@ class StreamTableSourceScan(
)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
+ override def translateToPlan(
+ tableEnv: StreamTableEnvironment,
+ qConfig: StreamQueryConfig): DataStream[CRow] = {
+
val config = tableEnv.getConfig
val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
convertToInternalRow(
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 768c9cb..27392c7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{StreamQueryConfig, TableException}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
@@ -155,6 +155,7 @@ object AggregateUtil {
inputRowType: RelDataType,
inputFieldTypes: Seq[TypeInformation[_]],
groupings: Array[Int],
+ qConfig: StreamQueryConfig,
generateRetraction: Boolean,
consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
@@ -190,7 +191,8 @@ object AggregateUtil {
new GroupAggProcessFunction(
genFunction,
aggregationStateType,
- generateRetraction)
+ generateRetraction,
+ qConfig)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 6ee37e6..84fee87 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -26,9 +26,9 @@ import org.apache.flink.util.Collector
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
import org.apache.flink.table.runtime.types.CRow
/**
@@ -40,13 +40,20 @@ import org.apache.flink.table.runtime.types.CRow
class GroupAggProcessFunction(
private val genAggregations: GeneratedAggregationsFunction,
private val aggregationStateType: RowTypeInfo,
- private val generateRetraction: Boolean)
+ private val generateRetraction: Boolean,
+ private val qConfig: StreamQueryConfig)
extends ProcessFunction[CRow, CRow]
with Compiler[GeneratedAggregations] {
- val LOG = LoggerFactory.getLogger(this.getClass)
+ val LOG: Logger = LoggerFactory.getLogger(this.getClass)
private var function: GeneratedAggregations = _
+ // Idle-state retention bounds from the query configuration. They are added to the current
+ // processing time in processElement, i.e., they are interpreted as milliseconds.
+ private val minRetentionTime = qConfig.getMinIdleStateRetentionTime
+ private val maxRetentionTime = qConfig.getMaxIdleStateRetentionTime
+ // Clean-up is only active if both bounds exceed 1 ms; smaller values (including 0 and the
+ // negative "not configured" values) disable it and no clean-up state/timers are used.
+ // NOTE(review): the "> 1" threshold (rather than "> 0") is undocumented — confirm intent.
+ private val stateCleaningEnabled = minRetentionTime > 1 && maxRetentionTime > 1
+ // interval in which clean-up timers are registered; added to the earliest clean-up time
+ // when a new timer is registered (difference between max and min retention)
+ private val cleanupTimerInterval = maxRetentionTime - minRetentionTime
+
private var newRow: CRow = _
private var prevRow: CRow = _
private var firstRow: Boolean = _
@@ -54,6 +61,8 @@ class GroupAggProcessFunction(
private var state: ValueState[Row] = _
// counts the number of added and retracted input records
private var cntState: ValueState[JLong] = _
+ // holds the latest registered cleanup timer
+ private var cleanupTimeState: ValueState[JLong] = _
override def open(config: Configuration) {
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
@@ -74,6 +83,12 @@ class GroupAggProcessFunction(
val inputCntDescriptor: ValueStateDescriptor[JLong] =
new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG)
cntState = getRuntimeContext.getState(inputCntDescriptor)
+
+ if (stateCleaningEnabled) {
+   // state holding the timestamp of the most recently registered clean-up timer;
+   // only created when clean-up is enabled (cleanupTimeState stays null otherwise)
+   val cleanupTimeDescriptor: ValueStateDescriptor[JLong] =
+     new ValueStateDescriptor[JLong]("GroupAggregateCleanupTime", Types.LONG)
+   cleanupTimeState = getRuntimeContext.getState(cleanupTimeDescriptor)
+ }
}
override def processElement(
@@ -81,6 +96,23 @@ class GroupAggProcessFunction(
ctx: ProcessFunction[CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
+ if (stateCleaningEnabled) {
+
+   val currentTime = ctx.timerService().currentProcessingTime()
+   // the state touched by this element must be kept at least until this time
+   val earliestCleanup = currentTime + minRetentionTime
+
+   // clean-up time of the last registered timer (null if none is registered yet)
+   val lastCleanupTime = cleanupTimeState.value()
+
+   // A new (later) timer is needed only if the pending one would fire before the minimum
+   // retention time has elapsed. Otherwise the pending timer already lies within
+   // [currentTime + minRetentionTime, currentTime + maxRetentionTime] and can be kept,
+   // so at most one timer is registered per cleanupTimerInterval.
+   if (lastCleanupTime == null || earliestCleanup > lastCleanupTime) {
+     // equals currentTime + maxRetentionTime
+     val cleanupTime = earliestCleanup + cleanupTimerInterval
+     // register timer and remember clean-up time; superseded timers are ignored in onTimer
+     ctx.timerService().registerProcessingTimeTimer(cleanupTime)
+     cleanupTimeState.update(cleanupTime)
+   }
+ }
+
val input = inputC.row
// get accumulators and input counter
@@ -144,4 +176,18 @@ class GroupAggProcessFunction(
cntState.clear()
}
}
+
+ override def onTimer(
+     timestamp: Long,
+     ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+     out: Collector[CRow]): Unit = {
+
+   // Only the most recently registered clean-up timer may clear the state; timers that were
+   // superseded by a later registration in processElement no longer match and are ignored.
+   val isLatestCleanupTimer = timestamp == cleanupTimeState.value()
+   if (isLatestCleanupTimer) {
+     // drop the accumulators, the input-record counter and the remembered clean-up time
+     state.clear()
+     cntState.clear()
+     cleanupTimeState.clear()
+   }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/003f81a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 8626b07..3d79e22 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -18,16 +18,17 @@
package org.apache.flink.table.utils
-import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.tools.RuleSet
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
class MockTableEnvironment extends TableEnvironment(new TableConfig) {
- override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+ override private[flink] def writeToSink[T](
+ table: Table,
+ sink: TableSink[T],
+ qConfig: QueryConfig): Unit = ???
override protected def checkValidTableName(name: String): Unit = ???