You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:47 UTC
[13/15] flink git commit: [FLINK-6093] [table] Implement and turn on
retraction for table sinks.
[FLINK-6093] [table] Implement and turn on retraction for table sinks.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfed279f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfed279f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfed279f
Branch: refs/heads/master
Commit: bfed279f05a0e131c11f963e2380cd4c582e6bc3
Parents: 27bf4ca
Author: Hequn Cheng <ch...@gmail.com>
Authored: Thu Apr 27 23:03:44 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 6 01:51:55 2017 +0200
----------------------------------------------------------------------
.../flink/table/api/BatchTableEnvironment.scala | 16 +--
.../table/api/StreamTableEnvironment.scala | 122 +++++++++++++++++--
.../apache/flink/table/api/TableConfig.scala | 17 +++
.../flink/table/api/TableEnvironment.scala | 12 --
.../datastream/DataStreamRetractionRules.scala | 5 +-
.../runtime/CRowInputTupleOutputMapRunner.scala | 63 ++++++++++
.../apache/flink/table/sinks/CsvTableSink.scala | 98 ++++++++++++++-
.../flink/table/sinks/StreamRetractSink.scala | 35 ++++++
.../flink/table/TableEnvironmentTest.scala | 62 +---------
.../scala/batch/TableEnvironmentITCase.scala | 19 ---
.../api/scala/stream/RetractionITCase.scala | 9 +-
.../api/scala/stream/TableSinkITCase.scala | 33 ++++-
.../api/scala/stream/utils/StreamITCase.scala | 8 --
13 files changed, 363 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index f7955f0..c7bacfe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -26,12 +26,11 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.RuleSet
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.configuration.Configuration
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute}
import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -39,7 +38,6 @@ import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
import org.apache.flink.table.runtime.MapRunner
-import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
import org.apache.flink.table.sources.{BatchTableSource, TableSource}
import org.apache.flink.types.Row
@@ -150,18 +148,6 @@ abstract class BatchTableEnvironment(
if (requestedTypeInfo.getTypeClass == classOf[Row]) {
// Row to Row, no conversion needed
None
- } else if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
- // Row to CRow, only needs to be wrapped
- Some(
- new RichMapFunction[Row, CRow] {
- private var outCRow: CRow = _
- override def open(parameters: Configuration): Unit = outCRow = new CRow(null, true)
- override def map(value: Row): CRow = {
- outCRow.row = value
- outCRow
- }
- }.asInstanceOf[MapFunction[IN, OUT]]
- )
} else {
// some type that is neither Row or CRow
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 0632a47..bd06305 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -26,22 +26,25 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.AtomicType
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.RelTimeIndicatorConverter
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable}
-import org.apache.flink.table.runtime.CRowInputMapRunner
+import org.apache.flink.table.plan.schema.StreamTableSourceTable
+import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.runtime.{CRowInputMapRunner, CRowInputTupleOutputMapRunner}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{StreamRetractSink, StreamTableSink, TableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TypeCheckUtils
import org.apache.flink.types.Row
@@ -130,6 +133,14 @@ abstract class StreamTableEnvironment(
val result: DataStream[T] = translate(table)(outputType)
// Give the DataSet to the TableSink to emit it.
streamSink.emitDataStream(result)
+
+ case streamRetractSink: StreamRetractSink[T] =>
+ val outputType = sink.getOutputType
+ this.config.setNeedsUpdatesAsRetractionForSink(streamRetractSink.needsUpdatesAsRetraction)
+ // translate the Table into a DataStream and provide the type that the TableSink expects.
+ val result: DataStream[JTuple2[Boolean, T]] = translate(table, true)(outputType)
+ // Give the DataSet to the TableSink to emit it.
+ streamRetractSink.emitDataStreamWithChange(result)
case _ =>
throw new TableException("StreamTableSink required to emit streaming Table")
}
@@ -153,7 +164,7 @@ abstract class StreamTableEnvironment(
Option[MapFunction[IN, OUT]] = {
if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
- // CRow to CRow, no conversion needed
+ // only used to explain table
None
} else if (requestedTypeInfo.getTypeClass == classOf[Row]) {
// CRow to Row, only needs to be unwrapped
@@ -164,7 +175,6 @@ abstract class StreamTableEnvironment(
)
} else {
// Some type that is neither CRow nor Row
-
val converterFunction = generateRowConverterFunction[OUT](
physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
logicalRowType,
@@ -177,11 +187,40 @@ abstract class StreamTableEnvironment(
converterFunction.code,
converterFunction.returnType)
.asInstanceOf[MapFunction[IN, OUT]])
-
}
}
/**
+   * Creates a final converter that maps the internal [[CRow]] type to an external
+   * [[JTuple2]] of (change flag, converted record).
+   *
+   * The first field of the produced tuple carries the change flag of the [[CRow]], the second
+   * field holds the row converted to the type requested by the sink.
+   *
+   * @param physicalTypeInfo the physical type of the translated plan; must be a [[CRowTypeInfo]]
+   * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+   * @param requestedTypeInfo the type of the second tuple field, i.e., the record type of the sink
+   * @param functionName name of the map function. Does not need to be unique but has to be a
+   *                     valid Java class identifier.
+   * @return always a converter wrapped in [[Some]]
+   * @throws TableException if physicalTypeInfo is not a [[CRowTypeInfo]]
+   */
+  protected def getTupleConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String): Option[MapFunction[IN, JTuple2[Boolean, OUT]]] = {
+
+    // fail with a descriptive error instead of a bare ClassCastException
+    val cRowType = physicalTypeInfo match {
+      case crt: CRowTypeInfo => crt
+      case other => throw new TableException(
+        s"Expected a CRowTypeInfo as physical type of a stream plan but found $other. " +
+          "This is a bug and should not happen. Please file an issue.")
+    }
+
+    val converterFunction = generateRowConverterFunction(
+      cRowType.rowType,
+      logicalRowType,
+      requestedTypeInfo,
+      functionName
+    )
+
+    // the runner compiles the generated Row => OUT code and pairs its result with the change flag
+    val runner = new CRowInputTupleOutputMapRunner[OUT](
+      converterFunction.name,
+      converterFunction.code,
+      new TupleTypeInfo(BasicTypeInfo.BOOLEAN_TYPE_INFO, requestedTypeInfo))
+
+    Some(runner.asInstanceOf[MapFunction[IN, JTuple2[Boolean, OUT]]])
+  }
+
+ /**
* Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
* catalog.
*
@@ -371,7 +410,7 @@ abstract class StreamTableEnvironment(
// 5. optimize the physical Flink plan
val physicalOptRuleSet = getPhysicalOptRuleSet
val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
- val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
+ var physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
} else {
logicalPlan
@@ -380,6 +419,12 @@ abstract class StreamTableEnvironment(
// 6. decorate the optimized plan
val decoRuleSet = getDecoRuleSet
val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
+
+ if (this.config.getNeedsUpdatesAsRetractionForSink) {
+ physicalPlan = physicalPlan.copy(
+ physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
+ physicalPlan.getInputs)
+ }
runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet)
} else {
physicalPlan
@@ -388,7 +433,6 @@ abstract class StreamTableEnvironment(
decoratedPlan
}
-
/**
* Translates a [[Table]] into a [[DataStream]].
*
@@ -444,6 +488,62 @@ abstract class StreamTableEnvironment(
}
/**
+   * Translates a [[Table]] into a [[DataStream]] of [[JTuple2]] records with change
+   * information: the first field is the change flag, the second field the converted record.
+   *
+   * The transformation involves optimizing the relational expression tree as defined by
+   * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+   *
+   * @param table The root node of the relational expression tree.
+   * @param wrapToTuple Selects this overload, which emits change information. The value is
+   *                    currently only passed on and not evaluated.
+   * @param tpe The [[TypeInformation]] of the record in the second field of the result tuples.
+   * @tparam A The record type carried in the second field of the resulting tuples.
+   * @return The [[DataStream]] that corresponds to the translated [[Table]].
+   */
+  protected def translate[A](table: Table, wrapToTuple: Boolean)(implicit tpe: TypeInformation[A])
+    : DataStream[JTuple2[Boolean, A]] = {
+    val relNode = table.getRelNode
+    val dataStreamPlan = optimize(relNode)
+    translate(dataStreamPlan, relNode.getRowType, wrapToTuple)
+  }
+
+  /**
+   * Translates a logical [[RelNode]] into a [[DataStream]] of [[JTuple2]] records with change
+   * information.
+   *
+   * @param logicalPlan The root node of the relational expression tree.
+   * @param logicalType The row type of the result. Since the logicalPlan can lose the field
+   *                    names during optimization, the original row type is passed separately.
+   * @param wrapToTuple Selects this overload, which emits change information. The value is
+   *                    currently not evaluated.
+   * @param tpe The [[TypeInformation]] of the record in the second field of the result tuples.
+   * @tparam A The record type carried in the second field of the resulting tuples.
+   * @return The [[DataStream]] that corresponds to the translated [[Table]].
+   */
+  protected def translate[A](
+      logicalPlan: RelNode,
+      logicalType: RelDataType,
+      wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]): DataStream[JTuple2[Boolean, A]] = {
+
+    TableEnvironment.validateType(tpe)
+
+    logicalPlan match {
+      case node: DataStreamRel =>
+        val plan = node.translateToPlan(this)
+        val resultType =
+          new TupleTypeInfo[JTuple2[Boolean, A]](BasicTypeInfo.BOOLEAN_TYPE_INFO, tpe)
+        getTupleConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion") match {
+          // no conversion necessary
+          case None => plan.asInstanceOf[DataStream[JTuple2[Boolean, A]]]
+          // bind without a type pattern: a generic type test would be unchecked due to erasure
+          case Some(mapFunction) =>
+            plan.map(mapFunction)
+              .returns(resultType)
+              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+        }
+
+      case _ =>
+        throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
+    }
+  }
+
+ /**
* Returns the AST of the specified Table API and SQL queries and the execution plan to compute
* the result of the given [[Table]].
*
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index 6448657..d296978 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -37,6 +37,11 @@ class TableConfig {
private var nullCheck: Boolean = true
/**
+ * Defines whether the table sink requires that update and delete changes are sent as
+ * retraction messages. Defaults to false.
+ *
+ * NOTE(review): the visible callers only ever set this flag (when emitting to a retract sink)
+ * and never reset it, so later translations using the same config also see it — confirm.
+ */
+ private var needsUpdatesAsRetractionForSink: Boolean = false
+
+ /**
* Defines the configuration of Calcite for Table API and SQL queries.
*/
private var calciteConfig = CalciteConfig.DEFAULT
@@ -67,6 +72,18 @@ class TableConfig {
}
/**
+   * Returns whether the table sink requires that update and delete changes are sent as
+   * retraction messages.
+   */
+  def getNeedsUpdatesAsRetractionForSink: Boolean = needsUpdatesAsRetractionForSink
+
+  /**
+   * Sets whether the table sink requires that update and delete changes are sent as
+   * retraction messages.
+   *
+   * @param needsUpdatesAsRetraction true if the sink needs updates encoded as retractions
+   */
+  def setNeedsUpdatesAsRetractionForSink(needsUpdatesAsRetraction: Boolean): Unit = {
+    this.needsUpdatesAsRetractionForSink = needsUpdatesAsRetraction
+  }
+
+ /**
* Returns the current configuration of Calcite for Table API and SQL queries.
*/
def getCalciteConfig: CalciteConfig = calciteConfig
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d27db1e..5b752ab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -644,18 +644,6 @@ abstract class TableEnvironment(val config: TableConfig) {
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
- case cr: CRowTypeInfo =>
- exprs.zipWithIndex.map {
- case (UnresolvedFieldReference(name), idx) => (idx, name)
- case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
- val idx = cr.getFieldIndex(origName)
- if (idx < 0) {
- throw new TableException(s"$origName is not a field of type $cr")
- }
- (idx, name)
- case _ => throw new TableException(
- "Field reference expression or alias on field expression expected.")
- }
case c: CaseClassTypeInfo[A] =>
exprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
index bd9a7ee..97c0dbb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -102,13 +102,14 @@ object DataStreamRetractionRules {
val rel = call.rel(0).asInstanceOf[DataStreamRel]
val traits = rel.getTraitSet
- val traitsWithUpdateAsRetrac = if (!traits.contains(UpdateAsRetractionTraitDef.INSTANCE)) {
+ val traitsWithUpdateAsRetrac =
+ if (null == traits.getTrait(UpdateAsRetractionTraitDef.INSTANCE)) {
traits.plus(UpdateAsRetractionTrait.DEFAULT)
} else {
traits
}
val traitsWithAccMode =
- if (!traitsWithUpdateAsRetrac.contains(AccModeTraitDef.INSTANCE)) {
+ if (null == traitsWithUpdateAsRetrac.getTrait(AccModeTraitDef.INSTANCE)) {
traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT)
} else {
traitsWithUpdateAsRetrac
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
new file mode 100644
index 0000000..54bbf7e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+
+
+/**
+ * A [[RichMapFunction]] that turns a [[CRow]] into a Java
+ * [[org.apache.flink.api.java.tuple.Tuple2]]. The first tuple field is the change flag of the
+ * [[CRow]], the second field is the output of a generated [[Row]] conversion function.
+ *
+ * The generated code is compiled and instantiated in `open()`. A single tuple instance is
+ * reused for every record produced by `map()`.
+ *
+ * @param name       name under which the generated conversion function is compiled
+ * @param code       source code of the generated conversion function
+ * @param returnType type information of the produced tuples
+ * @tparam OUT type of the converted record in the second tuple field
+ */
+class CRowInputTupleOutputMapRunner[OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[JTuple2[Boolean, OUT]])
+  extends RichMapFunction[CRow, JTuple2[Boolean, OUT]]
+  with ResultTypeQueryable[JTuple2[Boolean, OUT]]
+  with Compiler[MapFunction[Row, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Row, OUT] = _
+  private var tupleWrapper: JTuple2[Boolean, OUT] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val compiledClass = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = compiledClass.newInstance()
+    // one output tuple shared by all map() calls
+    tupleWrapper = new JTuple2[Boolean, OUT]()
+  }
+
+  override def map(in: CRow): JTuple2[Boolean, OUT] = {
+    val out = tupleWrapper
+    out.f0 = in.change
+    out.f1 = function.map(in.row)
+    out
+  }
+
+  override def getProducedType: TypeInformation[JTuple2[Boolean, OUT]] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
index 4a2fcdf..809afd2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -25,7 +25,7 @@ import org.apache.flink.types.Row
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
/**
* A simple [[TableSink]] to emit data as CSV files.
@@ -135,3 +135,99 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
}
}
+/**
+ * A simple [[TableSink]] to emit a streaming table with change information as CSV files.
+ *
+ * Each written line starts with the change flag of the record (formatted by
+ * [[CsvRetractFormatter]]) followed by the record's fields. The sink requests that update and
+ * delete changes are sent as retraction messages.
+ *
+ * @param path The output path to write the Table to.
+ * @param fieldDelim The field delimiter, ',' if not defined.
+ * @param numFiles The number of files to write to; if defined it is used as the parallelism of
+ *                 the formatting and writing operators.
+ * @param writeMode The write mode to specify whether existing files are overwritten or not.
+ */
+class CsvRetractTableSink(
+    path: String,
+    fieldDelim: Option[String],
+    numFiles: Option[Int],
+    writeMode: Option[WriteMode])
+  extends TableSinkBase[Row] with StreamRetractSink[Row] {
+
+  /** This sink requires update and delete changes to be encoded as retraction messages. */
+  override def needsUpdatesAsRetraction: Boolean = true
+
+  /**
+   * A simple [[TableSink]] to emit data with change information as CSV files.
+   *
+   * @param path The output path to write the Table to.
+   * @param fieldDelim The field delimiter, ',' by default.
+   */
+  def this(path: String, fieldDelim: String = ",") {
+    this(path, Some(fieldDelim), None, None)
+  }
+
+  /**
+   * A simple [[TableSink]] to emit data with change information as CSV files.
+   *
+   * @param path The output path to write the Table to.
+   * @param fieldDelim The field delimiter.
+   * @param numFiles The number of files to write to.
+   * @param writeMode The write mode to specify whether existing files are overwritten or not.
+   */
+  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
+    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
+  }
+
+  override def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean, Row]]): Unit = {
+    val csvRows = dataStream
+      .map(new CsvRetractFormatter(fieldDelim.getOrElse(",")))
+      .returns(TypeInformation.of(classOf[String]))
+
+    // one formatter instance per output file, if the number of files is fixed
+    numFiles.foreach(n => csvRows.setParallelism(n))
+
+    val sink = writeMode match {
+      case None => csvRows.writeAsText(path)
+      case Some(wm) => csvRows.writeAsText(path, wm)
+    }
+
+    numFiles.foreach(n => sink.setParallelism(n))
+  }
+
+  override protected def copy: TableSinkBase[Row] = {
+    new CsvRetractTableSink(path, fieldDelim, numFiles, writeMode)
+  }
+
+  override def getOutputType: TypeInformation[Row] = {
+    new RowTypeInfo(getFieldTypes: _*)
+  }
+}
+
+/**
+ * Formats a Java [[org.apache.flink.api.java.tuple.Tuple2]] of (change flag, [[Row]]) into a
+ * [[String]]. The change flag is written first, followed by the row's fields; all values are
+ * separated by the field delimiter and null fields are written as empty strings.
+ *
+ * @param fieldDelim The field delimiter.
+ */
+class CsvRetractFormatter(fieldDelim: String) extends MapFunction[JTuple2[Boolean, Row], String] {
+  override def map(rowT: JTuple2[Boolean, Row]): String = {
+    val row: Row = rowT.f1
+    val fieldStrings = (0 until row.getArity).map { pos =>
+      val value = row.getField(pos)
+      if (value == null) "" else value.toString
+    }
+    (rowT.f0.toString +: fieldStrings).mkString(fieldDelim)
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
new file mode 100644
index 0000000..7f7c944
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+ * A [[TableSink]] for streaming tables that receives every record together with its change
+ * flag, encoded as a Java [[org.apache.flink.api.java.tuple.Tuple2]] of (flag, record).
+ *
+ * @tparam T Type of the records the sink expects in the second tuple field.
+ */
+trait StreamRetractSink[T] extends TableSink[T] {
+
+  /**
+   * Whether this [[StreamRetractSink]] requires that update and delete changes are sent as
+   * retraction messages. Defaults to false.
+   */
+  def needsUpdatesAsRetraction: Boolean = false
+
+  /**
+   * Emits the [[DataStream]] with change information.
+   *
+   * @param dataStream the stream of (change flag, record) tuples to emit
+   */
+  def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean, T]]): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 60de1f1..675e5d9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
import org.apache.flink.table.api.scala._
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.table.api.TableException
@@ -62,14 +63,6 @@ class TableEnvironmentTest extends TableTestBase {
}
@Test
- def testGetFieldInfoCRow(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(cRowType)
-
- fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
- }
-
- @Test
def testGetFieldInfoCClass(): Unit = {
val fieldInfo = tEnv.getFieldInfo(caseClassType)
@@ -113,20 +106,6 @@ class TableEnvironmentTest extends TableTestBase {
}
@Test
- def testGetFieldInfoCRowNames(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- cRowType,
- Array(
- UnresolvedFieldReference("name1"),
- UnresolvedFieldReference("name2"),
- UnresolvedFieldReference("name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
- }
-
- @Test
def testGetFieldInfoCClassNames(): Unit = {
val fieldInfo = tEnv.getFieldInfo(
caseClassType,
@@ -225,45 +204,6 @@ class TableEnvironmentTest extends TableTestBase {
}
@Test
- def testGetFieldInfoCRowAlias1(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- cRowType,
- Array(
- Alias(UnresolvedFieldReference("f0"), "name1"),
- Alias(UnresolvedFieldReference("f1"), "name2"),
- Alias(UnresolvedFieldReference("f2"), "name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
- }
-
- @Test
- def testGetFieldInfoCRowAlias2(): Unit = {
- val fieldInfo = tEnv.getFieldInfo(
- cRowType,
- Array(
- Alias(UnresolvedFieldReference("f2"), "name1"),
- Alias(UnresolvedFieldReference("f0"), "name2"),
- Alias(UnresolvedFieldReference("f1"), "name3")
- ))
-
- fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
- fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
- }
-
- @Test(expected = classOf[TableException])
- def testGetFieldInfoCRowAlias3(): Unit = {
- tEnv.getFieldInfo(
- cRowType,
- Array(
- Alias(UnresolvedFieldReference("xxx"), "name1"),
- Alias(UnresolvedFieldReference("yyy"), "name2"),
- Alias(UnresolvedFieldReference("zzz"), "name3")
- ))
- }
-
- @Test
def testGetFieldInfoCClassAlias1(): Unit = {
val fieldInfo = tEnv.getFieldInfo(
caseClassType,
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index ebfac0a..0c2505a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -165,25 +165,6 @@ class TableEnvironmentITCase(
}
@Test
- def testToDataSetWithTypeOfCRow(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- val t = CollectionDataSets.get3TupleDataSet(env)
- .toTable(tEnv, 'a, 'b, 'c)
- .select('a, 'b, 'c)
-
- val expected = "+1,1,Hi\n" + "+2,2,Hello\n" + "+3,2,Hello world\n" +
- "+4,3,Hello world, how are you?\n" + "+5,3,I am fine.\n" + "+6,3,Luke Skywalker\n" +
- "+7,4,Comment#1\n" + "+8,4,Comment#2\n" + "+9,4,Comment#3\n" + "+10,4,Comment#4\n" +
- "+11,5,Comment#5\n" + "+12,5,Comment#6\n" + "+13,5,Comment#7\n" + "+14,5,Comment#8\n" +
- "+15,5,Comment#9\n" + "+16,6,Comment#10\n" + "+17,6,Comment#11\n" + "+18,6,Comment#12\n" +
- "+19,6,Comment#13\n" + "+20,6,Comment#14\n" + "+21,6,Comment#15\n"
- val results = t.toDataSet[CRow].collect()
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
def testToTableFromCaseClass(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
index dde7f89..d490763 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
@@ -67,13 +67,12 @@ class RetractionITCase extends StreamingWithStateTestBase {
.groupBy('count)
.select('count, 'word.count as 'frequency)
- // to DataStream with CRow
- val results = resultTable.toDataStream[CRow]
- results.addSink(new StreamITCase.StringSinkWithCRow)
+ val results = resultTable.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
env.execute()
- val expected = Seq("+1,1", "+1,2", "+1,1", "+2,1", "+1,2", "+1,1", "+2,2", "+2,1", "+3,1",
- "+3,0", "+4,1", "+4,0", "+5,1", "+5,0", "+6,1", "+1,2")
+ val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0",
+ "4,1", "4,0", "5,1", "5,0", "6,1", "1,2")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
index c446d64..ceae6c6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -23,7 +23,7 @@ import java.io.File
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.stream.utils.StreamTestData
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.table.sinks.{CsvTableSink, CsvRetractTableSink}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.TableEnvironment
@@ -59,5 +59,34 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
TestBaseUtils.compareResultsByLinesInMemory(expected, path)
}
-
+
+  @Test
+  def testStreamTableSinkNeedRetraction(): Unit = {
+    // updating aggregate: sink rows are flagged add (true) or retract (false)
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = tmpFile.toURI.toString
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(4)
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .map(x => x).setParallelism(1) // downstream operators run with DOP 4
+
+    input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .groupBy('b)
+      .select('b, 'c.count)
+      .writeToSink(new CsvRetractTableSink(path))
+
+    env.execute()
+
+    val expected = Seq(
+      "true,1,1", "true,2,1", "false,2,1", "true,2,2", "true,3,1", "true,6,1", "false,6,1",
+      "true,6,2", "false,6,2", "true,6,3", "false,6,3", "true,6,4").mkString("\n")
+
+    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index 6c75d53..497869d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -49,12 +49,4 @@ object StreamITCase {
}
}
}
-
- final class StringSinkWithCRow extends RichSinkFunction[CRow]() {
- def invoke(value: CRow) {
- testResults.synchronized {
- testResults += value.toString
- }
- }
- }
}