Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:49 UTC
[15/15] flink git commit: [FLINK-6093] [table] Add stream TableSinks and DataStream conversion with support for retraction.
[FLINK-6093] [table] Add stream TableSinks and DataStream conversion with support for retraction.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f37988c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f37988c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f37988c1
Branch: refs/heads/master
Commit: f37988c19adc30d324cde83c54f2fa5d36efb9e7
Parents: bfed279
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Apr 28 01:59:57 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 6 01:51:55 2017 +0200
----------------------------------------------------------------------
.../connectors/kafka/KafkaTableSink.java | 6 +-
.../flink/table/api/BatchTableEnvironment.scala | 16 +-
.../table/api/StreamTableEnvironment.scala | 428 +++++++++++-----
.../apache/flink/table/api/TableConfig.scala | 17 -
.../flink/table/api/TableEnvironment.scala | 48 +-
.../table/api/java/StreamTableEnvironment.scala | 81 ++-
.../api/scala/StreamTableEnvironment.scala | 33 +-
.../table/api/scala/TableConversions.scala | 19 +
.../org/apache/flink/table/api/table.scala | 4 +-
.../plan/nodes/datastream/DataStreamCalc.scala | 8 +-
.../datastream/DataStreamGroupAggregate.scala | 2 +
.../DataStreamGroupWindowAggregate.scala | 4 +
.../nodes/datastream/retractionTraits.scala | 37 +-
.../datastream/DataStreamRetractionRules.scala | 16 +-
.../runtime/CRowCorrelateFlatMapRunner.scala | 2 +-
.../flink/table/runtime/CRowFlatMapRunner.scala | 2 +-
.../table/runtime/CRowInputMapRunner.scala | 2 +-
.../runtime/CRowInputTupleOutputMapRunner.scala | 53 +-
.../table/runtime/CRowOutputMapRunner.scala | 2 +-
.../table/runtime/CorrelateFlatMapRunner.scala | 2 +-
.../flink/table/runtime/FlatJoinRunner.scala | 2 +-
.../flink/table/runtime/FlatMapRunner.scala | 2 +-
.../flink/table/runtime/MapJoinLeftRunner.scala | 2 +-
.../table/runtime/MapJoinRightRunner.scala | 2 +-
.../apache/flink/table/runtime/MapRunner.scala | 2 +-
.../flink/table/runtime/MapSideJoinRunner.scala | 2 +-
...aSetSessionWindowAggregatePreProcessor.scala | 2 +-
.../aggregate/GroupAggProcessFunction.scala | 55 +-
.../runtime/io/CRowValuesInputFormat.scala | 2 +-
.../table/runtime/io/ValuesInputFormat.scala | 2 +-
.../table/sinks/AppendStreamTableSink.scala | 36 ++
.../apache/flink/table/sinks/CsvTableSink.scala | 100 +---
.../table/sinks/RetractStreamTableSink.scala | 55 ++
.../flink/table/sinks/StreamRetractSink.scala | 35 --
.../flink/table/sinks/StreamTableSink.scala | 32 --
.../table/sinks/UpsertStreamTableSink.scala | 79 +++
.../flink/table/TableEnvironmentTest.scala | 1 -
.../api/scala/stream/RetractionITCase.scala | 80 ++-
.../api/scala/stream/TableSinkITCase.scala | 33 +-
.../table/api/scala/stream/sql/SqlITCase.scala | 102 ++--
.../stream/table/GroupAggregationsITCase.scala | 61 +--
.../api/scala/stream/table/OverWindowTest.scala | 18 +-
.../api/scala/stream/utils/StreamITCase.scala | 35 +-
.../table/plan/rules/RetractionRulesTest.scala | 2 +-
.../table/sinks/StreamTableSinksITCase.scala | 511 +++++++++++++++++++
.../table/utils/MockTableEnvironment.scala | 5 -
.../flink/table/utils/TableTestBase.scala | 4 +-
47 files changed, 1433 insertions(+), 611 deletions(-)
----------------------------------------------------------------------
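For orientation, a minimal sketch of the conversion this commit enables, written against the Scala API (names and data are illustrative, not part of the commit): a grouped aggregation emits update changes, so it can only be converted to a DataStream with change flags via the new toRetractStream.

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object RetractStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // A grouped aggregation updates previously emitted results,
    // so the resulting table is not append-only.
    val words = env.fromElements(("hello", 1), ("world", 1), ("hello", 1))
    val counts = tEnv.fromDataStream(words, 'word, 'cnt)
      .groupBy('word)
      .select('word, 'cnt.sum as 'total)

    // Updates arrive as a (false, oldRow) message followed by (true, newRow).
    counts.toRetractStream[(String, Int)].print()

    env.execute()
  }
}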
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 97f5fba..a8a2fd0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -29,12 +29,12 @@ import org.apache.flink.util.Preconditions;
import java.util.Properties;
/**
- * A version-agnostic Kafka {@link StreamTableSink}.
+ * A version-agnostic Kafka {@link AppendStreamTableSink}.
*
* <p>The version-specific Kafka consumers need to extend this class and
* override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}.
*/
-public abstract class KafkaTableSink implements StreamTableSink<Row> {
+public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
protected final String topic;
protected final Properties properties;
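A hypothetical append-only sink, sketched to show the interface that KafkaTableSink now implements (the PrintAppendSink name and print behavior are made up for illustration; the overridden members follow the TableSink and AppendStreamTableSink traits as of this commit):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.sinks.{AppendStreamTableSink, TableSink}
import org.apache.flink.types.Row

class PrintAppendSink extends AppendStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _

  override def getOutputType: TypeInformation[Row] =
    new RowTypeInfo(fieldTypes, fieldNames)

  override def getFieldNames: Array[String] = fieldNames

  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

  // The TableSink contract asks for a configured copy, not mutation in place.
  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val sink = new PrintAppendSink
    sink.fieldNames = fieldNames
    sink.fieldTypes = fieldTypes
    sink
  }

  // Only ever called with an insert-only stream; updating tables are rejected
  // by the planner before emission.
  override def emitDataStream(dataStream: DataStream[Row]): Unit =
    dataStream.print()
}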
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index c7bacfe..2a3cedf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAtt
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSourceTable}
import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
import org.apache.flink.table.sources.{BatchTableSource, TableSource}
@@ -133,14 +133,14 @@ abstract class BatchTableEnvironment(
* Creates a final converter that maps the internal row type to external type.
*
* @param physicalTypeInfo the input of the sink
- * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+ * @param schema the input schema with correct field names (esp. for POJO field mapping)
* @param requestedTypeInfo the output type of the sink
* @param functionName name of the map function. Must not be unique but has to be a
* valid Java class identifier.
*/
- override protected def getConversionMapper[IN, OUT](
+ protected def getConversionMapper[IN, OUT](
physicalTypeInfo: TypeInformation[IN],
- logicalRowType: RelDataType,
+ schema: RowSchema,
requestedTypeInfo: TypeInformation[OUT],
functionName: String):
Option[MapFunction[IN, OUT]] = {
@@ -153,7 +153,7 @@ abstract class BatchTableEnvironment(
val converterFunction = generateRowConverterFunction[OUT](
physicalTypeInfo.asInstanceOf[TypeInformation[Row]],
- logicalRowType,
+ schema,
requestedTypeInfo,
functionName
)
@@ -334,7 +334,11 @@ abstract class BatchTableEnvironment(
case node: DataSetRel =>
val plan = node.translateToPlan(this)
val conversion =
- getConversionMapper(plan.getType, logicalType, tpe, "DataSetSinkConversion")
+ getConversionMapper(
+ plan.getType,
+ new RowSchema(logicalType),
+ tpe,
+ "DataSetSinkConversion")
conversion match {
case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
case Some(mapFunction: MapFunction[Row, A]) =>
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index bd06305..aef2b1b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -19,19 +19,23 @@
package org.apache.flink.table.api
import _root_.java.util.concurrent.atomic.AtomicInteger
+import _root_.java.lang.{Boolean => JBool}
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.{RelNode, RelVisitor}
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.api.common.typeinfo.AtomicType
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.RelTimeIndicatorConverter
@@ -39,12 +43,12 @@ import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
+import org.apache.flink.table.plan.nodes.datastream._
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.StreamTableSourceTable
-import org.apache.flink.table.plan.schema.DataStreamTable
-import org.apache.flink.table.runtime.{CRowInputMapRunner, CRowInputTupleOutputMapRunner}
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
+import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.sinks.{StreamRetractSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TypeCheckUtils
import org.apache.flink.types.Row
@@ -127,97 +131,192 @@ abstract class StreamTableEnvironment(
override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
sink match {
- case streamSink: StreamTableSink[T] =>
+
+ case retractSink: RetractStreamTableSink[_] =>
+ // retraction sink can always be used
val outputType = sink.getOutputType
// translate the Table into a DataStream and provide the type that the TableSink expects.
- val result: DataStream[T] = translate(table)(outputType)
- // Give the DataSet to the TableSink to emit it.
- streamSink.emitDataStream(result)
-
- case streamRetractSink: StreamRetractSink[T] =>
+ val result: DataStream[T] =
+ translate(table, updatesAsRetraction = true, withChangeFlag = true)(outputType)
+ // Give the DataStream to the TableSink to emit it.
+ retractSink.asInstanceOf[RetractStreamTableSink[Any]]
+ .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
+
+ case upsertSink: UpsertStreamTableSink[_] =>
+ // optimize plan
+ val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
+ // check for append only table
+ val isAppendOnlyTable = isAppendOnly(optimizedPlan)
+ upsertSink.setIsAppendOnly(isAppendOnlyTable)
+ // extract unique key fields
+ val tableKeys: Option[Array[String]] = getUniqueKeyFields(optimizedPlan)
+ // check that we have keys if the table has changes (is not append-only)
+ tableKeys match {
+ case Some(keys) => upsertSink.setKeyFields(keys)
+ case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
+ case None if !isAppendOnlyTable => throw new TableException(
+ "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.")
+ }
+ val outputType = sink.getOutputType
+ // translate the Table into a DataStream and provide the type that the TableSink expects.
+ val result: DataStream[T] =
+ translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = true)(outputType)
+ // Give the DataStream to the TableSink to emit it.
+ upsertSink.asInstanceOf[UpsertStreamTableSink[Any]]
+ .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
+
+ case appendSink: AppendStreamTableSink[_] =>
+ // optimize plan
+ val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
+ // verify table is an insert-only (append-only) table
+ if (!isAppendOnly(optimizedPlan)) {
+ throw new TableException(
+ "AppendStreamTableSink requires that Table has only insert changes.")
+ }
val outputType = sink.getOutputType
- this.config.setNeedsUpdatesAsRetractionForSink(streamRetractSink.needsUpdatesAsRetraction)
// translate the Table into a DataStream and provide the type that the TableSink expects.
- val result: DataStream[JTuple2[Boolean, T]] = translate(table, true)(outputType)
- // Give the DataSet to the TableSink to emit it.
- streamRetractSink.emitDataStreamWithChange(result)
+ val result: DataStream[T] =
+ translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = false)(outputType)
+ // Give the DataStream to the TableSink to emit it.
+ appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result)
+
case _ =>
- throw new TableException("StreamTableSink required to emit streaming Table")
+ throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, " +
+ "RetractStreamTableSink, or UpsertStreamTableSink.")
}
}
-
/**
* Creates a final converter that maps the internal row type to external type.
*
* @param physicalTypeInfo the input of the sink
- * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+ * @param schema the input schema with correct field names (esp. for POJO field mapping)
* @param requestedTypeInfo the output type of the sink
* @param functionName name of the map function. Must not be unique but has to be a
* valid Java class identifier.
*/
- override protected def getConversionMapper[IN, OUT](
+ protected def getConversionMapper[IN, OUT](
physicalTypeInfo: TypeInformation[IN],
- logicalRowType: RelDataType,
+ schema: RowSchema,
requestedTypeInfo: TypeInformation[OUT],
functionName: String):
- Option[MapFunction[IN, OUT]] = {
+ MapFunction[IN, OUT] = {
- if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
- // only used to explain table
- None
- } else if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+ if (requestedTypeInfo.getTypeClass == classOf[Row]) {
// CRow to Row, only needs to be unwrapped
- Some(
- new MapFunction[CRow, Row] {
- override def map(value: CRow): Row = value.row
- }.asInstanceOf[MapFunction[IN, OUT]]
- )
+ new MapFunction[CRow, Row] {
+ override def map(value: CRow): Row = value.row
+ }.asInstanceOf[MapFunction[IN, OUT]]
} else {
// Some type that is neither CRow nor Row
val converterFunction = generateRowConverterFunction[OUT](
physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
- logicalRowType,
+ schema,
requestedTypeInfo,
functionName
)
- Some(new CRowInputMapRunner[OUT](
+ new CRowInputMapRunner[OUT](
converterFunction.name,
converterFunction.code,
converterFunction.returnType)
- .asInstanceOf[MapFunction[IN, OUT]])
+ .asInstanceOf[MapFunction[IN, OUT]]
}
}
+ /** Validates that the plan produces only append changes. */
+ protected def isAppendOnly(plan: RelNode): Boolean = {
+ val appendOnlyValidator = new AppendOnlyValidator
+ appendOnlyValidator.go(plan)
+
+ appendOnlyValidator.isAppendOnly
+ }
+
+ /** Extracts the unique keys of the table produced by the plan. */
+ protected def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+ val keyExtractor = new UniqueKeyExtractor
+ keyExtractor.go(plan)
+ keyExtractor.keys
+ }
+
/**
- * Creates a final converter that maps the internal CRow type to external Tuple2 type.
+ * Creates a converter that maps the internal CRow type to Scala or Java Tuple2 with change flag.
*
* @param physicalTypeInfo the input of the sink
- * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
- * @param requestedTypeInfo the output type of the sink
+ * @param schema the input schema with correct field names (esp. for POJO field mapping)
+ * @param requestedTypeInfo the output type of the sink.
* @param functionName name of the map function. Must not be unique but has to be a
* valid Java class identifier.
*/
- protected def getTupleConversionMapper[IN, OUT](
- physicalTypeInfo: TypeInformation[IN],
- logicalRowType: RelDataType,
- requestedTypeInfo: TypeInformation[OUT],
- functionName: String):
- Option[MapFunction[IN, JTuple2[Boolean, OUT]]] = {
+ private def getConversionMapperWithChanges[OUT](
+ physicalTypeInfo: TypeInformation[CRow],
+ schema: RowSchema,
+ requestedTypeInfo: TypeInformation[OUT],
+ functionName: String):
+ MapFunction[CRow, OUT] = {
+
+ requestedTypeInfo match {
+
+ // Scala tuple
+ case t: CaseClassTypeInfo[_]
+ if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN =>
+
+ val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]]
+ if (reqType.getTypeClass == classOf[Row]) {
+ // Requested type is Row. Just rewrap CRow in Tuple2
+ new MapFunction[CRow, (Boolean, Row)] {
+ override def map(cRow: CRow): (Boolean, Row) = {
+ (cRow.change, cRow.row)
+ }
+ }.asInstanceOf[MapFunction[CRow, OUT]]
+ } else {
+ // Use a map function to convert Row into requested type and wrap result in Tuple2
+ val converterFunction = generateRowConverterFunction(
+ physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+ schema,
+ reqType,
+ functionName
+ )
+
+ new CRowInputScalaTupleOutputMapRunner(
+ converterFunction.name,
+ converterFunction.code,
+ requestedTypeInfo.asInstanceOf[TypeInformation[(Boolean, Any)]])
+ .asInstanceOf[MapFunction[CRow, OUT]]
- val converterFunction = generateRowConverterFunction(
- physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
- logicalRowType,
- requestedTypeInfo,
- functionName
- )
+ }
- Some(new CRowInputTupleOutputMapRunner[OUT](
- converterFunction.name,
- converterFunction.code,
- new TupleTypeInfo(BasicTypeInfo.BOOLEAN_TYPE_INFO, requestedTypeInfo))
- .asInstanceOf[MapFunction[IN, JTuple2[Boolean, OUT]]])
+ // Java tuple
+ case t: TupleTypeInfo[_]
+ if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN =>
+
+ val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]]
+ if (reqType.getTypeClass == classOf[Row]) {
+ // Requested type is Row. Just rewrap CRow in Tuple2
+ new MapFunction[CRow, JTuple2[JBool, Row]] {
+ val outT = new JTuple2(true.asInstanceOf[JBool], null.asInstanceOf[Row])
+ override def map(cRow: CRow): JTuple2[JBool, Row] = {
+ outT.f0 = cRow.change
+ outT.f1 = cRow.row
+ outT
+ }
+ }.asInstanceOf[MapFunction[CRow, OUT]]
+ } else {
+ // Use a map function to convert Row into requested type and wrap result in Tuple2
+ val converterFunction = generateRowConverterFunction(
+ physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+ schema,
+ reqType,
+ functionName
+ )
+
+ new CRowInputJavaTupleOutputMapRunner(
+ converterFunction.name,
+ converterFunction.code,
+ requestedTypeInfo.asInstanceOf[TypeInformation[JTuple2[JBool, Any]]])
+ .asInstanceOf[MapFunction[CRow, OUT]]
+ }
+ }
}
/**
@@ -380,9 +479,10 @@ abstract class StreamTableEnvironment(
* Generates the optimized [[RelNode]] tree from the original relational node tree.
*
* @param relNode The root node of the relational expression tree.
+ * @param updatesAsRetraction True if the sink requests updates as retraction messages.
* @return The optimized [[RelNode]] tree
*/
- private[flink] def optimize(relNode: RelNode): RelNode = {
+ private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
// 1. decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
@@ -410,7 +510,7 @@ abstract class StreamTableEnvironment(
// 5. optimize the physical Flink plan
val physicalOptRuleSet = getPhysicalOptRuleSet
val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
- var physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
+ val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
} else {
logicalPlan
@@ -419,13 +519,18 @@ abstract class StreamTableEnvironment(
// 6. decorate the optimized plan
val decoRuleSet = getDecoRuleSet
val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
-
- if (this.config.getNeedsUpdatesAsRetractionForSink) {
- physicalPlan = physicalPlan.copy(
+ val planToDecorate = if (updatesAsRetraction) {
+ physicalPlan.copy(
physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
physicalPlan.getInputs)
+ } else {
+ physicalPlan
}
- runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet)
+ runHepPlanner(
+ HepMatchOrder.BOTTOM_UP,
+ decoRuleSet,
+ planToDecorate,
+ planToDecorate.getTraitSet)
} else {
physicalPlan
}
@@ -440,14 +545,17 @@ abstract class StreamTableEnvironment(
* Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
*
* @param table The root node of the relational expression tree.
+ * @param updatesAsRetraction Set to true to encode updates as retraction messages.
+ * @param withChangeFlag Set to true to emit records with change flags.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
- protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+ protected def translate[A](table: Table, updatesAsRetraction: Boolean, withChangeFlag: Boolean)
+ (implicit tpe: TypeInformation[A]): DataStream[A] = {
val relNode = table.getRelNode
- val dataStreamPlan = optimize(relNode)
- translate(dataStreamPlan, relNode.getRowType)
+ val dataStreamPlan = optimize(relNode, updatesAsRetraction)
+ translate(dataStreamPlan, relNode.getRowType, withChangeFlag)
}
/**
@@ -456,87 +564,65 @@ abstract class StreamTableEnvironment(
* @param logicalPlan The root node of the relational expression tree.
* @param logicalType The row type of the result. Since the logicalPlan can lose the
* field naming during optimization we pass the row type separately.
+ * @param withChangeFlag Set to true to emit records with change flags.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
protected def translate[A](
logicalPlan: RelNode,
- logicalType: RelDataType)
+ logicalType: RelDataType,
+ withChangeFlag: Boolean)
(implicit tpe: TypeInformation[A]): DataStream[A] = {
- TableEnvironment.validateType(tpe)
+ // if no change flags are requested, verify table is an insert-only (append-only) table.
+ if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
+ throw new TableException(
+ "Table is not an append-only table. " +
+ "Output needs to handle update and delete changes.")
+ }
- logicalPlan match {
- case node: DataStreamRel =>
- val plan = node.translateToPlan(this)
- val conversion =
- getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
- conversion match {
- case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary
- case Some(mapFunction: MapFunction[CRow, A]) =>
- plan.map(mapFunction)
- .returns(tpe)
- .name(s"to: ${tpe.getTypeClass.getSimpleName}")
- .asInstanceOf[DataStream[A]]
- }
+ // get CRow plan
+ val plan: DataStream[CRow] = translateToCRow(logicalPlan)
- case _ =>
- throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
- "This is a bug and should not happen. Please file an issue.")
+ // convert CRow to output type
+ val conversion = if (withChangeFlag) {
+ getConversionMapperWithChanges(
+ plan.getType,
+ new RowSchema(logicalType),
+ tpe,
+ "DataStreamSinkConversion")
+ } else {
+ getConversionMapper(
+ plan.getType,
+ new RowSchema(logicalType),
+ tpe,
+ "DataStreamSinkConversion")
}
- }
- /**
- * Translates a [[Table]] into a [[DataStream]] with change information.
- *
- * The transformation involves optimizing the relational expression tree as defined by
- * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
- *
- * @param table The root node of the relational expression tree.
- * @param wrapToTuple True, if want to output chang information
- * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
- * @tparam A The type of the resulting [[DataStream]].
- * @return The [[DataStream]] that corresponds to the translated [[Table]].
- */
- protected def translate[A](table: Table, wrapToTuple: Boolean)(implicit tpe: TypeInformation[A])
- : DataStream[JTuple2[Boolean, A]] = {
- val relNode = table.getRelNode
- val dataStreamPlan = optimize(relNode)
- translate(dataStreamPlan, relNode.getRowType, wrapToTuple)
+ val rootParallelism = plan.getParallelism
+
+ conversion match {
+ case mapFunction: MapFunction[CRow, A] =>
+ plan.map(mapFunction)
+ .returns(tpe)
+ .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+ .setParallelism(rootParallelism)
+ }
}
/**
- * Translates a logical [[RelNode]] into a [[DataStream]] with change information.
+ * Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]].
*
- * @param logicalPlan The root node of the relational expression tree.
- * @param logicalType The row type of the result. Since the logicalPlan can lose the
- * @param wrapToTuple True, if want to output chang information
- * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
- * @tparam A The type of the resulting [[DataStream]].
- * @return The [[DataStream]] that corresponds to the translated [[Table]].
+ * @param logicalPlan The logical plan to translate.
+ * @return The [[DataStream]] of type [[CRow]].
*/
- protected def translate[A](
- logicalPlan: RelNode,
- logicalType: RelDataType,
- wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]): DataStream[JTuple2[Boolean, A]] = {
-
- TableEnvironment.validateType(tpe)
+ protected def translateToCRow(
+ logicalPlan: RelNode): DataStream[CRow] = {
logicalPlan match {
case node: DataStreamRel =>
- val plan = node.translateToPlan(this)
- val conversion =
- getTupleConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
- conversion match {
- case None => plan.asInstanceOf[DataStream[JTuple2[Boolean, A]]] // no conversion necessary
- case Some(mapFunction: MapFunction[CRow, JTuple2[Boolean, A]]) =>
- plan.map(mapFunction)
- .returns(new TupleTypeInfo[JTuple2[Boolean, A]](BasicTypeInfo.BOOLEAN_TYPE_INFO, tpe))
- .name(s"to: ${tpe.getTypeClass.getSimpleName}")
- .asInstanceOf[DataStream[JTuple2[Boolean, A]]]
- }
-
+ node.translateToPlan(this)
case _ =>
throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
"This is a bug and should not happen. Please file an issue.")
@@ -551,10 +637,8 @@ abstract class StreamTableEnvironment(
*/
def explain(table: Table): String = {
val ast = table.getRelNode
- val optimizedPlan = optimize(ast)
- val dataStream = translate[CRow](
- optimizedPlan,
- ast.getRowType)(new GenericTypeInfo(classOf[CRow]))
+ val optimizedPlan = optimize(ast, updatesAsRetraction = false)
+ val dataStream = translateToCRow(optimizedPlan)
val env = dataStream.getExecutionEnvironment
val jsonSqlPlan = env.getExecutionPlan
@@ -574,4 +658,90 @@ abstract class StreamTableEnvironment(
s"$sqlPlan"
}
+ private class AppendOnlyValidator extends RelVisitor {
+
+ var isAppendOnly = true
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case s: DataStreamRel if s.producesUpdates =>
+ isAppendOnly = false
+ case _ =>
+ super.visit(node, ordinal, parent)
+ }
+ }
+ }
+
+ /** Identifies unique key fields in the output of a RelNode. */
+ private class UniqueKeyExtractor extends RelVisitor {
+
+ var keys: Option[Array[String]] = None
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case c: DataStreamCalc =>
+ super.visit(node, ordinal, parent)
+ // check if input has keys
+ if (keys.isDefined) {
+ // track keys forward
+ val inNames = c.getInput.getRowType.getFieldNames
+ val inOutNames = c.getProgram.getNamedProjects.asScala
+ .map(p => {
+ c.getProgram.expandLocalRef(p.left) match {
+ // output field is forwarded input field
+ case i: RexInputRef => (i.getIndex, p.right)
+ // output field is renamed input field
+ case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+ a.getOperands.get(0) match {
+ case ref: RexInputRef =>
+ (ref.getIndex, p.right)
+ case _ =>
+ (-1, p.right)
+ }
+ // output field is not forwarded from input
+ case _: RexNode => (-1, p.right)
+ }
+ })
+ // filter all non-forwarded fields
+ .filter(_._1 >= 0)
+ // resolve names of input fields
+ .map(io => (inNames.get(io._1), io._2))
+
+ // filter by input keys
+ val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
+ // check if all keys have been preserved
+ if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
+ // all keys have been preserved (but possibly renamed)
+ keys = Some(outKeys.toArray)
+ } else {
+ // some (or all) keys have been removed. The output no longer has a unique key
+ keys = None
+ }
+ }
+ case _: DataStreamOverAggregate =>
+ super.visit(node, ordinal, parent)
+ // keys are always forwarded by Over aggregate
+ case a: DataStreamGroupAggregate =>
+ // get grouping keys
+ val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
+ keys = Some(groupKeys.toArray)
+ case w: DataStreamGroupWindowAggregate =>
+ // get grouping keys
+ val groupKeys =
+ w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
+ // get window start and end time
+ val windowStartEnd = w.getWindowProperties.map(_.name)
+ // we have only a unique key if at least one window property is selected
+ if (windowStartEnd.nonEmpty) {
+ keys = Some(groupKeys ++ windowStartEnd)
+ }
+ case _: DataStreamRel =>
+ // anything else does not forward keys or might duplicate keys, so we can stop
+ keys = None
+ }
+ }
+
+ }
+
}
+
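The append-only validation above is visible from user code. A minimal sketch of the behavior (illustrative table and field names): converting an updating table without change flags throws, while toRetractStream succeeds.

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, TableException}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object AppendOnlyCheckExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val clicks = tEnv.fromDataStream(
      env.fromElements(("alice", 1), ("alice", 2)), 'user, 'cnt)

    // A projection is append-only: conversion without change flags works.
    clicks.select('user).toDataStream[Row].print()

    // An aggregation produces updates: conversion without change flags fails.
    val counts = clicks.groupBy('user).select('user, 'cnt.sum)
    try {
      counts.toDataStream[Row]
    } catch {
      case e: TableException => println(s"expected: ${e.getMessage}")
    }

    // With change flags, the same table converts fine.
    counts.toRetractStream[Row].print()

    env.execute()
  }
}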
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index d296978..6448657 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -37,11 +37,6 @@ class TableConfig {
private var nullCheck: Boolean = true
/**
- * Defines whether sink table requires that update and delete changes are sent with retraction
- */
- private var needsUpdatesAsRetractionForSink: Boolean = false
-
- /**
* Defines the configuration of Calcite for Table API and SQL queries.
*/
private var calciteConfig = CalciteConfig.DEFAULT
@@ -72,18 +67,6 @@ class TableConfig {
}
/**
- * Returns the need retraction property for table sink.
- */
- def getNeedsUpdatesAsRetractionForSink = needsUpdatesAsRetractionForSink
-
- /**
- * Set the need retraction property for table sink.
- */
- def setNeedsUpdatesAsRetractionForSink(needsUpdatesAsRetraction: Boolean ): Unit = {
- this.needsUpdatesAsRetractionForSink = needsUpdatesAsRetraction
- }
-
- /**
* Returns the current configuration of Calcite for Table API and SQL queries.
*/
def getCalciteConfig: CalciteConfig = calciteConfig
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 5b752ab..bb0de3e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -51,15 +51,14 @@ import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction, AggregateFunction}
+import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.RelTable
-import org.apache.flink.table.runtime.types.CRowTypeInfo
+import org.apache.flink.table.plan.schema.{RelTable, RowSchema}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
import org.apache.flink.table.validate.FunctionCatalog
@@ -620,7 +619,7 @@ abstract class TableEnvironment(val config: TableConfig) {
throw new TableException(
"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
"Please specify the type of the input with a RowTypeInfo.")
- case a: AtomicType[A] =>
+ case a: AtomicType[_] =>
exprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
if (idx > 0) {
@@ -691,53 +690,32 @@ abstract class TableEnvironment(val config: TableConfig) {
(fieldNames.toArray, fieldIndexes.toArray)
}
- /**
- * Creates a final converter that maps the internal row type to external type.
- *
- * @param physicalTypeInfo the input of the sink
- * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
- * @param requestedTypeInfo the output type of the sink
- * @param functionName name of the map function. Must not be unique but has to be a
- * valid Java class identifier.
- */
- protected def getConversionMapper[IN, OUT](
- physicalTypeInfo: TypeInformation[IN],
- logicalRowType: RelDataType,
- requestedTypeInfo: TypeInformation[OUT],
- functionName: String):
- Option[MapFunction[IN, OUT]]
-
protected def generateRowConverterFunction[OUT](
inputTypeInfo: TypeInformation[Row],
- logicalRowType: RelDataType,
+ schema: RowSchema,
requestedTypeInfo: TypeInformation[OUT],
functionName: String):
GeneratedFunction[MapFunction[Row, OUT], OUT] = {
// validate that at least the field types of physical and logical type match
// we do that here to make sure that plan translation was correct
- val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
- if (logicalRowTypeInfo != inputTypeInfo) {
+ if (schema.physicalTypeInfo != inputTypeInfo) {
throw TableException("The field types of physical and logical row types do not match." +
"This is a bug and should not happen. Please file an issue.")
}
- // convert to type information
- val logicalFieldTypes = logicalRowType.getFieldList.asScala
- .map(t => FlinkTypeFactory.toTypeInfo(t.getType))
-
- // field names
- val logicalFieldNames = logicalRowType.getFieldNames.asScala
+ val fieldTypes = schema.physicalFieldTypeInfo
+ val fieldNames = schema.physicalFieldNames
// validate requested type
- if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
+ if (requestedTypeInfo.getArity != fieldTypes.length) {
throw new TableException("Arity of result does not match requested type.")
}
requestedTypeInfo match {
// POJO type requested
case pt: PojoTypeInfo[_] =>
- logicalFieldNames.zip(logicalFieldTypes) foreach {
+ fieldNames.zip(fieldTypes) foreach {
case (fName, fType) =>
val pojoIdx = pt.getFieldIndex(fName)
if (pojoIdx < 0) {
@@ -752,7 +730,7 @@ abstract class TableEnvironment(val config: TableConfig) {
// Tuple/Case class/Row type requested
case tt: TupleTypeInfoBase[_] =>
- logicalFieldTypes.zipWithIndex foreach {
+ fieldTypes.zipWithIndex foreach {
case (fieldTypeInfo, i) =>
val requestedTypeInfo = tt.getTypeAt(i)
if (fieldTypeInfo != requestedTypeInfo) {
@@ -763,11 +741,11 @@ abstract class TableEnvironment(val config: TableConfig) {
// Atomic type requested
case at: AtomicType[_] =>
- if (logicalFieldTypes.size != 1) {
+ if (fieldTypes.size != 1) {
throw new TableException(s"Requested result type is an atomic type but " +
s"result has more or less than a single field.")
}
- val fieldTypeInfo = logicalFieldTypes.head
+ val fieldTypeInfo = fieldTypes.head
if (fieldTypeInfo != at) {
throw new TableException(s"Result field does not match requested type. " +
s"Requested: $at; Actual: $fieldTypeInfo")
@@ -787,7 +765,7 @@ abstract class TableEnvironment(val config: TableConfig) {
val conversion = generator.generateConverterResultExpression(
requestedTypeInfo,
- logicalFieldNames)
+ fieldNames)
val body =
s"""
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index a649584..a70bcca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -18,12 +18,14 @@
package org.apache.flink.table.api.java
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.table.api._
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import _root_.java.lang.{Boolean => JBool}
/**
* The [[TableEnvironment]] for a Java [[StreamExecutionEnvironment]].
@@ -132,7 +134,10 @@ class StreamTableEnvironment(
}
/**
- * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
@@ -145,11 +150,16 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
- translate[T](table)(TypeExtractor.createTypeInfo(clazz))
+ val typeInfo = TypeExtractor.createTypeInfo(clazz)
+ TableEnvironment.validateType(typeInfo)
+ translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
}
/**
- * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
* - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
@@ -162,7 +172,68 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
- translate[T](table)(typeInfo)
+ TableEnvironment.validateType(typeInfo)
+ translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+ *
+ * The fields of the [[Table]] are mapped to the requested type as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param clazz The class of the requested record type.
+ * @tparam T The type of the requested record type.
+ * @return The converted [[DataStream]].
+ */
+ def toRetractStream[T](table: Table, clazz: Class[T]):
+ DataStream[JTuple2[JBool, T]] = {
+
+ val typeInfo = TypeExtractor.createTypeInfo(clazz)
+ TableEnvironment.validateType(typeInfo)
+ val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
+ translate[JTuple2[JBool, T]](
+ table,
+ updatesAsRetraction = true,
+ withChangeFlag = true)(resultType)
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+ *
+ * The fields of the [[Table]] are mapped to the requested type as follows:
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param typeInfo The [[TypeInformation]] of the requested record type.
+ * @tparam T The type of the requested record type.
+ * @return The converted [[DataStream]].
+ */
+ def toRetractStream[T](table: Table, typeInfo: TypeInformation[T]):
+ DataStream[JTuple2[JBool, T]] = {
+
+ TableEnvironment.validateType(typeInfo)
+ val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
+ Types.BOOLEAN,
+ typeInfo
+ )
+ translate[JTuple2[JBool, T]](
+ table,
+ updatesAsRetraction = true,
+ withChangeFlag = true)(resultTypeInfo)
}
/**
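For the Java API methods above, every record travels as a Java Tuple2 whose first field is the boxed change flag. A small sketch (values are illustrative) of the elements such a stream carries:

import java.lang.{Boolean => JBool}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.types.Row

object JavaRetractEncodingExample {
  private def row(word: String, cnt: Int): Row = {
    val r = new Row(2)
    r.setField(0, word)
    r.setField(1, cnt)
    r
  }

  def main(args: Array[String]): Unit = {
    // An update of hello's count from 1 to 2 arrives as two messages:
    val retractOld = new JTuple2[JBool, Row](false, row("hello", 1))
    val addNew = new JTuple2[JBool, Row](true, row("hello", 2))
    println(s"$retractOld then $addNew")
  }
}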
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 0552d7c..e5ad6c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -17,10 +17,11 @@
*/
package org.apache.flink.table.api.scala
+import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableEnvironment, Table, TableConfig}
-import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.asScalaStream
@@ -127,11 +128,14 @@ class StreamTableEnvironment(
}
/**
- * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+ * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+ * by update or delete changes, the conversion will fail.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
- * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
- * types: Fields are mapped by position, field types must match.
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+ * types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
* @param table The [[Table]] to convert.
@@ -139,7 +143,24 @@ class StreamTableEnvironment(
* @return The converted [[DataStream]].
*/
def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
- asScalaStream(translate(table))
+ val returnType = createTypeInformation[T]
+ asScalaStream(translate(table, updatesAsRetraction = false, withChangeFlag = false)(returnType))
+ }
+
+/**
+ * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+ *
+ * @param table The [[Table]] to convert.
+ * @tparam T The type of the requested data type.
+ * @return The converted [[DataStream]].
+ */
+ def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
+ val returnType = createTypeInformation[(Boolean, T)]
+ asScalaStream(translate(table, updatesAsRetraction = true, withChangeFlag = true)(returnType))
}
/**
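Downstream code can branch on the change flag. A short sketch (hypothetical helper) of consuming the (Boolean, T) stream returned by the method above:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream

object RetractConsumer {
  // true flags carry add messages, false flags carry retract messages.
  def mirror(retracts: DataStream[(Boolean, (String, Int))]): DataStream[String] =
    retracts.map {
      case (true, (word, total)) => s"upsert $word -> $total"
      case (false, (word, total)) => s"delete $word -> $total"
    }
}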
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 2a0d571..5efff62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -57,5 +57,24 @@ class TableConversions(table: Table) {
}
}
+ /** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+ *
+ */
+ def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = {
+
+ table.tableEnv match {
+ case tEnv: ScalaStreamTableEnv =>
+ tEnv.toRetractStream(table)
+ case _ =>
+ throw new TableException(
+ "Only tables that originate from Scala DataStreams " +
+ "can be converted to Scala DataStreams.")
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index dd8265b..310a75f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -755,7 +755,9 @@ class Table(
*
* A batch [[Table]] can only be written to a
* [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
- * [[org.apache.flink.table.sinks.StreamTableSink]].
+ * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+ * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+ * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
*
* @param sink The [[TableSink]] to which the [[Table]] is written.
* @tparam T The data type that the [[TableSink]] expects.
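A minimal sketch of writing an insert-only streaming Table through writeToSink (path, delimiter, and fields are illustrative; this assumes CsvTableSink implements AppendStreamTableSink after this change, per the diffstat above):

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

object WriteToSinkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val orders = tEnv.fromDataStream(
      env.fromElements((1L, "beer"), (2L, "diaper")), 'id, 'product)

    // Insert-only query, so an AppendStreamTableSink is accepted.
    orders.select('id, 'product).writeToSink(new CsvTableSink("/tmp/orders.csv", "|"))

    env.execute()
  }
}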
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 59f723ac..ce0f966 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -100,11 +100,17 @@ class DataStreamCalc(
calcProgram,
config)
+ val inputParallelism = inputDataStream.getParallelism
+
val mapFunc = new CRowFlatMapRunner(
genFunction.name,
genFunction.code,
CRowTypeInfo(schema.physicalTypeInfo))
- inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
+ inputDataStream
+ .flatMap(mapFunc)
+ .name(calcOpName(calcProgram, getExpressionString))
+ // keep parallelism to ensure order of accumulate and retract messages
+ .setParallelism(inputParallelism)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 056cda9..18f1fc8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -67,6 +67,8 @@ class DataStreamGroupAggregate(
override def consumesRetractions = true
+ def getGroupings: Array[Int] = groupings
+
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamGroupAggregate(
cluster,
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index f61828b..1be1896 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -59,6 +59,10 @@ class DataStreamGroupWindowAggregate(
override def consumesRetractions = true
+ def getGroupings: Array[Int] = grouping
+
+ def getWindowProperties: Seq[NamedWindowProperty] = namedProperties
+
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamGroupWindowAggregate(
window,
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
index c3b43ba..173b7d3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
@@ -82,19 +82,38 @@ object AccModeTrait {
}
/**
- * The AccMode indicates which kinds of messages a [[org.apache.calcite.rel.RelNode]] might
- * produce.
- * In [[AccMode.Acc]] the node only emit accumulate messages.
- * In [[AccMode.AccRetract]], the node produces accumulate messages for insert changes,
- * retraction messages for delete changes, and accumulate and retraction messages
- * for update changes.
+ * The [[AccMode]] determines how insert, update, and delete changes of tables are encoded
+ * by the messages that an operator emits.
*/
object AccMode extends Enumeration {
type AccMode = Value
- val Acc = Value // Operator produces only accumulate (insert) messages
- val AccRetract = Value // Operator produces accumulate (insert, update) and
- // retraction (delete, update) messages
+ /**
+ * An operator in [[Acc]] mode emits change messages as
+ * [[org.apache.flink.table.runtime.types.CRow]] which encode a pair of (Boolean, Row).
+ *
+ * An operator in [[Acc]] mode may only produce update and delete messages if the table has
+ * a unique key and all key attributes are contained in the Row.
+ *
+ * Changes are encoded as follows:
+ * - insert: (true, NewRow)
+ * - update: (true, NewRow) // the Row includes the full unique key to identify the row to update
+ * - delete: (false, OldRow) // the Row includes the full unique key to identify the row to delete
+ *
+ */
+ val Acc = Value
+
+ /**
+ * An operator in [[AccRetract]] mode emits change messages as
+ * [[org.apache.flink.table.runtime.types.CRow]] which encode a pair of (Boolean, Row).
+ *
+ * Changes are encoded as follows:
+ * - insert: (true, NewRow)
+ * - update: (false, OldRow), (true, NewRow) // updates are encoded in two messages!
+ * - delete: (false, OldRow)
+ *
+ */
+ val AccRetract = Value
}
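A tiny sketch (illustrative key and counts) of the two encodings described above, in terms of CRow(row, change):

import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row

object AccModeEncodingExample {
  private def row(key: String, cnt: Int): Row = {
    val r = new Row(2)
    r.setField(0, key)
    r.setField(1, cnt)
    r
  }

  def main(args: Array[String]): Unit = {
    // Acc mode: an update of key "a" is a single accumulate message with the new row.
    val accUpdate = Seq(new CRow(row("a", 2), true))

    // AccRetract mode: the same update retracts the old row, then accumulates the new one.
    val accRetractUpdate = Seq(new CRow(row("a", 1), false), new CRow(row("a", 2), true))

    (accUpdate ++ accRetractUpdate).foreach(println)
  }
}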
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
index 97c0dbb..f0b725d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -102,17 +102,17 @@ object DataStreamRetractionRules {
val rel = call.rel(0).asInstanceOf[DataStreamRel]
val traits = rel.getTraitSet
- val traitsWithUpdateAsRetrac =
+ val traitsWithUpdateAsRetraction =
if (null == traits.getTrait(UpdateAsRetractionTraitDef.INSTANCE)) {
traits.plus(UpdateAsRetractionTrait.DEFAULT)
} else {
traits
}
val traitsWithAccMode =
- if (null == traitsWithUpdateAsRetrac.getTrait(AccModeTraitDef.INSTANCE)) {
- traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT)
+ if (null == traitsWithUpdateAsRetraction.getTrait(AccModeTraitDef.INSTANCE)) {
+ traitsWithUpdateAsRetraction.plus(AccModeTrait.DEFAULT)
} else {
- traitsWithUpdateAsRetrac
+ traitsWithUpdateAsRetraction
}
if (traits != traitsWithAccMode) {
@@ -122,8 +122,8 @@ object DataStreamRetractionRules {
}
/**
- * Rule that annotates all [[DataStreamRel]] nodes that need to sent out update and delete
- * changes as retraction messages.
+ * Rule that annotates all [[DataStreamRel]] nodes that need to send out update changes with
+ * retraction messages.
*/
class SetUpdatesAsRetractionRule extends RelOptRule(
operand(
@@ -131,7 +131,7 @@ object DataStreamRetractionRules {
"SetUpdatesAsRetractionRule") {
/**
- * Checks if a [[RelNode]] requires that update and delete changes are sent with retraction
+ * Checks if a [[RelNode]] requires that update changes are sent with retraction
* messages.
*/
def needsUpdatesAsRetraction(node: RelNode): Boolean = {
@@ -142,7 +142,7 @@ object DataStreamRetractionRules {
}
/**
- * Annotates a [[RelNode]] to send out update and delete changes as retraction messages.
+ * Annotates a [[RelNode]] to send out update changes with retraction messages.
*/
def setUpdatesAsRetraction(relNode: RelNode): RelNode = {
val traitSet = relNode.getTraitSet
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
index 66e51b1..ff3821a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
@@ -37,7 +37,7 @@ class CRowCorrelateFlatMapRunner(
flatMapCode: String,
collectorName: String,
collectorCode: String,
- @transient returnType: TypeInformation[CRow])
+ @transient var returnType: TypeInformation[CRow])
extends RichFlatMapFunction[CRow, CRow]
with ResultTypeQueryable[CRow]
with Compiler[Any] {
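One recurring change in the runner classes below is `@transient returnType` becoming `@transient var returnType`. On a bare constructor parameter that is captured by a method body, scalac synthesizes a private field but does not reliably carry the @transient annotation over to that field, so the TypeInformation could still end up in the serialized function. Making the parameter a var pins the annotation to the generated field. A minimal sketch of the pattern (the class name is illustrative, not from this commit):

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.java.typeutils.ResultTypeQueryable

  // the `@transient var` ensures the TypeInformation field is skipped by
  // Java serialization; it is only needed during plan construction on the
  // client, not on the task managers
  class ExampleRunner[OUT](
      name: String,
      @transient var returnType: TypeInformation[OUT])
    extends ResultTypeQueryable[OUT]
    with Serializable {

    override def getProducedType: TypeInformation[OUT] = returnType
  }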
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
index 9a4650b..9701cb9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory
class CRowFlatMapRunner(
name: String,
code: String,
- @transient returnType: TypeInformation[CRow])
+ @transient var returnType: TypeInformation[CRow])
extends RichFlatMapFunction[CRow, CRow]
with ResultTypeQueryable[CRow]
with Compiler[FlatMapFunction[Row, Row]] {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
index 8e95c93..109c6e1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory
class CRowInputMapRunner[OUT](
name: String,
code: String,
- @transient returnType: TypeInformation[OUT])
+ @transient var returnType: TypeInformation[OUT])
extends RichMapFunction[CRow, OUT]
with ResultTypeQueryable[OUT]
with Compiler[MapFunction[Row, OUT]] {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
index 54bbf7e..7c96437 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
@@ -18,6 +18,8 @@
package org.apache.flink.table.runtime
+import java.lang.{Boolean => JBool}
+
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
@@ -28,36 +30,63 @@ import org.apache.flink.types.Row
import org.slf4j.LoggerFactory
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-
/**
- * Convert [[CRow]] to a [[Tuple2]]
+ * Convert [[CRow]] to a [[JTuple2]]
*/
-class CRowInputTupleOutputMapRunner[OUT](
+class CRowInputJavaTupleOutputMapRunner(
name: String,
code: String,
- @transient returnType: TypeInformation[JTuple2[Boolean, OUT]])
- extends RichMapFunction[CRow, JTuple2[Boolean, OUT]]
- with ResultTypeQueryable[JTuple2[Boolean, OUT]]
- with Compiler[MapFunction[Row, OUT]] {
+ @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
+ extends RichMapFunction[CRow, Any]
+ with ResultTypeQueryable[JTuple2[JBool, Any]]
+ with Compiler[MapFunction[Row, Any]] {
val LOG = LoggerFactory.getLogger(this.getClass)
- private var function: MapFunction[Row, OUT] = _
- private var tupleWrapper: JTuple2[Boolean, OUT] = _
+ private var function: MapFunction[Row, Any] = _
+ private var tupleWrapper: JTuple2[JBool, Any] = _
override def open(parameters: Configuration): Unit = {
LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
LOG.debug("Instantiating MapFunction.")
function = clazz.newInstance()
- tupleWrapper = new JTuple2[Boolean, OUT]()
+ tupleWrapper = new JTuple2[JBool, Any]()
}
- override def map(in: CRow): JTuple2[Boolean, OUT] = {
+ override def map(in: CRow): JTuple2[JBool, Any] = {
tupleWrapper.f0 = in.change
tupleWrapper.f1 = function.map(in.row)
tupleWrapper
}
- override def getProducedType: TypeInformation[JTuple2[Boolean, OUT]] = returnType
+ override def getProducedType: TypeInformation[JTuple2[JBool, Any]] = returnType
+}
+
+/**
+ * Convert [[CRow]] to a [[Tuple2]]
+ */
+class CRowInputScalaTupleOutputMapRunner(
+ name: String,
+ code: String,
+ @transient var returnType: TypeInformation[(Boolean, Any)])
+ extends RichMapFunction[CRow, (Boolean, Any)]
+ with ResultTypeQueryable[(Boolean, Any)]
+ with Compiler[MapFunction[Row, Any]] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ private var function: MapFunction[Row, Any] = _
+
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating MapFunction.")
+ function = clazz.newInstance()
+ }
+
+ override def map(in: CRow): (Boolean, Any) =
+ (in.change, function.map(in.row))
+
+ override def getProducedType: TypeInformation[(Boolean, Any)] = returnType
}
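These two runners back the retraction-aware DataStream conversion: every output element is a (change, record) pair, where true marks an accumulate (insert) message and false a retraction. A rough usage sketch against the Scala API of this commit (`resultTable` is assumed to exist):

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.scala._
  import org.apache.flink.types.Row

  // convert a dynamic table into a stream of (change, record) pairs
  val retractStream: DataStream[(Boolean, Row)] = resultTable.toRetractStream[Row]

  // e.g., keep only the accumulate messages
  val inserts = retractStream.filter(_._1)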
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
index 966dea9..cb8f695 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory
class CRowOutputMapRunner(
name: String,
code: String,
- @transient returnType: TypeInformation[CRow])
+ @transient var returnType: TypeInformation[CRow])
extends RichMapFunction[Any, CRow]
with ResultTypeQueryable[CRow]
with Compiler[MapFunction[Any, Row]] {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
index a0415e1..478b6b6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
@@ -32,7 +32,7 @@ class CorrelateFlatMapRunner[IN, OUT](
flatMapCode: String,
collectorName: String,
collectorCode: String,
- @transient returnType: TypeInformation[OUT])
+ @transient var returnType: TypeInformation[OUT])
extends RichFlatMapFunction[IN, OUT]
with ResultTypeQueryable[OUT]
with Compiler[Any] {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
index 715848d..67acc0b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory
class FlatJoinRunner[IN1, IN2, OUT](
name: String,
code: String,
- @transient returnType: TypeInformation[OUT])
+ @transient var returnType: TypeInformation[OUT])
extends RichFlatJoinFunction[IN1, IN2, OUT]
with ResultTypeQueryable[OUT]
with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
index 2e37baf..938da59 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory
class FlatMapRunner(
name: String,
code: String,
- @transient returnType: TypeInformation[Row])
+ @transient var returnType: TypeInformation[Row])
extends RichFlatMapFunction[Row, Row]
with ResultTypeQueryable[Row]
with Compiler[FlatMapFunction[Row, Row]] {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
index 644e855..5f3dbb4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
@@ -24,7 +24,7 @@ import org.apache.flink.util.Collector
class MapJoinLeftRunner[IN1, IN2, OUT](
name: String,
code: String,
- @transient returnType: TypeInformation[OUT],
+ returnType: TypeInformation[OUT],
broadcastSetName: String)
extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
index eee38d1..e2d9331 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
@@ -24,7 +24,7 @@ import org.apache.flink.util.Collector
class MapJoinRightRunner[IN1, IN2, OUT](
name: String,
code: String,
- @transient returnType: TypeInformation[OUT],
+ returnType: TypeInformation[OUT],
broadcastSetName: String)
extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
index 32562c7..14eeecf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory
class MapRunner[IN, OUT](
name: String,
code: String,
- @transient returnType: TypeInformation[OUT])
+ @transient var returnType: TypeInformation[OUT])
extends RichMapFunction[IN, OUT]
with ResultTypeQueryable[OUT]
with Compiler[MapFunction[IN, OUT]] {
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
index 090e184..00b7b8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory
abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
name: String,
code: String,
- @transient returnType: TypeInformation[OUT],
+ @transient var returnType: TypeInformation[OUT],
broadcastSetName: String)
extends RichFlatMapFunction[MULTI_IN, OUT]
with ResultTypeQueryable[OUT]
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
index 22a2682..9bcac30 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -41,7 +41,7 @@ class DataSetSessionWindowAggregatePreProcessor(
genAggregations: GeneratedAggregationsFunction,
keysAndAggregatesArity: Int,
gap: Long,
- @transient intermediateRowType: TypeInformation[Row])
+ @transient var intermediateRowType: TypeInformation[Row])
extends AbstractRichFunction
with MapPartitionFunction[Row,Row]
with GroupCombineFunction[Row,Row]
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 745f24d..6ee37e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -17,6 +17,8 @@
*/
package org.apache.flink.table.runtime.aggregate
+import java.lang.{Long => JLong}
+
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
@@ -24,6 +26,7 @@ import org.apache.flink.util.Collector
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.api.Types
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.slf4j.LoggerFactory
import org.apache.flink.table.runtime.types.CRow
@@ -47,7 +50,10 @@ class GroupAggProcessFunction(
private var newRow: CRow = _
private var prevRow: CRow = _
private var firstRow: Boolean = _
+ // stores the accumulators
private var state: ValueState[Row] = _
+ // counts the number of added and retracted input records
+ private var cntState: ValueState[JLong] = _
override def open(config: Configuration) {
LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
@@ -65,6 +71,9 @@ class GroupAggProcessFunction(
val stateDescriptor: ValueStateDescriptor[Row] =
new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType)
state = getRuntimeContext.getState(stateDescriptor)
+ val inputCntDescriptor: ValueStateDescriptor[JLong] =
+ new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG)
+ cntState = getRuntimeContext.getState(inputCntDescriptor)
}
override def processElement(
@@ -74,11 +83,14 @@ class GroupAggProcessFunction(
val input = inputC.row
- // get accumulators
+ // get accumulators and input counter
var accumulators = state.value()
+ var inputCnt = cntState.value()
+
if (null == accumulators) {
firstRow = true
accumulators = function.createAccumulators()
+ inputCnt = 0L
} else {
firstRow = false
}
@@ -92,29 +104,44 @@ class GroupAggProcessFunction(
// update aggregate result and set to the newRow
if (inputC.change) {
+ inputCnt += 1
// accumulate input
function.accumulate(accumulators, input)
function.setAggregationResults(accumulators, newRow.row)
} else {
+ inputCnt -= 1
// retract input
function.retract(accumulators, input)
function.setAggregationResults(accumulators, newRow.row)
}
- // update accumulators
- state.update(accumulators)
-
- // if previousRow is not null, do retraction process
- if (generateRetraction && !firstRow) {
- if (prevRow.row.equals(newRow.row)) {
- // ignore same newRow
- return
- } else {
- // retract previous row
- out.collect(prevRow)
+ if (inputCnt != 0) {
+ // we aggregated at least one record for this key
+
+ // update the state
+ state.update(accumulators)
+ cntState.update(inputCnt)
+
+ // if this was not the first row and we have to emit retractions
+ if (generateRetraction && !firstRow) {
+ if (prevRow.row.equals(newRow.row)) {
+ // newRow is the same as before. Do not emit retraction and acc messages
+ return
+ } else {
+ // retract previous result
+ out.collect(prevRow)
+ }
}
- }
+ // emit the new result
+ out.collect(newRow)
- out.collect(newRow)
+ } else {
+ // we retracted the last record for this key
+ // send out a delete message
+ out.collect(prevRow)
+ // and clear all state
+ state.clear()
+ cntState.clear()
+ }
}
}
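The new cntState is what allows the function to tell an update of an existing result apart from the retraction of the last record for a key; only in the latter case does it emit a bare delete message and clear all state. A toy, Flink-free re-enactment of the emission logic for a single key and a COUNT(*) aggregate (illustrative only; for COUNT(*) the accumulator value happens to equal the input counter, and generateRetraction is assumed to be true):

  object GroupAggTrace extends App {
    var cnt = 0L                  // mirrors cntState
    var prev: Option[Long] = None // mirrors prevRow, the last emitted result

    def process(isAccumulate: Boolean): Unit = {
      cnt += (if (isAccumulate) 1L else -1L)
      if (cnt != 0) {
        prev.foreach(p => println(s"(false, $p)")) // retract previous result
        println(s"(true, $cnt)")                   // emit new result
        prev = Some(cnt)
      } else {
        prev.foreach(p => println(s"(false, $p)")) // delete message
        prev = None                                // state.clear() analogue
      }
    }

    // two inserts followed by two retractions for the same key:
    Seq(true, true, false, false).foreach(process)
    // -> (true,1) | (false,1) (true,2) | (false,2) (true,1) | (false,1)
  }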
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
index ec73fa6..1cb3a6e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory
class CRowValuesInputFormat(
name: String,
code: String,
- @transient returnType: TypeInformation[CRow])
+ @transient var returnType: TypeInformation[CRow])
extends GenericInputFormat[CRow]
with NonParallelInput
with ResultTypeQueryable[CRow]
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
index d536b39..43ce605 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory
class ValuesInputFormat(
name: String,
code: String,
- @transient returnType: TypeInformation[Row])
+ @transient var returnType: TypeInformation[Row])
extends GenericInputFormat[Row]
with NonParallelInput
with ResultTypeQueryable[Row]
http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
new file mode 100644
index 0000000..abdca17
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.Table
+
+/**
+ * Defines an external [[TableSink]] to emit a streaming [[Table]] with only insert changes.
+ *
+ * If the [[Table]] is also modified by update or delete changes, a
+ * [[org.apache.flink.table.api.TableException]] will be thrown.
+ *
+ * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
+ */
+trait AppendStreamTableSink[T] extends TableSink[T] {
+
+ /** Emits the DataStream. */
+ def emitDataStream(dataStream: DataStream[T]): Unit
+}
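For reference, a minimal implementation sketch of the new trait: a sink that prints every inserted row. It leans on the TableSinkBase helper of this module for configure(), getFieldNames and getFieldTypes; PrintAppendSink is an illustrative name, not part of this commit:

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.java.typeutils.RowTypeInfo
  import org.apache.flink.streaming.api.datastream.DataStream
  import org.apache.flink.table.sinks.{AppendStreamTableSink, TableSinkBase}
  import org.apache.flink.types.Row

  class PrintAppendSink extends TableSinkBase[Row] with AppendStreamTableSink[Row] {

    // required by TableSinkBase so that configure() can create a copy
    override protected def copy: TableSinkBase[Row] = new PrintAppendSink

    override def getOutputType: TypeInformation[Row] =
      new RowTypeInfo(getFieldTypes: _*)

    // emit each inserted row; a query with update or delete changes would
    // be rejected for this sink with a TableException
    override def emitDataStream(dataStream: DataStream[Row]): Unit =
      dataStream.print()
  }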