You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:49 UTC

[15/15] flink git commit: [FLINK-6093] [table] Add stream TableSinks and DataStream conversion with support for retraction.

[FLINK-6093] [table] Add stream TableSinks and DataStream conversion with support for retraction.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f37988c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f37988c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f37988c1

Branch: refs/heads/master
Commit: f37988c19adc30d324cde83c54f2fa5d36efb9e7
Parents: bfed279
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Apr 28 01:59:57 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 6 01:51:55 2017 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTableSink.java        |   6 +-
 .../flink/table/api/BatchTableEnvironment.scala |  16 +-
 .../table/api/StreamTableEnvironment.scala      | 428 +++++++++++-----
 .../apache/flink/table/api/TableConfig.scala    |  17 -
 .../flink/table/api/TableEnvironment.scala      |  48 +-
 .../table/api/java/StreamTableEnvironment.scala |  81 ++-
 .../api/scala/StreamTableEnvironment.scala      |  33 +-
 .../table/api/scala/TableConversions.scala      |  19 +
 .../org/apache/flink/table/api/table.scala      |   4 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |   8 +-
 .../datastream/DataStreamGroupAggregate.scala   |   2 +
 .../DataStreamGroupWindowAggregate.scala        |   4 +
 .../nodes/datastream/retractionTraits.scala     |  37 +-
 .../datastream/DataStreamRetractionRules.scala  |  16 +-
 .../runtime/CRowCorrelateFlatMapRunner.scala    |   2 +-
 .../flink/table/runtime/CRowFlatMapRunner.scala |   2 +-
 .../table/runtime/CRowInputMapRunner.scala      |   2 +-
 .../runtime/CRowInputTupleOutputMapRunner.scala |  53 +-
 .../table/runtime/CRowOutputMapRunner.scala     |   2 +-
 .../table/runtime/CorrelateFlatMapRunner.scala  |   2 +-
 .../flink/table/runtime/FlatJoinRunner.scala    |   2 +-
 .../flink/table/runtime/FlatMapRunner.scala     |   2 +-
 .../flink/table/runtime/MapJoinLeftRunner.scala |   2 +-
 .../table/runtime/MapJoinRightRunner.scala      |   2 +-
 .../apache/flink/table/runtime/MapRunner.scala  |   2 +-
 .../flink/table/runtime/MapSideJoinRunner.scala |   2 +-
 ...aSetSessionWindowAggregatePreProcessor.scala |   2 +-
 .../aggregate/GroupAggProcessFunction.scala     |  55 +-
 .../runtime/io/CRowValuesInputFormat.scala      |   2 +-
 .../table/runtime/io/ValuesInputFormat.scala    |   2 +-
 .../table/sinks/AppendStreamTableSink.scala     |  36 ++
 .../apache/flink/table/sinks/CsvTableSink.scala | 100 +---
 .../table/sinks/RetractStreamTableSink.scala    |  55 ++
 .../flink/table/sinks/StreamRetractSink.scala   |  35 --
 .../flink/table/sinks/StreamTableSink.scala     |  32 --
 .../table/sinks/UpsertStreamTableSink.scala     |  79 +++
 .../flink/table/TableEnvironmentTest.scala      |   1 -
 .../api/scala/stream/RetractionITCase.scala     |  80 ++-
 .../api/scala/stream/TableSinkITCase.scala      |  33 +-
 .../table/api/scala/stream/sql/SqlITCase.scala  | 102 ++--
 .../stream/table/GroupAggregationsITCase.scala  |  61 +--
 .../api/scala/stream/table/OverWindowTest.scala |  18 +-
 .../api/scala/stream/utils/StreamITCase.scala   |  35 +-
 .../table/plan/rules/RetractionRulesTest.scala  |   2 +-
 .../table/sinks/StreamTableSinksITCase.scala    | 511 +++++++++++++++++++
 .../table/utils/MockTableEnvironment.scala      |   5 -
 .../flink/table/utils/TableTestBase.scala       |   4 +-
 47 files changed, 1433 insertions(+), 611 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 97f5fba..a8a2fd0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -29,12 +29,12 @@ import org.apache.flink.util.Preconditions;
 import java.util.Properties;
 
 /**
- * A version-agnostic Kafka {@link StreamTableSink}.
+ * A version-agnostic Kafka {@link AppendStreamTableSink}.
  *
  * <p>The version-specific Kafka table sinks need to extend this class and
  * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}.
  */
-public abstract class KafkaTableSink implements StreamTableSink<Row> {
+public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
 	protected final String topic;
 	protected final Properties properties;

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index c7bacfe..2a3cedf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAtt
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSourceTable}
 import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
