You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/20 07:31:36 UTC

[2/3] flink git commit: [FLINK-9852] [table] Expose descriptor-based sink creation and introduce update mode

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
new file mode 100644
index 0000000..ec57c5e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.factories._
+import org.apache.flink.table.plan.schema._
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.{BatchTableSink, StreamTableSink}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
+import org.apache.flink.table.util.Logging
+
+
+/**
+  * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
+  *
+  * It uses [[TableFactoryService]] for discovering.
+  */
+object ExternalTableUtil extends Logging {
+
+  /**
+    * Converts an [[ExternalCatalogTable]] instance to a [[TableSourceSinkTable]] instance.
+    *
+    * @param externalTable the [[ExternalCatalogTable]] instance to convert
+    * @return converted [[TableSourceSinkTable]] instance from the input catalog table
+    */
+  def fromExternalCatalogTable[T1, T2](
+      tableEnv: TableEnvironment,
+      externalTable: ExternalCatalogTable)
+    : TableSourceSinkTable[T1, T2] = {
+
+    val statistics = new FlinkStatistic(externalTable.getTableStats)
+
+    val source: Option[TableSourceTable[T1]] = if (externalTable.isTableSource) {
+      Some(createTableSource(tableEnv, externalTable, statistics))
+    } else {
+      None
+    }
+
+    val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) {
+      Some(createTableSink(tableEnv, externalTable, statistics))
+    } else {
+      None
+    }
+
+    new TableSourceSinkTable[T1, T2](source, sink)
+  }
+
+  private def createTableSource[T](
+      tableEnv: TableEnvironment,
+      externalTable: ExternalCatalogTable,
+      statistics: FlinkStatistic)
+    : TableSourceTable[T] = tableEnv match {
+
+    case _: BatchTableEnvironment if externalTable.isBatchTable =>
+      val source = TableFactoryUtil.findAndCreateTableSource(tableEnv, externalTable)
+      new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], statistics)
+
+    case _: StreamTableEnvironment if externalTable.isStreamTable =>
+      val source = TableFactoryUtil.findAndCreateTableSource(tableEnv, externalTable)
+      new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], statistics)
+
+    case _ =>
+      throw new ValidationException(
+        "External catalog table does not support the current environment for a table source.")
+  }
+
+  private def createTableSink[T](
+      tableEnv: TableEnvironment,
+      externalTable: ExternalCatalogTable,
+      statistics: FlinkStatistic)
+    : TableSinkTable[T] = tableEnv match {
+
+    case _: BatchTableEnvironment if externalTable.isBatchTable =>
+      val sink = TableFactoryUtil.findAndCreateTableSink(tableEnv, externalTable)
+      new TableSinkTable[T](sink.asInstanceOf[BatchTableSink[T]], statistics)
+
+    case _: StreamTableEnvironment if externalTable.isStreamTable =>
+      val sink = TableFactoryUtil.findAndCreateTableSink(tableEnv, externalTable)
+      new TableSinkTable[T](sink.asInstanceOf[StreamTableSink[T]], statistics)
+
+    case _ =>
+      throw new ValidationException(
+        "External catalog table does not support the current environment for a table sink.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
new file mode 100644
index 0000000..6bd2a71
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableDescriptor.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.BatchTableEnvironment
+
+/**
+  * Descriptor for specifying a table source and/or sink in a batch environment.
+  */
+class BatchTableDescriptor(
+    tableEnv: BatchTableEnvironment,
+    connectorDescriptor: ConnectorDescriptor)
+  extends ConnectTableDescriptor[BatchTableDescriptor](
+    tableEnv,
+    connectorDescriptor)

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
deleted file mode 100644
index c967291..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.{BatchTableEnvironment, Table, ValidationException}
-import org.apache.flink.table.factories.{BatchTableSourceFactory, TableFactoryService}
-import org.apache.flink.table.sources.TableSource
-
-class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor)
-  extends TableSourceDescriptor {
-
-  connectorDescriptor = Some(connector)
-
-  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
-    // check for a format
-    if (connector.needsFormat() && formatDescriptor.isEmpty) {
-      throw new ValidationException(
-        s"The connector '$connector' requires a format description.")
-    } else if (!connector.needsFormat() && formatDescriptor.isDefined) {
-      throw new ValidationException(
-        s"The connector '$connector' does not require a format description " +
-          s"but '${formatDescriptor.get}' found.")
-    }
-    super.addProperties(properties)
-  }
-
-  /**
-    * Searches for the specified table source, configures it accordingly, and returns it.
-    */
-  def toTableSource: TableSource[_] = {
-    val properties = new DescriptorProperties()
-    addProperties(properties)
-    val javaMap = properties.asMap
-    TableFactoryService
-      .find(classOf[BatchTableSourceFactory[_]], javaMap)
-      .createBatchTableSource(javaMap)
-  }
-
-  /**
-    * Searches for the specified table source, configures it accordingly, and returns it as a table.
-    */
-  def toTable: Table = {
-    tableEnv.fromTableSource(toTableSource)
-  }
-
-  /**
-    * Searches for the specified table source, configures it accordingly, and registers it as
-    * a table under the given name.
-    *
-    * @param name table name to be registered in the table environment
-    */
-  def register(name: String): Unit = {
-    tableEnv.registerTableSource(name, toTableSource)
-  }
-
-  /**
-    * Specifies the format that defines how to read data from a connector.
-    */
-  def withFormat(format: FormatDescriptor): BatchTableSourceDescriptor = {
-    formatDescriptor = Some(format)
-    this
-  }
-
-  /**
-    * Specifies the resulting table schema.
-    */
-  def withSchema(schema: Schema): BatchTableSourceDescriptor = {
-    schemaDescriptor = Some(schema)
-    this
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
new file mode 100644
index 0000000..569b825
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.factories.TableFactoryUtil
+
+/**
+  * Common class for tables created with [[TableEnvironment.connect(ConnectorDescriptor)]].
+  */
+abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
+    private val tableEnv: TableEnvironment,
+    private val connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor
+  with SchematicDescriptor[D]
+  with RegistrableDescriptor { this: D =>
+
+  private var formatDescriptor: Option[FormatDescriptor] = None
+  private var schemaDescriptor: Option[Schema] = None
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and registers it as
+    * a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  override def registerTableSource(name: String): Unit = {
+    val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)
+    tableEnv.registerTableSource(name, tableSource)
+  }
+
+  /**
+    * Searches for the specified table sink, configures it accordingly, and registers it as
+    * a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  override def registerTableSink(name: String): Unit = {
+    val tableSink = TableFactoryUtil.findAndCreateTableSink(tableEnv, this)
+    tableEnv.registerTableSink(name, tableSink)
+  }
+
+  /**
+    * Searches for the specified table source and sink, configures them accordingly, and registers
+    * them as a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  override def registerTableSourceAndSink(name: String): Unit = {
+    registerTableSource(name)
+    registerTableSink(name)
+  }
+
+  /**
+    * Specifies the format that defines how to read data from a connector.
+    */
+  override def withFormat(format: FormatDescriptor): D = {
+    formatDescriptor = Some(format)
+    this
+  }
+
+  /**
+    * Specifies the resulting table schema.
+    */
+  override def withSchema(schema: Schema): D = {
+    schemaDescriptor = Some(schema)
+    this
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+
+    // this performs only basic validation
+    // more validation can only happen within a factory
+    if (connectorDescriptor.needsFormat() && formatDescriptor.isEmpty) {
+      throw new ValidationException(
+        s"The connector '$connectorDescriptor' requires a format description.")
+    } else if (!connectorDescriptor.needsFormat() && formatDescriptor.isDefined) {
+      throw new ValidationException(
+        s"The connector '$connectorDescriptor' does not require a format description " +
+          s"but '${formatDescriptor.get}' found.")
+    }
+
+    connectorDescriptor.addProperties(properties)
+    formatDescriptor.foreach(_.addProperties(properties))
+    schemaDescriptor.foreach(_.addProperties(properties))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
index e21527b..aa96bc4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Descriptor.scala
@@ -19,18 +19,23 @@
 package org.apache.flink.table.descriptors
 
 /**
-  * A class that adds a set of string-based, normalized properties for describing DDL information.
+  * A trait that adds a set of string-based, normalized properties for describing DDL information.
   *
   * Typical characteristics of a descriptor are:
   * - descriptors have a default constructor and a default 'apply()' method for Scala
   * - descriptors themselves contain very little logic
   * - corresponding validators validate the correctness (goal: have a single point of validation)
   */
-abstract class Descriptor {
+trait Descriptor {
 
   /**
     * Internal method for properties conversion.
     */
   private[flink] def addProperties(properties: DescriptorProperties): Unit
 
+  override def toString: String = {
+    val descriptorProperties = new DescriptorProperties()
+    addProperties(descriptorProperties)
+    descriptorProperties.toString
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 3ad3eac..2c88dfd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -1064,6 +1064,10 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     properties.toMap.asJava
   }
 
+  override def toString: String = {
+    DescriptorProperties.toString(properties.toMap)
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   /**
@@ -1283,6 +1287,12 @@ object DescriptorProperties {
       .mkString("\n")
   }
 
+  def toJavaMap(descriptor: Descriptor): util.Map[String, String] = {
+    val descriptorProperties = new DescriptorProperties()
+    descriptor.addProperties(descriptorProperties)
+    descriptorProperties.asMap
+  }
+
   // the following methods help for Scala <-> Java interfaces
   // most of these methods are not necessary once we upgraded to Scala 2.12
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala
new file mode 100644
index 0000000..e89ca8c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RegistrableDescriptor.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A trait for descriptors that allow registering table sources and/or sinks.
+  */
+trait RegistrableDescriptor extends TableDescriptor {
+
+  /**
+    * Searches for the specified table source, configures it accordingly, and registers it as
+    * a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  def registerTableSource(name: String): Unit
+
+  /**
+    * Searches for the specified table sink, configures it accordingly, and registers it as
+    * a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  def registerTableSink(name: String): Unit
+
+  /**
+    * Searches for the specified table source and sink, configures them accordingly, and registers
+    * them as a table under the given name.
+    *
+    * @param name table name to be registered in the table environment
+    */
+  def registerTableSourceAndSink(name: String): Unit
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
new file mode 100644
index 0000000..794ff9e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchematicDescriptor.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A trait for descriptors that allow defining a format and a schema.
+  */
+trait SchematicDescriptor[D <: SchematicDescriptor[D]] extends TableDescriptor {
+
+  /**
+    * Specifies the format that defines how to read data from a connector.
+    */
+  def withFormat(format: FormatDescriptor): D
+
+  /**
+    * Specifies the resulting table schema.
+    */
+  def withSchema(schema: Schema): D
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
new file mode 100644
index 0000000..b9e64f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+
+/**
+  * Descriptor for specifying a table source and/or sink in a streaming environment.
+  */
+class StreamTableDescriptor(
+    tableEnv: StreamTableEnvironment,
+    connectorDescriptor: ConnectorDescriptor)
+  extends ConnectTableDescriptor[StreamTableDescriptor](
+    tableEnv,
+    connectorDescriptor)
+  with StreamableDescriptor[StreamTableDescriptor] {
+
+  private var updateMode: Option[String] = None
+
+  /**
+    * Declares how to perform the conversion between a dynamic table and an external connector.
+    *
+    * In append mode, a dynamic table and an external connector only exchange INSERT messages.
+    *
+    * @see [[inRetractMode()]] and [[inUpsertMode()]].
+    */
+  override def inAppendMode(): StreamTableDescriptor = {
+    updateMode = Some(UPDATE_MODE_VALUE_APPEND)
+    this
+  }
+
+  /**
+    * Declares how to perform the conversion between a dynamic table and an external connector.
+    *
+    * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
+    *
+    * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
+    * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
+    * the updating (new) row.
+    *
+    * In this mode, a key does not need to be defined, as opposed to upsert mode. However, every
+    * update consists of two messages, which is less efficient.
+    *
+    * @see [[inAppendMode()]] and [[inUpsertMode()]].
+    */
+  override def inRetractMode(): StreamTableDescriptor = {
+    updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
+    this
+  }
+
+  /**
+    * Declares how to perform the conversion between a dynamic table and an external connector.
+    *
+    * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
+    *
+    * This mode requires a (possibly composite) unique key by which updates can be propagated. The
+    * external connector needs to be aware of the unique key attribute in order to apply messages
+    * correctly. INSERT and UPDATE changes are encoded as UPSERT messages, and DELETE changes as
+    * DELETE messages.
+    *
+    * The main difference to a retract stream is that UPDATE changes are encoded with a single
+    * message and are therefore more efficient.
+    *
+    * @see [[inAppendMode()]] and [[inRetractMode()]].
+    */
+  override def inUpsertMode(): StreamTableDescriptor = {
+    updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
+    this
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Internal method for properties conversion.
+    */
+  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
+    super.addProperties(properties)
+    updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
+
+    // this performs only basic validation
+    // more validation can only happen within a factory
+    new StreamTableDescriptorValidator().validate(properties)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
new file mode 100644
index 0000000..5a6a946
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.util
+
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
+
+/**
+  * Validator for [[StreamTableDescriptor]].
+  */
+class StreamTableDescriptorValidator extends DescriptorValidator {
+
+  override def validate(properties: DescriptorProperties): Unit = {
+    properties.validateEnumValues(
+      UPDATE_MODE,
+      isOptional = false,
+      util.Arrays.asList(
+        UPDATE_MODE_VALUE_APPEND,
+        UPDATE_MODE_VALUE_RETRACT,
+        UPDATE_MODE_VALUE_UPSERT)
+    )
+  }
+}
+
+object StreamTableDescriptorValidator {
+
+  val UPDATE_MODE = "update-mode"
+  val UPDATE_MODE_VALUE_APPEND = "append"
+  val UPDATE_MODE_VALUE_RETRACT = "retract"
+  val UPDATE_MODE_VALUE_UPSERT = "upsert"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
deleted file mode 100644
index 6ade2d6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableSourceDescriptor.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.{StreamTableEnvironment, Table, ValidationException}
-import org.apache.flink.table.factories.{StreamTableSourceFactory, TableFactoryService}
-import org.apache.flink.table.sources.TableSource
-
-/**
-  * Descriptor for specifying a table source in a streaming environment.
-  */
-class StreamTableSourceDescriptor(tableEnv: StreamTableEnvironment, connector: ConnectorDescriptor)
-  extends TableSourceDescriptor {
-
-  connectorDescriptor = Some(connector)
-
-  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
-    // check for a format
-    if (connector.needsFormat() && formatDescriptor.isEmpty) {
-      throw new ValidationException(
-        s"The connector '$connector' requires a format description.")
-    } else if (!connector.needsFormat() && formatDescriptor.isDefined) {
-      throw new ValidationException(
-        s"The connector '$connector' does not require a format description " +
-          s"but '${formatDescriptor.get}' found.")
-    }
-    super.addProperties(properties)
-  }
-
-  /**
-    * Searches for the specified table source, configures it accordingly, and returns it.
-    */
-  def toTableSource: TableSource[_] = {
-    val properties = new DescriptorProperties()
-    addProperties(properties)
-    val javaMap = properties.asMap
-    TableFactoryService
-      .find(classOf[StreamTableSourceFactory[_]], javaMap)
-      .createStreamTableSource(javaMap)
-  }
-
-  /**
-    * Searches for the specified table source, configures it accordingly, and returns it as a table.
-    */
-  def toTable: Table = {
-    tableEnv.fromTableSource(toTableSource)
-  }
-
-  /**
-    * Searches for the specified table source, configures it accordingly, and registers it as
-    * a table under the given name.
-    *
-    * @param name table name to be registered in the table environment
-    */
-  def register(name: String): Unit = {
-    tableEnv.registerTableSource(name, toTableSource)
-  }
-
-  /**
-    * Specifies the format that defines how to read data from a connector.
-    */
-  def withFormat(format: FormatDescriptor): StreamTableSourceDescriptor = {
-    formatDescriptor = Some(format)
-    this
-  }
-
-  /**
-    * Specifies the resulting table schema.
-    */
-  def withSchema(schema: Schema): StreamTableSourceDescriptor = {
-    schemaDescriptor = Some(schema)
-    this
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala
new file mode 100644
index 0000000..0d424bd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+/**
+  * A trait for descriptors that allow converting between a dynamic table and an external connector.
+  */
+trait StreamableDescriptor[D <: StreamableDescriptor[D]] extends TableDescriptor {
+
+  /**
+    * Declares how to perform the conversion between a dynamic table and an external connector.
+    *
+    * In append mode, a dynamic table and an external connector only exchange INSERT messages.
+    *
+    * @see [[inRetractMode()]] and [[inUpsertMode()]].
+    */
+  def inAppendMode(): D
+
+  /**
+    * Declares how to perform the conversion between a dynamic table and an external connector.
+    *
+    * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
+    *
+    * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
+    * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
+    * the updating (new) row.
+    *
+    * In contrast to upsert mode, no key needs to be defined in this mode. However, every update
+    * consists of two messages, which is less efficient.
+    *
+    * @see [[inAppendMode()]] and [[inUpsertMode()]].
+    */
+  def inRetractMode(): D
+
+  /**
+    * Declares how to perform the conversion between a dynamic table and an external connector.
+    *
+    * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
+    *
+    * This mode requires a (possibly composite) unique key by which updates can be propagated. The
+    * external connector needs to be aware of the unique key attribute in order to apply messages
+    * correctly. INSERT and UPDATE changes are encoded as UPSERT messages, DELETE changes as
+    * DELETE messages.
+    *
+    * The main difference to a retract stream is that UPDATE changes are encoded with a single
+    * message and are therefore more efficient.
+    *
+    * @see [[inAppendMode()]] and [[inRetractMode()]].
+    */
+  def inUpsertMode(): D
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
index 7b864d8..b14a310 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptor.scala
@@ -19,22 +19,6 @@
 package org.apache.flink.table.descriptors
 
 /**
-  * Common class for all descriptors describing table sources and sinks.
+  * Common trait for all descriptors describing table sources and sinks.
   */
-abstract class TableDescriptor extends Descriptor {
-
-  protected var connectorDescriptor: Option[ConnectorDescriptor] = None
-  protected var formatDescriptor: Option[FormatDescriptor] = None
-  protected var schemaDescriptor: Option[Schema] = None
-  protected var metaDescriptor: Option[Metadata] = None
-
-  /**
-    * Internal method for properties conversion.
-    */
-  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
-    connectorDescriptor.foreach(_.addProperties(properties))
-    formatDescriptor.foreach(_.addProperties(properties))
-    schemaDescriptor.foreach(_.addProperties(properties))
-    metaDescriptor.foreach(_.addProperties(properties))
-  }
-}
+trait TableDescriptor extends Descriptor

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
deleted file mode 100644
index e0fa602..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-/**
-  * Validator for [[TableDescriptor]].
-  */
-class TableDescriptorValidator extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    // nothing to do
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
deleted file mode 100644
index 0a4d504..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSinkDescriptor.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-/**
-  * Common class for all descriptors describing a table sink.
-  */
-abstract class TableSinkDescriptor extends TableDescriptor {
-
-  /**
-    * Internal method for properties conversion.
-    */
-  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
-    super.addProperties(properties)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
deleted file mode 100644
index 3ca39c2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.DescriptorProperties.toScala
-import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_ROW_COUNT, readColumnStats}
-import org.apache.flink.table.plan.stats.TableStats
-
-import scala.collection.JavaConverters._
-
-/**
-  * Common class for all descriptors describing a table source.
-  */
-abstract class TableSourceDescriptor extends TableDescriptor {
-
-  protected var statisticsDescriptor: Option[Statistics] = None
-
-  /**
-    * Internal method for properties conversion.
-    */
-  override private[flink] def addProperties(properties: DescriptorProperties): Unit = {
-    super.addProperties(properties)
-    statisticsDescriptor.foreach(_.addProperties(properties))
-  }
-
-  /**
-    * Reads table statistics from the descriptors properties.
-    */
-  protected def getTableStats: Option[TableStats] = {
-    val normalizedProps = new DescriptorProperties()
-    addProperties(normalizedProps)
-    val rowCount = toScala(normalizedProps.getOptionalLong(STATISTICS_ROW_COUNT))
-    rowCount match {
-      case Some(cnt) =>
-        val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
-        Some(TableStats(cnt, columnStats.asJava))
-      case None =>
-        None
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
index cc99ecc..a5dcc60 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactory.scala
@@ -29,6 +29,12 @@ import java.util
   * Classes that implement this interface can be added to the
   * "META_INF/services/org.apache.flink.table.factories.TableFactory" file of a JAR file in
   * the current classpath to be found.
+  *
+  * @see [[BatchTableSourceFactory]]
+  * @see [[BatchTableSinkFactory]]
+  * @see [[StreamTableSourceFactory]]
+  * @see [[StreamTableSinkFactory]]
+  * @see [[TableFormatFactory]]
   */
 trait TableFactory {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
index 3baff8e..26b7c6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala
@@ -50,9 +50,7 @@ object TableFactoryService extends Logging {
   def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
     Preconditions.checkNotNull(descriptor)
 
-    val descriptorProperties = new DescriptorProperties()
-    descriptor.addProperties(descriptorProperties)
-    findInternal(factoryClass, descriptorProperties.asMap, None)
+    findInternal(factoryClass, DescriptorProperties.toJavaMap(descriptor), None)
   }
 
   /**
@@ -68,9 +66,7 @@ object TableFactoryService extends Logging {
     Preconditions.checkNotNull(descriptor)
     Preconditions.checkNotNull(classLoader)
 
-    val descriptorProperties = new DescriptorProperties()
-    descriptor.addProperties(descriptorProperties)
-    findInternal(factoryClass, descriptorProperties.asMap, Some(classLoader))
+    findInternal(factoryClass, DescriptorProperties.toJavaMap(descriptor), Some(classLoader))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
new file mode 100644
index 0000000..9989ebc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories
+
+import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment, TableException}
+import org.apache.flink.table.descriptors.{Descriptor, DescriptorProperties}
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+
+/**
+  * Utility for creating table sources and sinks from descriptors via the [[TableFactoryService]].
+  */
+object TableFactoryUtil {
+
+  /**
+    * Creates the described table source via a batch or stream factory matching the environment.
+    */
+  def findAndCreateTableSource[T](
+      tableEnvironment: TableEnvironment,
+      descriptor: Descriptor)
+    : TableSource[T] = {
+
+    val javaMap = DescriptorProperties.toJavaMap(descriptor)
+
+    tableEnvironment match {
+      case _: BatchTableEnvironment => // batch: find a batch source factory, let it create
+        TableFactoryService
+          .find(classOf[BatchTableSourceFactory[T]], javaMap)
+          .createBatchTableSource(javaMap)
+
+      case _: StreamTableEnvironment => // streaming: find a stream source factory instead
+        TableFactoryService
+          .find(classOf[StreamTableSourceFactory[T]], javaMap)
+          .createStreamTableSource(javaMap)
+
+      case e@_ => // neither a batch nor a stream table environment
+        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
+    }
+  }
+
+  /**
+    * Creates the described table sink via a batch or stream factory matching the environment.
+    */
+  def findAndCreateTableSink[T](
+      tableEnvironment: TableEnvironment,
+      descriptor: Descriptor)
+    : TableSink[T] = {
+
+    val javaMap = DescriptorProperties.toJavaMap(descriptor)
+
+    tableEnvironment match {
+      case _: BatchTableEnvironment => // batch: find a batch sink factory, let it create
+        TableFactoryService
+          .find(classOf[BatchTableSinkFactory[T]], javaMap)
+          .createBatchTableSink(javaMap)
+
+      case _: StreamTableEnvironment => // streaming: find a stream sink factory instead
+        TableFactoryService
+          .find(classOf[StreamTableSinkFactory[T]], javaMap)
+          .createStreamTableSink(javaMap)
+
+      case e@_ => // neither a batch nor a stream table environment
+        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
index 8fa6fad..9e42a15 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFormatFactory.scala
@@ -25,6 +25,8 @@ import java.util
   * also [[TableFactory]] for more information.
   *
   * @tparam T record type that the format produces or consumes
+  * @see [[DeserializationSchemaFactory]]
+  * @see [[SerializationSchemaFactory]]
   */
 trait TableFormatFactory[T] extends TableFactory {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
new file mode 100644
index 0000000..65a41bb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvAppendTableSinkFactory.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND}
+import org.apache.flink.table.factories.StreamTableSinkFactory
+import org.apache.flink.types.Row
+
+/**
+  * Factory for creating configured instances of [[CsvTableSink]] in a stream environment.
+  */
+class CsvAppendTableSinkFactory
+  extends CsvTableSinkFactoryBase
+  with StreamTableSinkFactory[Row] {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String](super.requiredContext()) // copy base context
+    context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND) // additionally require append mode
+    context
+  }
+
+  override def createStreamTableSink(
+      properties: util.Map[String, String])
+    : StreamTableSink[Row] = {
+    createTableSink(isStreaming = true, properties)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
new file mode 100644
index 0000000..2687ed2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.factories.BatchTableSinkFactory
+import org.apache.flink.types.Row
+
+/**
+  * Factory for creating configured instances of [[CsvTableSink]] in a batch environment.
+  */
+class CsvBatchTableSinkFactory
+  extends CsvTableSinkFactoryBase
+  with BatchTableSinkFactory[Row] {
+
+  override def createBatchTableSink(
+      properties: util.Map[String, String])
+    : BatchTableSink[Row] = {
+    createTableSink(isStreaming = false, properties)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
deleted file mode 100644
index eb99f02..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactory.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import java.util
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
-import org.apache.flink.table.descriptors.CsvValidator._
-import org.apache.flink.table.descriptors.DescriptorProperties._
-import org.apache.flink.table.descriptors.FileSystemValidator._
-import org.apache.flink.table.descriptors.FormatDescriptorValidator._
-import org.apache.flink.table.descriptors.SchemaValidator._
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.factories.{BatchTableSinkFactory, StreamTableSinkFactory, TableFactory}
-import org.apache.flink.types.Row
-
-/**
-  * Factory for creating configured instances of [[CsvTableSink]].
-  */
-class CsvTableSinkFactory
-  extends TableFactory
-  with StreamTableSinkFactory[Row]
-  with BatchTableSinkFactory[Row] {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
-    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context.put(FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    // connector
-    properties.add(CONNECTOR_PATH)
-    // format
-    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
-    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
-    properties.add(FORMAT_FIELD_DELIMITER)
-    properties.add(CONNECTOR_PATH)
-    // schema
-    properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
-    properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
-    properties
-  }
-
-  override def createStreamTableSink(
-      properties: util.Map[String, String])
-    : StreamTableSink[Row] = {
-    createTableSink(isStreaming = true, properties)
-  }
-
-  override def createBatchTableSink(
-      properties: util.Map[String, String])
-    : BatchTableSink[Row] = {
-    createTableSink(isStreaming = false, properties)
-  }
-
-  private def createTableSink(
-      isStreaming: Boolean,
-      properties: util.Map[String, String])
-    : CsvTableSink = {
-
-    val params = new DescriptorProperties()
-    params.putProperties(properties)
-
-    // validate
-    new FileSystemValidator().validate(params)
-    new CsvValidator().validate(params)
-    new SchemaValidator(
-      isStreaming,
-      supportsSourceTimestamps = false,
-      supportsSourceWatermarks = false).validate(params)
-
-    // build
-    val formatSchema = params.getTableSchema(FORMAT_FIELDS)
-    val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
-
-    if (!formatSchema.equals(tableSchema)) {
-      throw new TableException(
-        "Encodings that differ from the schema are not supported yet for CsvTableSink.")
-    }
-
-    val path = params.getString(CONNECTOR_PATH)
-    val fieldDelimiter = toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)).getOrElse(",")
-
-    val csvTableSink = new CsvTableSink(path, fieldDelimiter)
-
-    csvTableSink
-      .configure(formatSchema.getColumnNames, formatSchema.getTypes)
-      .asInstanceOf[CsvTableSink]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
new file mode 100644
index 0000000..6ceba4c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.util
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.DescriptorProperties._
+import org.apache.flink.table.descriptors.FileSystemValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.SchemaValidator._
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.factories.TableFactory
+
+/**
+  * Factory base for creating configured instances of [[CsvTableSink]].
+  *
+  * Provides the shared required context, the supported property keys and the
+  * validation/build logic for use by concrete subclasses.
+  */
+abstract class CsvTableSinkFactoryBase extends TableFactory {
+
+  /**
+    * Returns the context entries (connector type, format type and their property
+    * versions) that this factory declares as required.
+    */
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
+    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
+    context.put(CONNECTOR_PROPERTY_VERSION, "1")
+    context.put(FORMAT_PROPERTY_VERSION, "1")
+    context
+  }
+
+  /**
+    * Returns the property keys this factory declares as supported, each listed exactly once.
+    */
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    // connector
+    properties.add(CONNECTOR_PATH)
+    // format
+    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
+    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
+    properties.add(FORMAT_FIELD_DELIMITER)
+    // schema
+    properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
+    properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
+    properties
+  }
+
+  /**
+    * Validates the given properties and builds a configured [[CsvTableSink]].
+    *
+    * @param isStreaming passed to the [[SchemaValidator]] that checks the schema properties
+    * @param properties  string properties describing connector, format and schema
+    * @return a [[CsvTableSink]] configured with the field names and types of the format
+    * @throws TableException if the format fields differ from the derived sink schema
+    */
+  protected def createTableSink(
+      isStreaming: Boolean,
+      properties: util.Map[String, String])
+    : CsvTableSink = {
+
+    val params = new DescriptorProperties()
+    params.putProperties(properties)
+
+    // validate
+    new FileSystemValidator().validate(params)
+    new CsvValidator().validate(params)
+    new SchemaValidator(
+      isStreaming,
+      supportsSourceTimestamps = false,
+      supportsSourceWatermarks = false).validate(params)
+
+    // build
+    val formatSchema = params.getTableSchema(FORMAT_FIELDS)
+    val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
+
+    // for now the format fields must be equal to the derived sink schema
+    if (!formatSchema.equals(tableSchema)) {
+      throw new TableException(
+        "Encodings that differ from the schema are not supported yet for CsvTableSink.")
+    }
+
+    val path = params.getString(CONNECTOR_PATH)
+    // the field delimiter is optional and falls back to a comma
+    val fieldDelimiter = toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER)).getOrElse(",")
+
+    val csvTableSink = new CsvTableSink(path, fieldDelimiter)
+
+    csvTableSink
+      .configure(formatSchema.getColumnNames, formatSchema.getTypes)
+      .asInstanceOf[CsvTableSink]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
new file mode 100644
index 0000000..afbe2ea
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvAppendTableSourceFactory.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, UPDATE_MODE_VALUE_APPEND}
+import org.apache.flink.table.factories.StreamTableSourceFactory
+import org.apache.flink.types.Row
+
+/**
+  * Factory for creating configured instances of [[CsvTableSource]] in a stream environment.
+  *
+  * On top of the base context it requires UPDATE_MODE to be UPDATE_MODE_VALUE_APPEND.
+  */
+class CsvAppendTableSourceFactory
+  extends CsvTableSourceFactoryBase
+  with StreamTableSourceFactory[Row] {
+
+  /**
+    * Copies the required context of the base factory and adds the update mode entry.
+    */
+  override def requiredContext(): util.Map[String, String] = {
+    val appendContext = new util.HashMap[String, String]()
+    appendContext.putAll(super.requiredContext())
+    appendContext.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND)
+    appendContext
+  }
+
+  /**
+    * Builds the source through the shared base logic with `isStreaming = true`.
+    */
+  override def createStreamTableSource(
+      properties: util.Map[String, String])
+    : StreamTableSource[Row] =
+    createTableSource(isStreaming = true, properties)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
new file mode 100644
index 0000000..9d8fa40
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvBatchTableSourceFactory.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.table.factories.BatchTableSourceFactory
+import org.apache.flink.types.Row
+
+/**
+  * Batch-environment factory that creates configured instances of [[CsvTableSource]].
+  */
+class CsvBatchTableSourceFactory
+  extends CsvTableSourceFactoryBase
+  with BatchTableSourceFactory[Row] {
+
+  /**
+    * Builds the source through the shared base logic with `isStreaming = false`.
+    */
+  override def createBatchTableSource(
+      properties: util.Map[String, String])
+    : BatchTableSource[Row] =
+    createTableSource(isStreaming = false, properties)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
deleted file mode 100644
index 96751ec..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactory.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import java.util
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.CsvValidator._
-import org.apache.flink.table.descriptors.DescriptorProperties.toScala
-import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
-import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
-import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA
-import org.apache.flink.table.descriptors._
-import org.apache.flink.table.factories.{BatchTableSourceFactory, StreamTableSourceFactory, TableFactory}
-import org.apache.flink.types.Row
-
-/**
-  * Factory for creating configured instances of [[CsvTableSource]].
-  */
-class CsvTableSourceFactory
-  extends TableFactory
-  with StreamTableSourceFactory[Row]
-  with BatchTableSourceFactory[Row] {
-
-  override def requiredContext(): util.Map[String, String] = {
-    val context = new util.HashMap[String, String]()
-    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
-    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
-    context.put(CONNECTOR_PROPERTY_VERSION, "1")
-    context.put(FORMAT_PROPERTY_VERSION, "1")
-    context
-  }
-
-  override def supportedProperties(): util.List[String] = {
-    val properties = new util.ArrayList[String]()
-    // connector
-    properties.add(CONNECTOR_PATH)
-    // format
-    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
-    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
-    properties.add(FORMAT_FIELD_DELIMITER)
-    properties.add(FORMAT_LINE_DELIMITER)
-    properties.add(FORMAT_QUOTE_CHARACTER)
-    properties.add(FORMAT_COMMENT_PREFIX)
-    properties.add(FORMAT_IGNORE_FIRST_LINE)
-    properties.add(FORMAT_IGNORE_PARSE_ERRORS)
-    properties.add(CONNECTOR_PATH)
-    // schema
-    properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
-    properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
-    properties
-  }
-
-  override def createStreamTableSource(
-      properties: util.Map[String, String])
-    : StreamTableSource[Row] = {
-    createTableSource(isStreaming = true, properties)
-  }
-
-  override def createBatchTableSource(
-      properties: util.Map[String, String])
-    : BatchTableSource[Row] = {
-    createTableSource(isStreaming = false, properties)
-  }
-
-  private def createTableSource(
-      isStreaming: Boolean,
-      properties: util.Map[String, String])
-    : CsvTableSource = {
-
-    val params = new DescriptorProperties()
-    params.putProperties(properties)
-
-    // validate
-    new FileSystemValidator().validate(params)
-    new CsvValidator().validate(params)
-    new SchemaValidator(
-      isStreaming,
-      supportsSourceTimestamps = false,
-      supportsSourceWatermarks = false).validate(params)
-
-    // build
-    val csvTableSourceBuilder = new CsvTableSource.Builder
-
-    val formatSchema = params.getTableSchema(FORMAT_FIELDS)
-    val tableSchema = params.getTableSchema(SCHEMA)
-
-    // the CsvTableSource needs some rework first
-    // for now the schema must be equal to the encoding
-    if (!formatSchema.equals(tableSchema)) {
-      throw new TableException(
-        "Encodings that differ from the schema are not supported yet for CsvTableSources.")
-    }
-
-    toScala(params.getOptionalString(CONNECTOR_PATH))
-      .foreach(csvTableSourceBuilder.path)
-    toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
-      .foreach(csvTableSourceBuilder.fieldDelimiter)
-    toScala(params.getOptionalString(FORMAT_LINE_DELIMITER))
-      .foreach(csvTableSourceBuilder.lineDelimiter)
-
-    formatSchema.getColumnNames.zip(formatSchema.getTypes).foreach { case (name, tpe) =>
-      csvTableSourceBuilder.field(name, tpe)
-    }
-    toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER))
-      .foreach(csvTableSourceBuilder.quoteCharacter)
-    toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX))
-      .foreach(csvTableSourceBuilder.commentPrefix)
-    toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach { flag =>
-      if (flag) {
-        csvTableSourceBuilder.ignoreFirstLine()
-      }
-    }
-    toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach { flag =>
-      if (flag) {
-        csvTableSourceBuilder.ignoreParseErrors()
-      }
-    }
-
-    csvTableSourceBuilder.build()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
new file mode 100644
index 0000000..d320220
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_PROPERTY_VERSION, CONNECTOR_TYPE}
+import org.apache.flink.table.descriptors.CsvValidator._
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
+import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
+import org.apache.flink.table.descriptors.FormatDescriptorValidator.{FORMAT_PROPERTY_VERSION, FORMAT_TYPE}
+import org.apache.flink.table.descriptors.SchemaValidator.SCHEMA
+import org.apache.flink.table.descriptors._
+import org.apache.flink.table.factories.TableFactory
+
+/**
+  * Factory base for creating configured instances of [[CsvTableSource]].
+  *
+  * Provides the shared required context, the supported property keys and the
+  * validation/build logic for use by concrete subclasses.
+  */
+abstract class CsvTableSourceFactoryBase extends TableFactory {
+
+  /**
+    * Returns the context entries (connector type, format type and their property
+    * versions) that this factory declares as required.
+    */
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE)
+    context.put(FORMAT_TYPE, FORMAT_TYPE_VALUE)
+    context.put(CONNECTOR_PROPERTY_VERSION, "1")
+    context.put(FORMAT_PROPERTY_VERSION, "1")
+    context
+  }
+
+  /**
+    * Returns the property keys this factory declares as supported, each listed exactly once.
+    */
+  override def supportedProperties(): util.List[String] = {
+    val properties = new util.ArrayList[String]()
+    // connector
+    properties.add(CONNECTOR_PATH)
+    // format
+    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.TYPE}")
+    properties.add(s"$FORMAT_FIELDS.#.${DescriptorProperties.NAME}")
+    properties.add(FORMAT_FIELD_DELIMITER)
+    properties.add(FORMAT_LINE_DELIMITER)
+    properties.add(FORMAT_QUOTE_CHARACTER)
+    properties.add(FORMAT_COMMENT_PREFIX)
+    properties.add(FORMAT_IGNORE_FIRST_LINE)
+    properties.add(FORMAT_IGNORE_PARSE_ERRORS)
+    // schema
+    properties.add(s"$SCHEMA.#.${DescriptorProperties.TYPE}")
+    properties.add(s"$SCHEMA.#.${DescriptorProperties.NAME}")
+    properties
+  }
+
+  /**
+    * Validates the given properties and builds a configured [[CsvTableSource]].
+    *
+    * @param isStreaming passed to the [[SchemaValidator]] that checks the schema properties
+    * @param properties  string properties describing connector, format and schema
+    * @return a [[CsvTableSource]] built from the path, format options and format fields
+    * @throws TableException if the format fields differ from the table schema
+    */
+  protected def createTableSource(
+      isStreaming: Boolean,
+      properties: util.Map[String, String])
+    : CsvTableSource = {
+
+    val params = new DescriptorProperties()
+    params.putProperties(properties)
+
+    // validate
+    new FileSystemValidator().validate(params)
+    new CsvValidator().validate(params)
+    new SchemaValidator(
+      isStreaming,
+      supportsSourceTimestamps = false,
+      supportsSourceWatermarks = false).validate(params)
+
+    // build
+    val csvTableSourceBuilder = new CsvTableSource.Builder
+
+    val formatSchema = params.getTableSchema(FORMAT_FIELDS)
+    val tableSchema = params.getTableSchema(SCHEMA)
+
+    // the CsvTableSource needs some rework first
+    // for now the schema must be equal to the encoding
+    if (!formatSchema.equals(tableSchema)) {
+      throw new TableException(
+        "Encodings that differ from the schema are not supported yet for CsvTableSources.")
+    }
+
+    // optional properties are only forwarded to the builder when present
+    toScala(params.getOptionalString(CONNECTOR_PATH))
+      .foreach(csvTableSourceBuilder.path)
+    toScala(params.getOptionalString(FORMAT_FIELD_DELIMITER))
+      .foreach(csvTableSourceBuilder.fieldDelimiter)
+    toScala(params.getOptionalString(FORMAT_LINE_DELIMITER))
+      .foreach(csvTableSourceBuilder.lineDelimiter)
+
+    formatSchema.getColumnNames.zip(formatSchema.getTypes).foreach { case (name, tpe) =>
+      csvTableSourceBuilder.field(name, tpe)
+    }
+    toScala(params.getOptionalCharacter(FORMAT_QUOTE_CHARACTER))
+      .foreach(csvTableSourceBuilder.quoteCharacter)
+    toScala(params.getOptionalString(FORMAT_COMMENT_PREFIX))
+      .foreach(csvTableSourceBuilder.commentPrefix)
+    // the boolean flags only take effect when explicitly set to true
+    toScala(params.getOptionalBoolean(FORMAT_IGNORE_FIRST_LINE)).foreach { flag =>
+      if (flag) {
+        csvTableSourceBuilder.ignoreFirstLine()
+      }
+    }
+    toScala(params.getOptionalBoolean(FORMAT_IGNORE_PARSE_ERRORS)).foreach { flag =>
+      if (flag) {
+        csvTableSourceBuilder.ignoreParseErrors()
+      }
+    }
+
+    csvTableSourceBuilder.build()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
index 6df00e7..7f567f9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
@@ -39,7 +39,9 @@ class ExternalCatalogTest extends TableTestBase {
     val util = batchTestUtil()
     val tableEnv = util.tableEnv
 
-    tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
 
     val table1 = tableEnv.scan("test", "db1", "tb1")
     val table2 = tableEnv.scan("test", "db2", "tb2")
@@ -69,7 +71,9 @@ class ExternalCatalogTest extends TableTestBase {
   def testBatchSQL(): Unit = {
     val util = batchTestUtil()
 
-    util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+    util.tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
 
     val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
         "(SELECT a * 2, b, c FROM test.db1.tb1)"
@@ -96,7 +100,9 @@ class ExternalCatalogTest extends TableTestBase {
     val util = streamTestUtil()
     val tableEnv = util.tableEnv
 
-    util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+    util.tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = true))
 
     val table1 = tableEnv.scan("test", "db1", "tb1")
     val table2 = tableEnv.scan("test", "db2", "tb2")
@@ -128,7 +134,9 @@ class ExternalCatalogTest extends TableTestBase {
   def testStreamSQL(): Unit = {
     val util = streamTestUtil()
 
-    util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+    util.tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = true))
 
     val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
         "(SELECT a * 2, b, c FROM test.db1.tb1)"
@@ -155,7 +163,9 @@ class ExternalCatalogTest extends TableTestBase {
     val util = batchTestUtil()
     val tableEnv = util.tableEnv
 
-    tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
 
     val table1 = tableEnv.scan("test", "tb1")
     val table2 = tableEnv.scan("test", "db2", "tb2")

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index 58f51f9..c98a7c1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -47,7 +47,7 @@ class ExternalCatalogSchemaTest extends TableTestBase {
   @Before
   def setUp(): Unit = {
     val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
-    val catalog = CommonTestData.getInMemoryTestCatalog
+    val catalog = CommonTestData.getInMemoryTestCatalog(isStreaming = true)
     ExternalCatalogSchema.registerCatalog(
       streamTestUtil().tableEnv, rootSchemaPlus, schemaName, catalog)
     externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")

http://git-wip-us.apache.org/repos/asf/flink/blob/6fcc1e9a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
index ac2a729..1f84b3d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
@@ -138,7 +138,9 @@ class InMemoryExternalCatalogTest {
     val schemaDesc = new Schema()
       .field("first", BasicTypeInfo.STRING_TYPE_INFO)
       .field("second", BasicTypeInfo.INT_TYPE_INFO)
-    new ExternalCatalogTable(connDesc, None, Some(schemaDesc), None, None)
+    ExternalCatalogTable.builder(connDesc)
+      .withSchema(schemaDesc)
+      .asTableSource()
   }
 
   private def createTableInstance(
@@ -149,7 +151,9 @@ class InMemoryExternalCatalogTest {
     fieldNames.zipWithIndex.foreach { case (fieldName, index) =>
       schemaDesc.field(fieldName, fieldTypes(index))
     }
-    new ExternalCatalogTable(connDesc, None, Some(schemaDesc), None, None)
+    ExternalCatalogTable.builder(connDesc)
+      .withSchema(schemaDesc)
+      .asTableSource()
   }
 
   class TestConnectorDesc extends ConnectorDescriptor("test", version = 1, formatNeeded = false) {