Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:47 UTC

[13/15] flink git commit: [FLINK-6093] [table] Implement and turn on retraction for table sinks.

[FLINK-6093] [table] Implement and turn on retraction for table sinks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfed279f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfed279f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfed279f

Branch: refs/heads/master
Commit: bfed279f05a0e131c11f963e2380cd4c582e6bc3
Parents: 27bf4ca
Author: Hequn Cheng <ch...@gmail.com>
Authored: Thu Apr 27 23:03:44 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 6 01:51:55 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  16 +--
 .../table/api/StreamTableEnvironment.scala      | 122 +++++++++++++++++--
 .../apache/flink/table/api/TableConfig.scala    |  17 +++
 .../flink/table/api/TableEnvironment.scala      |  12 --
 .../datastream/DataStreamRetractionRules.scala  |   5 +-
 .../runtime/CRowInputTupleOutputMapRunner.scala |  63 ++++++++++
 .../apache/flink/table/sinks/CsvTableSink.scala |  98 ++++++++++++++-
 .../flink/table/sinks/StreamRetractSink.scala   |  35 ++++++
 .../flink/table/TableEnvironmentTest.scala      |  62 +---------
 .../scala/batch/TableEnvironmentITCase.scala    |  19 ---
 .../api/scala/stream/RetractionITCase.scala     |   9 +-
 .../api/scala/stream/TableSinkITCase.scala      |  33 ++++-
 .../api/scala/stream/utils/StreamITCase.scala   |   8 --
 13 files changed, 363 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
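
For orientation, the feature added by this commit is used roughly as follows
(a minimal usage sketch modeled on the new TableSinkITCase below; the input
data and output path are illustrative):

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.sinks.CsvRetractTableSink

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // A grouped aggregate emits updates; a retraction-aware sink receives each
    // change as a (flag, row) pair: true adds a result, false retracts a
    // previously emitted one.
    env.fromElements((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"))
      .toTable(tEnv, 'a, 'b, 'c)
      .groupBy('b)
      .select('b, 'c.count)
      .writeToSink(new CsvRetractTableSink("/tmp/retract-out.csv"))

    env.execute()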


http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index f7955f0..c7bacfe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -26,12 +26,11 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.RuleSet
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -39,7 +38,6 @@ import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
 import org.apache.flink.table.runtime.MapRunner
-import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
 import org.apache.flink.types.Row
@@ -150,18 +148,6 @@ abstract class BatchTableEnvironment(
     if (requestedTypeInfo.getTypeClass == classOf[Row]) {
       // Row to Row, no conversion needed
       None
-    } else if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
-      // Row to CRow, only needs to be wrapped
-      Some(
-        new RichMapFunction[Row, CRow] {
-          private var outCRow: CRow = _
-          override def open(parameters: Configuration): Unit = outCRow = new CRow(null, true)
-          override def map(value: Row): CRow = {
-            outCRow.row = value
-            outCRow
-          }
-        }.asInstanceOf[MapFunction[IN, OUT]]
-      )
     } else {
      // some type that is neither Row nor CRow
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 0632a47..bd06305 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -26,22 +26,25 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.AtomicType
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.calcite.RelTimeIndicatorConverter
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
 import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable}
-import org.apache.flink.table.runtime.CRowInputMapRunner
+import org.apache.flink.table.plan.schema.StreamTableSourceTable
+import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.runtime.{CRowInputMapRunner, CRowInputTupleOutputMapRunner}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{StreamRetractSink, StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TypeCheckUtils
 import org.apache.flink.types.Row
@@ -130,6 +133,14 @@ abstract class StreamTableEnvironment(
         val result: DataStream[T] = translate(table)(outputType)
        // Give the DataStream to the TableSink to emit it.
         streamSink.emitDataStream(result)
+
+      case streamRetractSink: StreamRetractSink[T] =>
+        val outputType = sink.getOutputType
+        this.config.setNeedsUpdatesAsRetractionForSink(streamRetractSink.needsUpdatesAsRetraction)
+        // translate the Table into a DataStream and provide the type that the TableSink expects.
+        val result: DataStream[JTuple2[Boolean, T]] = translate(table, true)(outputType)
+        // Give the DataStream to the TableSink to emit it.
+        streamRetractSink.emitDataStreamWithChange(result)
       case _ =>
         throw new TableException("StreamTableSink required to emit streaming Table")
     }
@@ -153,7 +164,7 @@ abstract class StreamTableEnvironment(
   Option[MapFunction[IN, OUT]] = {
 
     if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
-      // CRow to CRow, no conversion needed
+      // CRow to CRow, only used when explaining a table
       None
     } else if (requestedTypeInfo.getTypeClass == classOf[Row]) {
       // CRow to Row, only needs to be unwrapped
@@ -164,7 +175,6 @@ abstract class StreamTableEnvironment(
       )
     } else {
       // Some type that is neither CRow nor Row
-
       val converterFunction = generateRowConverterFunction[OUT](
         physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
         logicalRowType,
@@ -177,11 +187,40 @@ abstract class StreamTableEnvironment(
         converterFunction.code,
         converterFunction.returnType)
         .asInstanceOf[MapFunction[IN, OUT]])
-
     }
   }
 
   /**
+    * Creates a final converter that maps the internal CRow type to the external Tuple2 type.
+    *
+    * @param physicalTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Does not need to be unique but
+    *                     must be a valid Java class identifier.
+    */
+  protected def getTupleConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String):
+  Option[MapFunction[IN, JTuple2[Boolean, OUT]]] = {
+
+    val converterFunction = generateRowConverterFunction(
+      physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+      logicalRowType,
+      requestedTypeInfo,
+      functionName
+    )
+
+    Some(new CRowInputTupleOutputMapRunner[OUT](
+      converterFunction.name,
+      converterFunction.code,
+      new TupleTypeInfo(BasicTypeInfo.BOOLEAN_TYPE_INFO, requestedTypeInfo))
+           .asInstanceOf[MapFunction[IN, JTuple2[Boolean, OUT]]])
+  }
+
+  /**
     * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
     * catalog.
     *
@@ -371,7 +410,7 @@ abstract class StreamTableEnvironment(
     // 5. optimize the physical Flink plan
     val physicalOptRuleSet = getPhysicalOptRuleSet
     val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
-    val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
+    var physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
       runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
     } else {
       logicalPlan
@@ -380,6 +419,12 @@ abstract class StreamTableEnvironment(
     // 6. decorate the optimized plan
     val decoRuleSet = getDecoRuleSet
     val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
+
+      if (this.config.getNeedsUpdatesAsRetractionForSink) {
+        physicalPlan = physicalPlan.copy(
+          physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
+          physicalPlan.getInputs)
+      }
       runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet)
     } else {
       physicalPlan
@@ -388,7 +433,6 @@ abstract class StreamTableEnvironment(
     decoratedPlan
   }
 
-
   /**
     * Translates a [[Table]] into a [[DataStream]].
     *
@@ -444,6 +488,62 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Translates a [[Table]] into a [[DataStream]] with change information.
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+    *
+    * @param table       The root node of the relational expression tree.
+    * @param wrapToTuple True, if the change information should be included in the output
+    * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
+    * @tparam A The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](table: Table, wrapToTuple: Boolean)(implicit tpe: TypeInformation[A])
+  : DataStream[JTuple2[Boolean, A]] = {
+    val relNode = table.getRelNode
+    val dataStreamPlan = optimize(relNode)
+    translate(dataStreamPlan, relNode.getRowType, wrapToTuple)
+  }
+
+  /**
+    * Translates a logical [[RelNode]] into a [[DataStream]] with change information.
+    *
+    * @param logicalPlan The root node of the relational expression tree.
+    * @param logicalType The row type of the result. Since the logicalPlan can lose the
+    *                    field naming during optimization, the row type is passed separately.
+    * @param wrapToTuple True, if the change information should be included in the output
+    * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
+    * @tparam A The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](
+      logicalPlan: RelNode,
+      logicalType: RelDataType,
+      wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]): DataStream[JTuple2[Boolean, A]] = {
+
+    TableEnvironment.validateType(tpe)
+
+    logicalPlan match {
+      case node: DataStreamRel =>
+        val plan = node.translateToPlan(this)
+        val conversion =
+          getTupleConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
+        conversion match {
+          case None => plan.asInstanceOf[DataStream[JTuple2[Boolean, A]]] // no conversion necessary
+          case Some(mapFunction: MapFunction[CRow, JTuple2[Boolean, A]]) =>
+            plan.map(mapFunction)
+              .returns(new TupleTypeInfo[JTuple2[Boolean, A]](BasicTypeInfo.BOOLEAN_TYPE_INFO, tpe))
+              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+              .asInstanceOf[DataStream[JTuple2[Boolean, A]]]
+        }
+
+      case _ =>
+        throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
+    }
+  }
+
+  /**
     * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
     * the result of the given [[Table]].
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index 6448657..d296978 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -37,6 +37,11 @@ class TableConfig {
   private var nullCheck: Boolean = true
 
   /**
+    * Defines whether the table sink requires that update and delete changes are sent
+    * as retraction messages.
+    */
+  private var needsUpdatesAsRetractionForSink: Boolean = false
+
+  /**
     * Defines the configuration of Calcite for Table API and SQL queries.
     */
   private var calciteConfig = CalciteConfig.DEFAULT
@@ -67,6 +72,18 @@ class TableConfig {
   }
 
   /**
+    * Returns whether the table sink requires update and delete changes to be
+    * sent as retraction messages.
+    */
+  def getNeedsUpdatesAsRetractionForSink: Boolean = needsUpdatesAsRetractionForSink
+
+  /**
+    * Sets whether the table sink requires update and delete changes to be
+    * sent as retraction messages.
+    */
+  def setNeedsUpdatesAsRetractionForSink(needsUpdatesAsRetraction: Boolean): Unit = {
+    this.needsUpdatesAsRetractionForSink = needsUpdatesAsRetraction
+  }
+
+  /**
     * Returns the current configuration of Calcite for Table API and SQL queries.
     */
   def getCalciteConfig: CalciteConfig = calciteConfig
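
The flag added above is a plain boolean on the session's TableConfig:
writeToSink sets it before translation and optimize() reads it back to
decorate the plan root. A trivial sketch of the round trip:

    import org.apache.flink.table.api.TableConfig

    val config = new TableConfig
    // Set by StreamTableEnvironment.writeToSink for a StreamRetractSink.
    config.setNeedsUpdatesAsRetractionForSink(true)
    // Read back in optimize() to attach the UpdateAsRetractionTrait.
    assert(config.getNeedsUpdatesAsRetractionForSink)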

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d27db1e..5b752ab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -644,18 +644,6 @@ abstract class TableEnvironment(val config: TableConfig) {
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
         }
-      case cr: CRowTypeInfo =>
-        exprs.zipWithIndex.map {
-          case (UnresolvedFieldReference(name), idx) => (idx, name)
-          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
-            val idx = cr.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new TableException(s"$origName is not a field of type $cr")
-            }
-            (idx, name)
-          case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
-        }
       case c: CaseClassTypeInfo[A] =>
         exprs.zipWithIndex flatMap {
           case (UnresolvedFieldReference(name), idx) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
index bd9a7ee..97c0dbb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -102,13 +102,14 @@ object DataStreamRetractionRules {
       val rel = call.rel(0).asInstanceOf[DataStreamRel]
       val traits = rel.getTraitSet
 
-      val traitsWithUpdateAsRetrac = if (!traits.contains(UpdateAsRetractionTraitDef.INSTANCE)) {
+      val traitsWithUpdateAsRetrac =
+        if (null == traits.getTrait(UpdateAsRetractionTraitDef.INSTANCE)) {
         traits.plus(UpdateAsRetractionTrait.DEFAULT)
       } else {
         traits
       }
       val traitsWithAccMode =
-        if (!traitsWithUpdateAsRetrac.contains(AccModeTraitDef.INSTANCE)) {
+        if (null == traitsWithUpdateAsRetrac.getTrait(AccModeTraitDef.INSTANCE)) {
           traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT)
       } else {
         traitsWithUpdateAsRetrac

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
new file mode 100644
index 0000000..54bbf7e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.LoggerFactory
+
+/**
+  * Converts a [[CRow]] into a [[Tuple2]] holding the change flag and the converted row.
+  */
+class CRowInputTupleOutputMapRunner[OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[JTuple2[Boolean, OUT]])
+  extends RichMapFunction[CRow, JTuple2[Boolean, OUT]]
+          with ResultTypeQueryable[JTuple2[Boolean, OUT]]
+          with Compiler[MapFunction[Row, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Row, OUT] = _
+  private var tupleWrapper: JTuple2[Boolean, OUT] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+    tupleWrapper = new JTuple2[Boolean, OUT]()
+  }
+
+  override def map(in: CRow): JTuple2[Boolean, OUT] = {
+    tupleWrapper.f0 = in.change
+    tupleWrapper.f1 = function.map(in.row)
+    tupleWrapper
+  }
+
+  override def getProducedType: TypeInformation[JTuple2[Boolean, OUT]] = returnType
+}
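
For readers unfamiliar with the codegen plumbing, the runner above is
functionally equivalent to the following hand-written mapper (a simplified
sketch that skips the compiled converter and assumes the inner Row needs no
further conversion):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.types.Row

    // Unwraps a CRow into (change flag, row), reusing one mutable tuple.
    class SimpleCRowToTupleMapper extends RichMapFunction[CRow, JTuple2[Boolean, Row]] {

      private var tupleWrapper: JTuple2[Boolean, Row] = _

      override def open(parameters: Configuration): Unit = {
        tupleWrapper = new JTuple2[Boolean, Row]()
      }

      override def map(in: CRow): JTuple2[Boolean, Row] = {
        tupleWrapper.f0 = in.change // true = accumulate, false = retract
        tupleWrapper.f1 = in.row    // identity instead of generated conversion
        tupleWrapper
      }
    }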

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
index 4a2fcdf..809afd2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -25,7 +25,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 
 /**
   * A simple [[TableSink]] to emit data as CSV files.
@@ -135,3 +135,99 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
   }
 }
 
+/**
+  * A simple [[TableSink]] to emit data as CSV files.
+  *
+  * @param path The output path to write the Table to.
+  * @param fieldDelim The field delimiter
+  * @param numFiles The number of files to write to
+  * @param writeMode The write mode to specify whether existing files are overwritten or not.
+  */
+class CsvRetractTableSink(
+    path: String,
+    fieldDelim: Option[String],
+    numFiles: Option[Int],
+    writeMode: Option[WriteMode])
+  extends TableSinkBase[Row] with StreamRetractSink[Row] {
+
+  override def needsUpdatesAsRetraction: Boolean = true
+
+  /**
+    * A simple [[TableSink]] to emit data as CSV files.
+    *
+    * @param path The output path to write the Table to.
+    * @param fieldDelim The field delimiter, ',' by default.
+    */
+  def this(path: String, fieldDelim: String = ",") {
+    this(path, Some(fieldDelim), None, None)
+  }
+
+  /**
+    * A simple [[TableSink]] to emit data as CSV files.
+    *
+    * @param path The output path to write the Table to.
+    * @param fieldDelim The field delimiter.
+    * @param numFiles The number of files to write to.
+    * @param writeMode The write mode to specify whether existing files are overwritten or not.
+    */
+  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
+    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
+  }
+
+  override def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean, Row]]): Unit = {
+    val csvRows = dataStream
+      .map(new CsvRetractFormatter(fieldDelim.getOrElse(",")))
+      .returns(TypeInformation.of(classOf[String]))
+
+    if (numFiles.isDefined) {
+      csvRows.setParallelism(numFiles.get)
+    }
+
+    val sink = writeMode match {
+      case None => csvRows.writeAsText(path)
+      case Some(wm) => csvRows.writeAsText(path, wm)
+    }
+
+    if (numFiles.isDefined) {
+      sink.setParallelism(numFiles.get)
+    }
+  }
+
+  override protected def copy: TableSinkBase[Row] = {
+    new CsvRetractTableSink(path, fieldDelim, numFiles, writeMode)
+  }
+
+  override def getOutputType: TypeInformation[Row] = {
+    new RowTypeInfo(getFieldTypes: _*)
+  }
+}
+
+/**
+  * Formats a [[Tuple2]] with change information into a [[String]] with fields separated by the
+  * field delimiter.
+  *
+  * @param fieldDelim The field delimiter.
+  */
+class CsvRetractFormatter(fieldDelim: String) extends MapFunction[JTuple2[Boolean, Row], String] {
+  override def map(rowT: JTuple2[Boolean, Row]): String = {
+
+    val row: Row = rowT.f1
+
+    val builder = new StringBuilder
+
+    builder.append(rowT.f0.toString)
+
+    // write the field values after the change flag
+    for (i <- 0 until row.getArity) {
+      builder.append(fieldDelim)
+      val v = row.getField(i)
+      if (v != null) {
+        builder.append(v.toString)
+      }
+    }
+    builder.mkString
+  }
+}
+
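
To illustrate the emitted format, a quick sketch of the new formatter in
isolation (the field values are illustrative):

    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.types.Row

    val formatter = new CsvRetractFormatter(",")

    val row = new Row(2)
    row.setField(0, 2L)
    row.setField(1, 1L)

    // The change flag becomes the first CSV field.
    println(formatter.map(new JTuple2(true, row)))  // prints: true,2,1
    println(formatter.map(new JTuple2(false, row))) // prints: false,2,1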

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
new file mode 100644
index 0000000..7f7c944
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.datastream.DataStream
+
+trait StreamRetractSink[T] extends TableSink[T] {
+
+  /**
+    * Whether the [[TableSink]] requires that update and delete changes are sent with
+    * retraction messages.
+    */
+  def needsUpdatesAsRetraction: Boolean = false
+
+  /** Emits the DataStream with change information. */
+  def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean, T]]): Unit
+
+}
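
A minimal sketch of a custom sink implementing the new trait (the print-style
sink is illustrative and not part of this commit):

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.table.sinks.{StreamRetractSink, TableSinkBase}
    import org.apache.flink.types.Row

    // Prints each change as "+<row>" or "-<row>" depending on the flag.
    class PrintRetractSink extends TableSinkBase[Row] with StreamRetractSink[Row] {

      override def needsUpdatesAsRetraction: Boolean = true

      override def emitDataStreamWithChange(
          ds: DataStream[JTuple2[Boolean, Row]]): Unit = {
        ds.map(new MapFunction[JTuple2[Boolean, Row], String] {
          override def map(t: JTuple2[Boolean, Row]): String =
            (if (t.f0) "+" else "-") + t.f1
        }).print()
      }

      override protected def copy: TableSinkBase[Row] = new PrintRetractSink

      override def getOutputType: TypeInformation[Row] =
        new RowTypeInfo(getFieldTypes: _*)
    }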

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 60de1f1..675e5d9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.TableException
@@ -62,14 +63,6 @@ class TableEnvironmentTest extends TableTestBase {
   }
 
   @Test
-  def testGetFieldInfoCRow(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(cRowType)
-
-    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
   def testGetFieldInfoCClass(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(caseClassType)
 
@@ -113,20 +106,6 @@ class TableEnvironmentTest extends TableTestBase {
   }
 
   @Test
-  def testGetFieldInfoCRowNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      cRowType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
   def testGetFieldInfoCClassNames(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(
       caseClassType,
@@ -225,45 +204,6 @@ class TableEnvironmentTest extends TableTestBase {
   }
 
   @Test
-  def testGetFieldInfoCRowAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      cRowType,
-      Array(
-        Alias(UnresolvedFieldReference("f0"), "name1"),
-        Alias(UnresolvedFieldReference("f1"), "name2"),
-        Alias(UnresolvedFieldReference("f2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test
-  def testGetFieldInfoCRowAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      cRowType,
-      Array(
-        Alias(UnresolvedFieldReference("f2"), "name1"),
-        Alias(UnresolvedFieldReference("f0"), "name2"),
-        Alias(UnresolvedFieldReference("f1"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoCRowAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      cRowType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
-  }
-
-  @Test
   def testGetFieldInfoCClassAlias1(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(
       caseClassType,

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index ebfac0a..0c2505a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -165,25 +165,6 @@ class TableEnvironmentITCase(
   }
 
   @Test
-  def testToDataSetWithTypeOfCRow(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val expected = "+1,1,Hi\n" + "+2,2,Hello\n" + "+3,2,Hello world\n" +
-      "+4,3,Hello world, how are you?\n" + "+5,3,I am fine.\n" + "+6,3,Luke Skywalker\n" +
-      "+7,4,Comment#1\n" + "+8,4,Comment#2\n" + "+9,4,Comment#3\n" + "+10,4,Comment#4\n" +
-      "+11,5,Comment#5\n" + "+12,5,Comment#6\n" + "+13,5,Comment#7\n" + "+14,5,Comment#8\n" +
-      "+15,5,Comment#9\n" + "+16,6,Comment#10\n" + "+17,6,Comment#11\n" + "+18,6,Comment#12\n" +
-      "+19,6,Comment#13\n" + "+20,6,Comment#14\n" + "+21,6,Comment#15\n"
-    val results = t.toDataSet[CRow].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
   def testToTableFromCaseClass(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
index dde7f89..d490763 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
@@ -67,13 +67,12 @@ class RetractionITCase extends StreamingWithStateTestBase {
       .groupBy('count)
       .select('count, 'word.count as 'frequency)
 
-    // to DataStream with CRow
-    val results = resultTable.toDataStream[CRow]
-    results.addSink(new StreamITCase.StringSinkWithCRow)
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = Seq("+1,1", "+1,2", "+1,1", "+2,1", "+1,2", "+1,1", "+2,2", "+2,1", "+3,1",
-      "+3,0", "+4,1", "+4,0", "+5,1", "+5,0", "+6,1", "+1,2")
+    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0",
+      "4,1", "4,0", "5,1", "5,0", "6,1", "1,2")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
index c446d64..ceae6c6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -23,7 +23,7 @@ import java.io.File
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.StreamTestData
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.table.sinks.{CsvTableSink, CsvRetractTableSink}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
@@ -59,5 +59,34 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }
-  
+
+  @Test
+  def testStreamTableSinkNeedRetraction(): Unit = {
+
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = tmpFile.toURI.toString
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(4)
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .map(x => x).setParallelism(1) // keep the input at parallelism 1 for a deterministic order
+
+    val results = input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .groupBy('b)
+      .select('b, 'c.count)
+      .writeToSink(new CsvRetractTableSink(path))
+
+    env.execute()
+
+    val expected = Seq(
+      "true,1,1", "true,2,1", "false,2,1", "true,2,2", "true,3,1", "true,6,1", "false,6,1",
+      "true,6,2", "false,6,2", "true,6,3", "false,6,3", "true,6,4").mkString("\n")
+
+    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+  }
 }
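
To read the expected output above: the second field is the grouping key 'b and
the third is the running count, prefixed by the change flag. When the count for
b=2 advances from 1 to 2, the sink first receives the retraction "false,2,1"
and then the new result "true,2,2". Replaying such a change log yields the
final table (a hedged sketch; the tuples mirror the expected lines above):

    import scala.collection.mutable

    val log = Seq(
      (true, 1L, 1L), (true, 2L, 1L), (false, 2L, 1L), (true, 2L, 2L),
      (true, 3L, 1L), (true, 6L, 1L), (false, 6L, 1L), (true, 6L, 2L),
      (false, 6L, 2L), (true, 6L, 3L), (false, 6L, 3L), (true, 6L, 4L))

    // Apply each (flag, key, count): true upserts the result, false removes
    // the previously emitted one.
    val result = mutable.Map.empty[Long, Long]
    for ((change, key, count) <- log) {
      if (change) result(key) = count else result.remove(key)
    }
    // result: Map(1 -> 1, 2 -> 2, 3 -> 1, 6 -> 4)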

http://git-wip-us.apache.org/repos/asf/flink/blob/bfed279f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index 6c75d53..497869d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -49,12 +49,4 @@ object StreamITCase {
       }
     }
   }
-
-  final class StringSinkWithCRow extends RichSinkFunction[CRow]() {
-    def invoke(value: CRow) {
-      testResults.synchronized {
-        testResults += value.toString
-      }
-    }
-  }
 }