@@ -133,14 +133,14 @@ abstract class BatchTableEnvironment(
     * Creates a final converter that maps the internal row type to external type.
     *
     * @param physicalTypeInfo the input of the sink
-    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param schema the input schema with correct field names (esp. for POJO field mapping)
     * @param requestedTypeInfo the output type of the sink
     * @param functionName name of the map function. Need not be unique but has to be a
     *                     valid Java class identifier.
     */
-  override protected def getConversionMapper[IN, OUT](
+  protected def getConversionMapper[IN, OUT](
       physicalTypeInfo: TypeInformation[IN],
-      logicalRowType: RelDataType,
+      schema: RowSchema,
       requestedTypeInfo: TypeInformation[OUT],
       functionName: String):
     Option[MapFunction[IN, OUT]] = {
@@ -153,7 +153,7 @@ abstract class BatchTableEnvironment(
 
       val converterFunction = generateRowConverterFunction[OUT](
         physicalTypeInfo.asInstanceOf[TypeInformation[Row]],
-        logicalRowType,
+        schema,
         requestedTypeInfo,
         functionName
       )
@@ -334,7 +334,11 @@ abstract class BatchTableEnvironment(
       case node: DataSetRel =>
         val plan = node.translateToPlan(this)
         val conversion =
-          getConversionMapper(plan.getType, logicalType, tpe, "DataSetSinkConversion")
+          getConversionMapper(
+            plan.getType,
+            new RowSchema(logicalType),
+            tpe,
+            "DataSetSinkConversion")
         conversion match {
           case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
           case Some(mapFunction: MapFunction[Row, A]) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index bd06305..aef2b1b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -19,19 +19,23 @@
 package org.apache.flink.table.api
 
 import _root_.java.util.concurrent.atomic.AtomicInteger
+import _root_.java.lang.{Boolean => JBool}
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.{RelNode, RelVisitor}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.api.common.typeinfo.AtomicType
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.calcite.RelTimeIndicatorConverter
@@ -39,12 +43,12 @@ import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
+import org.apache.flink.table.plan.nodes.datastream._
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.StreamTableSourceTable
-import org.apache.flink.table.plan.schema.DataStreamTable
-import org.apache.flink.table.runtime.{CRowInputMapRunner, CRowInputTupleOutputMapRunner}
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
+import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.sinks.{StreamRetractSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TypeCheckUtils
 import org.apache.flink.types.Row
@@ -127,97 +131,192 @@ abstract class StreamTableEnvironment(
   override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
 
     sink match {
-      case streamSink: StreamTableSink[T] =>
+
+      case retractSink: RetractStreamTableSink[_] =>
+        // retraction sink can always be used
         val outputType = sink.getOutputType
         // translate the Table into a DataStream and provide the type that the TableSink expects.
-        val result: DataStream[T] = translate(table)(outputType)
-        // Give the DataSet to the TableSink to emit it.
-        streamSink.emitDataStream(result)
-
-      case streamRetractSink: StreamRetractSink[T] =>
+        val result: DataStream[T] =
+          translate(table, updatesAsRetraction = true, withChangeFlag = true)(outputType)
+        // Give the DataStream to the TableSink to emit it.
+        retractSink.asInstanceOf[RetractStreamTableSink[Any]]
+          .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
+
+      case upsertSink: UpsertStreamTableSink[_] =>
+        // optimize plan
+        val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
+        // check for append only table
+        val isAppendOnlyTable = isAppendOnly(optimizedPlan)
+        upsertSink.setIsAppendOnly(isAppendOnlyTable)
+        // extract unique key fields
+        val tableKeys: Option[Array[String]] = getUniqueKeyFields(optimizedPlan)
+        // check that we have keys if the table has changes (is not append-only)
+        tableKeys match {
+          case Some(keys) => upsertSink.setKeyFields(keys)
+          case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
+          case None if !isAppendOnlyTable => throw new TableException(
+            "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.")
+        }
+        val outputType = sink.getOutputType
+        // translate the Table into a DataStream and provide the type that the TableSink expects.
+        val result: DataStream[T] =
+          translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = true)(outputType)
+        // Give the DataStream to the TableSink to emit it.
+        upsertSink.asInstanceOf[UpsertStreamTableSink[Any]]
+          .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
+
+      case appendSink: AppendStreamTableSink[_] =>
+        // optimize plan
+        val optimizedPlan = optimize(table.getRelNode, updatesAsRetraction = false)
+        // verify table is an insert-only (append-only) table
+        if (!isAppendOnly(optimizedPlan)) {
+          throw new TableException(
+            "AppendStreamTableSink requires that Table has only insert changes.")
+        }
         val outputType = sink.getOutputType
-        this.config.setNeedsUpdatesAsRetractionForSink(streamRetractSink.needsUpdatesAsRetraction)
         // translate the Table into a DataStream and provide the type that the TableSink expects.
-        val result: DataStream[JTuple2[Boolean, T]] = translate(table, true)(outputType)
-        // Give the DataSet to the TableSink to emit it.
-        streamRetractSink.emitDataStreamWithChange(result)
+        val result: DataStream[T] =
+          translate(optimizedPlan, table.getRelNode.getRowType, withChangeFlag = false)(outputType)
+        // Give the DataStream to the TableSink to emit it.
+        appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result)
+
       case _ =>
-        throw new TableException("StreamTableSink required to emit streaming Table")
+        throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, " +
+          "RetractStreamTableSink, or UpsertStreamTableSink.")
     }
   }
 
-
   /**
     * Creates a final converter that maps the internal row type to external type.
     *
     * @param physicalTypeInfo the input of the sink
-    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param schema the input schema with correct field names (esp. for POJO field mapping)
     * @param requestedTypeInfo the output type of the sink
     * @param functionName name of the map function. Need not be unique but has to be a
     *                     valid Java class identifier.
     */
-  override protected def getConversionMapper[IN, OUT](
+  protected def getConversionMapper[IN, OUT](
       physicalTypeInfo: TypeInformation[IN],
-      logicalRowType: RelDataType,
+      schema: RowSchema,
       requestedTypeInfo: TypeInformation[OUT],
       functionName: String):
-  Option[MapFunction[IN, OUT]] = {
+    MapFunction[IN, OUT] = {
 
-    if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
-      // only used to explain table
-      None
-    } else if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+    if (requestedTypeInfo.getTypeClass == classOf[Row]) {
       // CRow to Row, only needs to be unwrapped
-      Some(
-        new MapFunction[CRow, Row] {
-          override def map(value: CRow): Row = value.row
-        }.asInstanceOf[MapFunction[IN, OUT]]
-      )
+      new MapFunction[CRow, Row] {
+        override def map(value: CRow): Row = value.row
+      }.asInstanceOf[MapFunction[IN, OUT]]
     } else {
       // Some type that is neither CRow nor Row
       val converterFunction = generateRowConverterFunction[OUT](
         physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
-        logicalRowType,
+        schema,
         requestedTypeInfo,
         functionName
       )
 
-      Some(new CRowInputMapRunner[OUT](
+      new CRowInputMapRunner[OUT](
         converterFunction.name,
         converterFunction.code,
         converterFunction.returnType)
-        .asInstanceOf[MapFunction[IN, OUT]])
+        .asInstanceOf[MapFunction[IN, OUT]]
     }
   }
 
+  /** Validates that the plan produces only append changes. */
+  protected def isAppendOnly(plan: RelNode): Boolean = {
+    val appendOnlyValidator = new AppendOnlyValidator
+    appendOnlyValidator.go(plan)
+
+    appendOnlyValidator.isAppendOnly
+  }
+
+  /** Extracts the unique keys of the table produced by the plan. */
+  protected def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+    val keyExtractor = new UniqueKeyExtractor
+    keyExtractor.go(plan)
+    keyExtractor.keys
+  }
+
   /**
-    * Creates a final converter that maps the internal CRow type to external Tuple2 type.
+    * Creates a converter that maps the internal CRow type to Scala or Java Tuple2 with change flag.
     *
     * @param physicalTypeInfo the input of the sink
-    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
-    * @param requestedTypeInfo the output type of the sink
+    * @param schema the input schema with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink.
     * @param functionName name of the map function. Need not be unique but has to be a
     *                     valid Java class identifier.
     */
-  protected def getTupleConversionMapper[IN, OUT](
-      physicalTypeInfo: TypeInformation[IN],
-      logicalRowType: RelDataType,
-      requestedTypeInfo: TypeInformation[OUT],
-      functionName: String):
-  Option[MapFunction[IN, JTuple2[Boolean, OUT]]] = {
+  private def getConversionMapperWithChanges[OUT](
+    physicalTypeInfo: TypeInformation[CRow],
+    schema: RowSchema,
+    requestedTypeInfo: TypeInformation[OUT],
+    functionName: String):
+  MapFunction[CRow, OUT] = {
+
+    requestedTypeInfo match {
+
+      // Scala tuple
+      case t: CaseClassTypeInfo[_]
+        if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN =>
+
+        val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]]
+        if (reqType.getTypeClass == classOf[Row]) {
+          // Requested type is Row. Just rewrap CRow in Tuple2
+          new MapFunction[CRow, (Boolean, Row)] {
+            override def map(cRow: CRow): (Boolean, Row) = {
+              (cRow.change, cRow.row)
+            }
+          }.asInstanceOf[MapFunction[CRow, OUT]]
+        } else {
+          // Use a map function to convert Row into requested type and wrap result in Tuple2
+          val converterFunction = generateRowConverterFunction(
+            physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+            schema,
+            reqType,
+            functionName
+          )
+
+          new CRowInputScalaTupleOutputMapRunner(
+            converterFunction.name,
+            converterFunction.code,
+            requestedTypeInfo.asInstanceOf[TypeInformation[(Boolean, Any)]])
+            .asInstanceOf[MapFunction[CRow, OUT]]
 
-    val converterFunction = generateRowConverterFunction(
-      physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
-      logicalRowType,
-      requestedTypeInfo,
-      functionName
-    )
+        }
 
-    Some(new CRowInputTupleOutputMapRunner[OUT](
-      converterFunction.name,
-      converterFunction.code,
-      new TupleTypeInfo(BasicTypeInfo.BOOLEAN_TYPE_INFO, requestedTypeInfo))
-           .asInstanceOf[MapFunction[IN, JTuple2[Boolean, OUT]]])
+      // Java tuple
+      case t: TupleTypeInfo[_]
+        if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN =>
+
+        val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]]
+        if (reqType.getTypeClass == classOf[Row]) {
+          // Requested type is Row. Just rewrap CRow in Tuple2
+          new MapFunction[CRow, JTuple2[JBool, Row]] {
+            val outT = new JTuple2(true.asInstanceOf[JBool], null.asInstanceOf[Row])
+            override def map(cRow: CRow): JTuple2[JBool, Row] = {
+              outT.f0 = cRow.change
+              outT.f1 = cRow.row
+              outT
+            }
+          }.asInstanceOf[MapFunction[CRow, OUT]]
+        } else {
+          // Use a map function to convert Row into requested type and wrap result in Tuple2
+          val converterFunction = generateRowConverterFunction(
+            physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+            schema,
+            reqType,
+            functionName
+          )
+
+          new CRowInputJavaTupleOutputMapRunner(
+            converterFunction.name,
+            converterFunction.code,
+            requestedTypeInfo.asInstanceOf[TypeInformation[JTuple2[JBool, Any]]])
+            .asInstanceOf[MapFunction[CRow, OUT]]
+        }
+    }
   }
 
   /**
@@ -380,9 +479,10 @@ abstract class StreamTableEnvironment(
     * Generates the optimized [[RelNode]] tree from the original relational node tree.
     *
     * @param relNode The root node of the relational expression tree.
+    * @param updatesAsRetraction True if the sink requests updates as retraction messages.
     * @return The optimized [[RelNode]] tree
     */
-  private[flink] def optimize(relNode: RelNode): RelNode = {
+  private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
 
     // 1. decorrelate
     val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
@@ -410,7 +510,7 @@ abstract class StreamTableEnvironment(
     // 5. optimize the physical Flink plan
     val physicalOptRuleSet = getPhysicalOptRuleSet
     val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
-    var physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
+    val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
       runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
     } else {
       logicalPlan
@@ -419,13 +519,18 @@ abstract class StreamTableEnvironment(
     // 6. decorate the optimized plan
     val decoRuleSet = getDecoRuleSet
     val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
-
-      if (this.config.getNeedsUpdatesAsRetractionForSink) {
-        physicalPlan = physicalPlan.copy(
+      val planToDecorate = if (updatesAsRetraction) {
+        physicalPlan.copy(
           physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
           physicalPlan.getInputs)
+      } else {
+        physicalPlan
       }
-      runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet)
+      runHepPlanner(
+        HepMatchOrder.BOTTOM_UP,
+        decoRuleSet,
+        planToDecorate,
+        planToDecorate.getTraitSet)
     } else {
       physicalPlan
     }
@@ -440,14 +545,17 @@ abstract class StreamTableEnvironment(
     * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
     *
     * @param table The root node of the relational expression tree.
+    * @param updatesAsRetraction Set to true to encode updates as retraction messages.
+    * @param withChangeFlag Set to true to emit records with change flags.
     * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
     * @tparam A The type of the resulting [[DataStream]].
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
-  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+  protected def translate[A](table: Table, updatesAsRetraction: Boolean, withChangeFlag: Boolean)
+      (implicit tpe: TypeInformation[A]): DataStream[A] = {
     val relNode = table.getRelNode
-    val dataStreamPlan = optimize(relNode)
-    translate(dataStreamPlan, relNode.getRowType)
+    val dataStreamPlan = optimize(relNode, updatesAsRetraction)
+    translate(dataStreamPlan, relNode.getRowType, withChangeFlag)
   }
 
   /**
@@ -456,87 +564,65 @@ abstract class StreamTableEnvironment(
     * @param logicalPlan The root node of the relational expression tree.
     * @param logicalType The row type of the result. Since the logicalPlan can lose the
     *                    field naming during optimization we pass the row type separately.
+    * @param withChangeFlag Set to true to emit records with change flags.
     * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
     * @tparam A The type of the resulting [[DataStream]].
     * @return The [[DataStream]] that corresponds to the translated [[Table]].
     */
   protected def translate[A](
       logicalPlan: RelNode,
-      logicalType: RelDataType)
+      logicalType: RelDataType,
+      withChangeFlag: Boolean)
       (implicit tpe: TypeInformation[A]): DataStream[A] = {
 
-    TableEnvironment.validateType(tpe)
+    // if no change flags are requested, verify table is an insert-only (append-only) table.
+    if (!withChangeFlag && !isAppendOnly(logicalPlan)) {
+      throw new TableException(
+        "Table is not an append-only table. " +
+          "Output needs to handle update and delete changes.")
+    }
 
-    logicalPlan match {
-      case node: DataStreamRel =>
-        val plan = node.translateToPlan(this)
-        val conversion =
-          getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
-        conversion match {
-          case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary
-          case Some(mapFunction: MapFunction[CRow, A]) =>
-            plan.map(mapFunction)
-              .returns(tpe)
-              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
-              .asInstanceOf[DataStream[A]]
-        }
+    // get CRow plan
+    val plan: DataStream[CRow] = translateToCRow(logicalPlan)
 
-      case _ =>
-        throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
-          "This is a bug and should not happen. Please file an issue.")
+    // convert CRow to output type
+    val conversion = if (withChangeFlag) {
+      getConversionMapperWithChanges(
+        plan.getType,
+        new RowSchema(logicalType),
+        tpe,
+        "DataStreamSinkConversion")
+    } else {
+      getConversionMapper(
+        plan.getType,
+        new RowSchema(logicalType),
+        tpe,
+        "DataStreamSinkConversion")
     }
-  }
 
-  /**
-    * Translates a [[Table]] into a [[DataStream]] with change information.
-    *
-    * The transformation involves optimizing the relational expression tree as defined by
-    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
-    *
-    * @param table       The root node of the relational expression tree.
-    * @param wrapToTuple True, if want to output chang information
-    * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
-    * @tparam A The type of the resulting [[DataStream]].
-    * @return The [[DataStream]] that corresponds to the translated [[Table]].
-    */
-  protected def translate[A](table: Table, wrapToTuple: Boolean)(implicit tpe: TypeInformation[A])
-  : DataStream[JTuple2[Boolean, A]] = {
-    val relNode = table.getRelNode
-    val dataStreamPlan = optimize(relNode)
-    translate(dataStreamPlan, relNode.getRowType, wrapToTuple)
+    val rootParallelism = plan.getParallelism
+
+    conversion match {
+      case mapFunction: MapFunction[CRow, A] =>
+        plan.map(mapFunction)
+          .returns(tpe)
+          .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+          .setParallelism(rootParallelism)
+    }
   }
 
   /**
-    * Translates a logical [[RelNode]] into a [[DataStream]] with change information.
+    * Translates a logical [[RelNode]] plan into a [[DataStream]] of type [[CRow]].
     *
-    * @param logicalPlan The root node of the relational expression tree.
-    * @param logicalType The row type of the result. Since the logicalPlan can lose the
-    * @param wrapToTuple True, if want to output chang information
-    * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
-    * @tparam A The type of the resulting [[DataStream]].
-    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    * @param logicalPlan The logical plan to translate.
+    * @return The [[DataStream]] of type [[CRow]].
     */
-  protected def translate[A](
-      logicalPlan: RelNode,
-      logicalType: RelDataType,
-      wrapToTuple: Boolean)(implicit tpe: TypeInformation[A]): DataStream[JTuple2[Boolean, A]] = {
-
-    TableEnvironment.validateType(tpe)
+  protected def translateToCRow(
+    logicalPlan: RelNode): DataStream[CRow] = {
 
     logicalPlan match {
       case node: DataStreamRel =>
-        val plan = node.translateToPlan(this)
-        val conversion =
-          getTupleConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
-        conversion match {
-          case None => plan.asInstanceOf[DataStream[JTuple2[Boolean, A]]] // no conversion necessary
-          case Some(mapFunction: MapFunction[CRow, JTuple2[Boolean, A]]) =>
-            plan.map(mapFunction)
-              .returns(new TupleTypeInfo[JTuple2[Boolean, A]](BasicTypeInfo.BOOLEAN_TYPE_INFO, tpe))
-              .name(s"to: ${tpe.getTypeClass.getSimpleName}")
-              .asInstanceOf[DataStream[JTuple2[Boolean, A]]]
-        }
-
+        node.translateToPlan(this)
       case _ =>
         throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
           "This is a bug and should not happen. Please file an issue.")
@@ -551,10 +637,8 @@ abstract class StreamTableEnvironment(
     */
   def explain(table: Table): String = {
     val ast = table.getRelNode
-    val optimizedPlan = optimize(ast)
-    val dataStream = translate[CRow](
-      optimizedPlan,
-      ast.getRowType)(new GenericTypeInfo(classOf[CRow]))
+    val optimizedPlan = optimize(ast, updatesAsRetraction = false)
+    val dataStream = translateToCRow(optimizedPlan)
 
     val env = dataStream.getExecutionEnvironment
     val jsonSqlPlan = env.getExecutionPlan
@@ -574,4 +658,90 @@ abstract class StreamTableEnvironment(
         s"$sqlPlan"
   }
 
+  private class AppendOnlyValidator extends RelVisitor {
+
+    var isAppendOnly = true
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case s: DataStreamRel if s.producesUpdates =>
+          isAppendOnly = false
+        case _ =>
+          super.visit(node, ordinal, parent)
+      }
+    }
+  }
+
+  /** Identifies unique key fields in the output of a RelNode. */
+  private class UniqueKeyExtractor extends RelVisitor {
+
+    var keys: Option[Array[String]] = None
+
+    override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+      node match {
+        case c: DataStreamCalc =>
+          super.visit(node, ordinal, parent)
+          // check if input has keys
+          if (keys.isDefined) {
+            // track keys forward
+            val inNames = c.getInput.getRowType.getFieldNames
+            val inOutNames = c.getProgram.getNamedProjects.asScala
+              .map(p => {
+                c.getProgram.expandLocalRef(p.left) match {
+                    // output field is forwarded input field
+                  case i: RexInputRef => (i.getIndex, p.right)
+                    // output field is renamed input field
+                  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+                    a.getOperands.get(0) match {
+                      case ref: RexInputRef =>
+                        (ref.getIndex, p.right)
+                      case _ =>
+                        (-1, p.right)
+                    }
+                    // output field is not forwarded from input
+                  case _: RexNode => (-1, p.right)
+                }
+              })
+              // filter all non-forwarded fields
+              .filter(_._1 >= 0)
+              // resolve names of input fields
+              .map(io => (inNames.get(io._1), io._2))
+
+            // filter by input keys
+            val outKeys = inOutNames.filter(io => keys.get.contains(io._1)).map(_._2)
+            // check if all keys have been preserved
+            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
+              // all keys have been preserved (but possibly renamed)
+              keys = Some(outKeys.toArray)
+            } else {
+              // some (or all) keys have been removed. Keys are no longer unique and removed
+              keys = None
+            }
+          }
+        case _: DataStreamOverAggregate =>
+          super.visit(node, ordinal, parent)
+          // keys are always forwarded by Over aggregate
+        case a: DataStreamGroupAggregate =>
+          // get grouping keys
+          val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
+          keys = Some(groupKeys.toArray)
+        case w: DataStreamGroupWindowAggregate =>
+          // get grouping keys
+          val groupKeys =
+            w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
+          // get window start and end time
+          val windowStartEnd = w.getWindowProperties.map(_.name)
+          // we have only a unique key if at least one window property is selected
+          if (windowStartEnd.nonEmpty) {
+            keys = Some(groupKeys ++ windowStartEnd)
+          }
+        case _: DataStreamRel =>
+          // anything else does not forward keys or might duplicate key, so we can stop
+          keys = None
+      }
+    }
+
+  }
+
 }
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index d296978..6448657 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -37,11 +37,6 @@ class TableConfig {
   private var nullCheck: Boolean = true
 
   /**
-    * Defines whether sink table requires that update and delete changes are sent with retraction
-    */
-  private var needsUpdatesAsRetractionForSink: Boolean = false
-
-  /**
     * Defines the configuration of Calcite for Table API and SQL queries.
     */
   private var calciteConfig = CalciteConfig.DEFAULT
@@ -72,18 +67,6 @@ class TableConfig {
   }
 
   /**
-    * Returns the need retraction property for table sink.
-    */
-  def getNeedsUpdatesAsRetractionForSink = needsUpdatesAsRetractionForSink
-
-  /**
-    * Set the need retraction property for table sink.
-    */
-  def setNeedsUpdatesAsRetractionForSink(needsUpdatesAsRetraction: Boolean ): Unit = {
-    this.needsUpdatesAsRetractionForSink = needsUpdatesAsRetraction
-  }
-
-  /**
     * Returns the current configuration of Calcite for Table API and SQL queries.
     */
   def getCalciteConfig: CalciteConfig = calciteConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 5b752ab..bb0de3e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -51,15 +51,14 @@ import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction}
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction, AggregateFunction}
+import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.RelTable
-import org.apache.flink.table.runtime.types.CRowTypeInfo
+import org.apache.flink.table.plan.schema.{RelTable, RowSchema}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
@@ -620,7 +619,7 @@ abstract class TableEnvironment(val config: TableConfig) {
         throw new TableException(
           "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
             "Please specify the type of the input with a RowTypeInfo.")
-      case a: AtomicType[A] =>
+      case a: AtomicType[_] =>
         exprs.zipWithIndex flatMap {
           case (UnresolvedFieldReference(name), idx) =>
             if (idx > 0) {
@@ -691,53 +690,32 @@ abstract class TableEnvironment(val config: TableConfig) {
     (fieldNames.toArray, fieldIndexes.toArray)
   }
 
-  /**
-    * Creates a final converter that maps the internal row type to external type.
-    *
-    * @param physicalTypeInfo the input of the sink
-    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
-    * @param requestedTypeInfo the output type of the sink
-    * @param functionName name of the map function. Must not be unique but has to be a
-    *                     valid Java class identifier.
-    */
-  protected def getConversionMapper[IN, OUT](
-      physicalTypeInfo: TypeInformation[IN],
-      logicalRowType: RelDataType,
-      requestedTypeInfo: TypeInformation[OUT],
-      functionName: String):
-    Option[MapFunction[IN, OUT]]
-
   protected def generateRowConverterFunction[OUT](
       inputTypeInfo: TypeInformation[Row],
-      logicalRowType: RelDataType,
+      schema: RowSchema,
       requestedTypeInfo: TypeInformation[OUT],
       functionName: String):
     GeneratedFunction[MapFunction[Row, OUT], OUT] = {
 
     // validate that at least the field types of physical and logical type match
     // we do that here to make sure that plan translation was correct
-    val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
-    if (logicalRowTypeInfo != inputTypeInfo) {
+    if (schema.physicalTypeInfo != inputTypeInfo) {
       throw TableException("The field types of physical and logical row types do not match." +
         "This is a bug and should not happen. Please file an issue.")
     }
 
-    // convert to type information
-    val logicalFieldTypes = logicalRowType.getFieldList.asScala
-      .map(t => FlinkTypeFactory.toTypeInfo(t.getType))
-
-    // field names
-    val logicalFieldNames = logicalRowType.getFieldNames.asScala
+    val fieldTypes = schema.physicalFieldTypeInfo
+    val fieldNames = schema.physicalFieldNames
 
     // validate requested type
-    if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
+    if (requestedTypeInfo.getArity != fieldTypes.length) {
       throw new TableException("Arity of result does not match requested type.")
     }
 
     requestedTypeInfo match {
       // POJO type requested
       case pt: PojoTypeInfo[_] =>
-        logicalFieldNames.zip(logicalFieldTypes) foreach {
+        fieldNames.zip(fieldTypes) foreach {
           case (fName, fType) =>
             val pojoIdx = pt.getFieldIndex(fName)
             if (pojoIdx < 0) {
@@ -752,7 +730,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 
       // Tuple/Case class/Row type requested
       case tt: TupleTypeInfoBase[_] =>
-        logicalFieldTypes.zipWithIndex foreach {
+        fieldTypes.zipWithIndex foreach {
           case (fieldTypeInfo, i) =>
             val requestedTypeInfo = tt.getTypeAt(i)
             if (fieldTypeInfo != requestedTypeInfo) {
@@ -763,11 +741,11 @@ abstract class TableEnvironment(val config: TableConfig) {
 
       // Atomic type requested
       case at: AtomicType[_] =>
-        if (logicalFieldTypes.size != 1) {
+        if (fieldTypes.size != 1) {
           throw new TableException(s"Requested result type is an atomic type but " +
             s"result has more or less than a single field.")
         }
-        val fieldTypeInfo = logicalFieldTypes.head
+        val fieldTypeInfo = fieldTypes.head
         if (fieldTypeInfo != at) {
           throw new TableException(s"Result field does not match requested type. " +
             s"Requested: $at; Actual: $fieldTypeInfo")
@@ -787,7 +765,7 @@ abstract class TableEnvironment(val config: TableConfig) {
 
     val conversion = generator.generateConverterResultExpression(
       requestedTypeInfo,
-      logicalFieldNames)
+      fieldNames)
 
     val body =
       s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index a649584..a70bcca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -18,12 +18,14 @@
 package org.apache.flink.table.api.java
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.table.api._
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
 import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import _root_.java.lang.{Boolean => JBool}
 
 /**
   * The [[TableEnvironment]] for a Java [[StreamExecutionEnvironment]].
@@ -132,7 +134,10 @@ class StreamTableEnvironment(
   }
 
   /**
-    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
     * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
@@ -145,11 +150,16 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
-    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
+    val typeInfo = TypeExtractor.createTypeInfo(clazz)
+    TableEnvironment.validateType(typeInfo)
+    translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
   }
 
   /**
-    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
     * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
@@ -162,7 +172,68 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
-    translate[T](table)(typeInfo)
+    TableEnvironment.validateType(typeInfo)
+    translate[T](table, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * The fields of the [[Table]] are mapped to the requested type as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the requested record type.
+    * @tparam T The type of the requested record type.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T](table: Table, clazz: Class[T]):
+    DataStream[JTuple2[JBool, T]] = {
+
+    val typeInfo = TypeExtractor.createTypeInfo(clazz)
+    TableEnvironment.validateType(typeInfo)
+    val resultType = new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, typeInfo)
+    translate[JTuple2[JBool, T]](
+      table,
+      updatesAsRetraction = true,
+      withChangeFlag = true)(resultType)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[JTuple2]]. The first field is a [[JBool]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[JBool]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * The fields of the [[Table]] are mapped to the requested type as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] of the requested record type.
+    * @tparam T The type of the requested record type.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T](table: Table, typeInfo: TypeInformation[T]):
+    DataStream[JTuple2[JBool, T]] = {
+
+    TableEnvironment.validateType(typeInfo)
+    val resultTypeInfo = new TupleTypeInfo[JTuple2[JBool, T]](
+      Types.BOOLEAN,
+      typeInfo
+    )
+    translate[JTuple2[JBool, T]](
+      table,
+      updatesAsRetraction = true,
+      withChangeFlag = true)(resultTypeInfo)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 0552d7c..e5ad6c2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -17,10 +17,11 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.api.scala._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableEnvironment, Table, TableConfig}
-import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.asScalaStream
 
@@ -127,11 +128,14 @@ class StreamTableEnvironment(
   }
 
   /**
-    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
-    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
-    * types: Fields are mapped by position, field types must match.
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *
     * @param table The [[Table]] to convert.
@@ -139,7 +143,24 @@ class StreamTableEnvironment(
     * @return The converted [[DataStream]].
     */
   def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
-    asScalaStream(translate(table))
+    val returnType = createTypeInformation[T]
+    asScalaStream(translate(table, updatesAsRetraction = false, withChangeFlag = false)(returnType))
+  }
+
+/**
+  * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+  * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+  * the second field holds the record of the specified type [[T]].
+  *
+  * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+  *
+  * @param table The [[Table]] to convert.
+  * @tparam T The type of the requested data type.
+  * @return The converted [[DataStream]].
+  */
+  def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
+    val returnType = createTypeInformation[(Boolean, T)]
+    asScalaStream(translate(table, updatesAsRetraction = true, withChangeFlag = true)(returnType))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 2a0d571..5efff62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -57,5 +57,24 @@ class TableConversions(table: Table) {
     }
   }
 
+  /** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    */
+  def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = {
+
+    table.tableEnv match {
+      case tEnv: ScalaStreamTableEnv =>
+        tEnv.toRetractStream(table)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+            "can be converted to Scala DataStreams.")
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index dd8265b..310a75f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -755,7 +755,9 @@ class Table(
     *
     * A batch [[Table]] can only be written to a
     * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
-    * [[org.apache.flink.table.sinks.StreamTableSink]].
+    * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+    * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+    * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
     *
     * @param sink The [[TableSink]] to which the [[Table]] is written.
     * @tparam T The data type that the [[TableSink]] expects.

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 59f723ac..ce0f966 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -100,11 +100,17 @@ class DataStreamCalc(
       calcProgram,
       config)
 
+    val inputParallelism = inputDataStream.getParallelism
+
     val mapFunc = new CRowFlatMapRunner(
       genFunction.name,
       genFunction.code,
       CRowTypeInfo(schema.physicalTypeInfo))
 
-    inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
+    inputDataStream
+      .flatMap(mapFunc)
+      .name(calcOpName(calcProgram, getExpressionString))
+      // keep parallelism to ensure order of accumulate and retract messages
+      .setParallelism(inputParallelism)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 056cda9..18f1fc8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -67,6 +67,8 @@ class DataStreamGroupAggregate(
 
   override def consumesRetractions = true
 
+  def getGroupings: Array[Int] = groupings
+
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamGroupAggregate(
       cluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index f61828b..1be1896 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -59,6 +59,10 @@ class DataStreamGroupWindowAggregate(
 
   override def consumesRetractions = true
 
+  def getGroupings: Array[Int] = grouping
+
+  def getWindowProperties: Seq[NamedWindowProperty] = namedProperties
+
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamGroupWindowAggregate(
       window,

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
index c3b43ba..173b7d3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/retractionTraits.scala
@@ -82,19 +82,38 @@ object AccModeTrait {
 }
 
 /**
-  * The AccMode indicates which kinds of messages a [[org.apache.calcite.rel.RelNode]] might
-  * produce.
-  * In [[AccMode.Acc]] the node only emit accumulate messages.
-  * In [[AccMode.AccRetract]], the node produces accumulate messages for insert changes,
-  * retraction messages for delete changes, and accumulate and retraction messages
-  * for update changes.
+  * The [[AccMode]] determines how insert, update, and delete changes of tables are encoded
+  * by the messages that an operator emits.
   */
 object AccMode extends Enumeration {
   type AccMode = Value
 
-  val Acc        = Value // Operator produces only accumulate (insert) messages
-  val AccRetract = Value // Operator produces accumulate (insert, update) and
-                         //   retraction (delete, update) messages
+  /**
+    * An operator in [[Acc]] mode emits change messages as
+    * [[org.apache.flink.table.runtime.types.CRow]] which encode a pair of (Boolean, Row).
+    *
+    * An operator in [[Acc]] mode may only produce update and delete messages, if the table has
+    * a unique key and all key attributes are contained in the Row.
+    *
+    * Changes are encoded as follows:
+    * - insert: (true, NewRow)
+    * - update: (true, NewRow) // the Row includes the full unique key to identify the row to update
+    * - delete: (false, OldRow) // the Row includes the full unique key to identify the deleted row
+    *
+    */
+  val Acc = Value
+
+  /**
+    * An operator in [[AccRetract]] mode emits change messages as
+    * [[org.apache.flink.table.runtime.types.CRow]] which encode a pair of (Boolean, Row).
+    *
+    * Changes are encoded as follows:
+    * - insert: (true, NewRow)
+    * - update: (false, OldRow), (true, NewRow) // updates are encoded in two messages!
+    * - delete: (false, OldRow)
+    *
+    */
+  val AccRetract = Value
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
index 97c0dbb..f0b725d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -102,17 +102,17 @@ object DataStreamRetractionRules {
       val rel = call.rel(0).asInstanceOf[DataStreamRel]
       val traits = rel.getTraitSet
 
-      val traitsWithUpdateAsRetrac =
+      val traitsWithUpdateAsRetraction =
         if (null == traits.getTrait(UpdateAsRetractionTraitDef.INSTANCE)) {
         traits.plus(UpdateAsRetractionTrait.DEFAULT)
       } else {
         traits
       }
       val traitsWithAccMode =
-        if (null == traitsWithUpdateAsRetrac.getTrait(AccModeTraitDef.INSTANCE)) {
-          traitsWithUpdateAsRetrac.plus(AccModeTrait.DEFAULT)
+        if (null == traitsWithUpdateAsRetraction.getTrait(AccModeTraitDef.INSTANCE)) {
+          traitsWithUpdateAsRetraction.plus(AccModeTrait.DEFAULT)
       } else {
-        traitsWithUpdateAsRetrac
+        traitsWithUpdateAsRetraction
       }
 
       if (traits != traitsWithAccMode) {
@@ -122,8 +122,8 @@ object DataStreamRetractionRules {
   }
 
   /**
-    * Rule that annotates all [[DataStreamRel]] nodes that need to sent out update and delete
-    * changes as retraction messages.
+    * Rule that annotates all [[DataStreamRel]] nodes that need to send out update changes with
+    * retraction messages.
     */
   class SetUpdatesAsRetractionRule extends RelOptRule(
     operand(
@@ -131,7 +131,7 @@ object DataStreamRetractionRules {
     "SetUpdatesAsRetractionRule") {
 
     /**
-      * Checks if a [[RelNode]] requires that update and delete changes are sent with retraction
+      * Checks if a [[RelNode]] requires that update changes are sent with retraction
       * messages.
       */
     def needsUpdatesAsRetraction(node: RelNode): Boolean = {
@@ -142,7 +142,7 @@ object DataStreamRetractionRules {
     }
 
     /**
-      * Annotates a [[RelNode]] to send out update and delete changes as retraction messages.
+      * Annotates a [[RelNode]] to send out update changes with retraction messages.
       */
     def setUpdatesAsRetraction(relNode: RelNode): RelNode = {
       val traitSet = relNode.getTraitSet

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
index 66e51b1..ff3821a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
@@ -37,7 +37,7 @@ class CRowCorrelateFlatMapRunner(
     flatMapCode: String,
     collectorName: String,
     collectorCode: String,
-    @transient returnType: TypeInformation[CRow])
+    @transient var returnType: TypeInformation[CRow])
   extends RichFlatMapFunction[CRow, CRow]
   with ResultTypeQueryable[CRow]
   with Compiler[Any] {

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
index 9a4650b..9701cb9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory
 class CRowFlatMapRunner(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[CRow])
+    @transient var returnType: TypeInformation[CRow])
   extends RichFlatMapFunction[CRow, CRow]
   with ResultTypeQueryable[CRow]
   with Compiler[FlatMapFunction[Row, Row]] {

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
index 8e95c93..109c6e1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory
 class CRowInputMapRunner[OUT](
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT])
+    @transient var returnType: TypeInformation[OUT])
   extends RichMapFunction[CRow, OUT]
   with ResultTypeQueryable[OUT]
   with Compiler[MapFunction[Row, OUT]] {

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
index 54bbf7e..7c96437 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.runtime
 
+import java.lang.{Boolean => JBool}
+
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
@@ -28,36 +30,63 @@ import org.apache.flink.types.Row
 import org.slf4j.LoggerFactory
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 
-
 /**
-  * Convert [[CRow]] to a [[Tuple2]]
+  * Convert [[CRow]] to a [[JTuple2]]
   */
-class CRowInputTupleOutputMapRunner[OUT](
+class CRowInputJavaTupleOutputMapRunner(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[JTuple2[Boolean, OUT]])
-  extends RichMapFunction[CRow, JTuple2[Boolean, OUT]]
-          with ResultTypeQueryable[JTuple2[Boolean, OUT]]
-          with Compiler[MapFunction[Row, OUT]] {
+    @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
+  extends RichMapFunction[CRow, Any]
+          with ResultTypeQueryable[JTuple2[JBool, Any]]
+          with Compiler[MapFunction[Row, Any]] {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
-  private var function: MapFunction[Row, OUT] = _
-  private var tupleWrapper: JTuple2[Boolean, OUT] = _
+  private var function: MapFunction[Row, Any] = _
+  private var tupleWrapper: JTuple2[JBool, Any] = _
 
   override def open(parameters: Configuration): Unit = {
     LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
     val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
     LOG.debug("Instantiating MapFunction.")
     function = clazz.newInstance()
-    tupleWrapper = new JTuple2[Boolean, OUT]()
+    tupleWrapper = new JTuple2[JBool, Any]()
   }
 
-  override def map(in: CRow): JTuple2[Boolean, OUT] = {
+  override def map(in: CRow): JTuple2[JBool, Any] = {
     tupleWrapper.f0 = in.change
     tupleWrapper.f1 = function.map(in.row)
     tupleWrapper
   }
 
-  override def getProducedType: TypeInformation[JTuple2[Boolean, OUT]] = returnType
+  override def getProducedType: TypeInformation[JTuple2[JBool, Any]] = returnType
+}
+
+/**
+  * Convert [[CRow]] to a [[Tuple2]]
+  */
+class CRowInputScalaTupleOutputMapRunner(
+  name: String,
+  code: String,
+  @transient var returnType: TypeInformation[(Boolean, Any)])
+  extends RichMapFunction[CRow, (Boolean, Any)]
+    with ResultTypeQueryable[(Boolean, Any)]
+    with Compiler[MapFunction[Row, Any]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Row, Any] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def map(in: CRow): (Boolean, Any) =
+    (in.change, function.map(in.row))
+
+  override def getProducedType: TypeInformation[(Boolean, Any)] = returnType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
index 966dea9..cb8f695 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory
 class CRowOutputMapRunner(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[CRow])
+    @transient var returnType: TypeInformation[CRow])
   extends RichMapFunction[Any, CRow]
   with ResultTypeQueryable[CRow]
   with Compiler[MapFunction[Any, Row]] {

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
index a0415e1..478b6b6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala
@@ -32,7 +32,7 @@ class CorrelateFlatMapRunner[IN, OUT](
     flatMapCode: String,
     collectorName: String,
     collectorCode: String,
-    @transient returnType: TypeInformation[OUT])
+    @transient var returnType: TypeInformation[OUT])
   extends RichFlatMapFunction[IN, OUT]
   with ResultTypeQueryable[OUT]
   with Compiler[Any] {

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
index 715848d..67acc0b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory
 class FlatJoinRunner[IN1, IN2, OUT](
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT])
+    @transient var returnType: TypeInformation[OUT])
   extends RichFlatJoinFunction[IN1, IN2, OUT]
   with ResultTypeQueryable[OUT]
   with Compiler[FlatJoinFunction[IN1, IN2, OUT]] {

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
index 2e37baf..938da59 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory
 class FlatMapRunner(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[Row])
+    @transient var returnType: TypeInformation[Row])
   extends RichFlatMapFunction[Row, Row]
   with ResultTypeQueryable[Row]
   with Compiler[FlatMapFunction[Row, Row]] {

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
index 644e855..5f3dbb4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
@@ -24,7 +24,7 @@ import org.apache.flink.util.Collector
 class MapJoinLeftRunner[IN1, IN2, OUT](
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT],
+    returnType: TypeInformation[OUT],
     broadcastSetName: String)
   extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code, returnType, broadcastSetName) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
index eee38d1..e2d9331 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinRightRunner.scala
@@ -24,7 +24,7 @@ import org.apache.flink.util.Collector
 class MapJoinRightRunner[IN1, IN2, OUT](
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT],
+    returnType: TypeInformation[OUT],
     broadcastSetName: String)
   extends MapSideJoinRunner[IN1, IN2, IN1, IN2, OUT](name, code, returnType, broadcastSetName) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
index 32562c7..14eeecf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory
 class MapRunner[IN, OUT](
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT])
+    @transient var returnType: TypeInformation[OUT])
   extends RichMapFunction[IN, OUT]
   with ResultTypeQueryable[OUT]
   with Compiler[MapFunction[IN, OUT]] {

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
index 090e184..00b7b8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory
 abstract class MapSideJoinRunner[IN1, IN2, SINGLE_IN, MULTI_IN, OUT](
     name: String,
     code: String,
-    @transient returnType: TypeInformation[OUT],
+    @transient var returnType: TypeInformation[OUT],
     broadcastSetName: String)
   extends RichFlatMapFunction[MULTI_IN, OUT]
     with ResultTypeQueryable[OUT]

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
index 22a2682..9bcac30 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
@@ -41,7 +41,7 @@ class DataSetSessionWindowAggregatePreProcessor(
     genAggregations: GeneratedAggregationsFunction,
     keysAndAggregatesArity: Int,
     gap: Long,
-    @transient intermediateRowType: TypeInformation[Row])
+    @transient var intermediateRowType: TypeInformation[Row])
   extends AbstractRichFunction
   with MapPartitionFunction[Row,Row]
   with GroupCombineFunction[Row,Row]

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 745f24d..6ee37e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
+import java.lang.{Long => JLong}
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.types.Row
@@ -24,6 +26,7 @@ import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state.ValueStateDescriptor
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.slf4j.LoggerFactory
 import org.apache.flink.table.runtime.types.CRow
@@ -47,7 +50,10 @@ class GroupAggProcessFunction(
   private var newRow: CRow = _
   private var prevRow: CRow = _
   private var firstRow: Boolean = _
+  // stores the accumulators
   private var state: ValueState[Row] = _
+  // counts the number of added and retracted input records
+  private var cntState: ValueState[JLong] = _
 
   override def open(config: Configuration) {
     LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
@@ -65,6 +71,9 @@ class GroupAggProcessFunction(
     val stateDescriptor: ValueStateDescriptor[Row] =
       new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType)
     state = getRuntimeContext.getState(stateDescriptor)
+    val inputCntDescriptor: ValueStateDescriptor[JLong] =
+      new ValueStateDescriptor[JLong]("GroupAggregateInputCounter", Types.LONG)
+    cntState = getRuntimeContext.getState(inputCntDescriptor)
   }
 
   override def processElement(
@@ -74,11 +83,14 @@ class GroupAggProcessFunction(
 
     val input = inputC.row
 
-    // get accumulators
+    // get accumulators and input counter
     var accumulators = state.value()
+    var inputCnt = cntState.value()
+
     if (null == accumulators) {
       firstRow = true
       accumulators = function.createAccumulators()
+      inputCnt = 0L
     } else {
       firstRow = false
     }
@@ -92,29 +104,44 @@ class GroupAggProcessFunction(
 
     // update aggregate result and set to the newRow
     if (inputC.change) {
+      inputCnt += 1
       // accumulate input
       function.accumulate(accumulators, input)
       function.setAggregationResults(accumulators, newRow.row)
     } else {
+      inputCnt -= 1
       // retract input
       function.retract(accumulators, input)
       function.setAggregationResults(accumulators, newRow.row)
     }
 
-    // update accumulators
-    state.update(accumulators)
-
-    // if previousRow is not null, do retraction process
-    if (generateRetraction && !firstRow) {
-      if (prevRow.row.equals(newRow.row)) {
-        // ignore same newRow
-        return
-      } else {
-        // retract previous row
-        out.collect(prevRow)
+    if (inputCnt != 0) {
+      // we aggregated at least one record for this key
+
+      // update the state
+      state.update(accumulators)
+      cntState.update(inputCnt)
+
+      // if this was not the first row and we have to emit retractions
+      if (generateRetraction && !firstRow) {
+        if (prevRow.row.equals(newRow.row)) {
+          // newRow is the same as before. Do not emit retraction and acc messages
+          return
+        } else {
+          // retract previous result
+          out.collect(prevRow)
+        }
       }
-    }
+      // emit the new result
+      out.collect(newRow)
 
-    out.collect(newRow)
+    } else {
+      // we retracted the last record for this key
+      // send out a delete message
+      out.collect(prevRow)
+      // and clear all state
+      state.clear()
+      cntState.clear()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
index ec73fa6..1cb3a6e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory
 class CRowValuesInputFormat(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[CRow])
+    @transient var returnType: TypeInformation[CRow])
   extends GenericInputFormat[CRow]
   with NonParallelInput
   with ResultTypeQueryable[CRow]

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
index d536b39..43ce605 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory
 class ValuesInputFormat(
     name: String,
     code: String,
-    @transient returnType: TypeInformation[Row])
+    @transient var returnType: TypeInformation[Row])
   extends GenericInputFormat[Row]
   with NonParallelInput
   with ResultTypeQueryable[Row]

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
new file mode 100644
index 0000000..abdca17
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.Table
+
+/**
+  * Defines an external [[TableSink]] to emit a streaming [[Table]] with only insert changes.
+  *
+  * If the [[Table]] is also modified by update or delete changes, a
+  * [[org.apache.flink.table.api.TableException]] will be thrown.
+  *
+  * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
+  */
+trait AppendStreamTableSink[T] extends TableSink[T] {
+
+  /** Emits the DataStream. */
+  def emitDataStream(dataStream: DataStream[T]): Unit
+}