Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:58 UTC

[13/13] flink git commit: [FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks

[FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks

This closes #6201.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9597248a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9597248a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9597248a

Branch: refs/heads/master
Commit: 9597248a41b34e126aac6a807651b1d376dc6de1
Parents: ee40335
Author: Shuyi Chen <sh...@uber.com>
Authored: Tue Jun 19 12:00:34 2018 -0700
Committer: Timo Walther <tw...@apache.org>
Committed: Sun Jul 15 09:51:28 2018 +0200

----------------------------------------------------------------------
 ...nk.table.connectors.DiscoverableTableFactory |  16 ++
 ...pache.flink.table.sources.TableSourceFactory |  16 --
 ...nk.table.connectors.DiscoverableTableFactory |  16 ++
 ...pache.flink.table.sources.TableSourceFactory |  16 --
 ...nk.table.connectors.DiscoverableTableFactory |  16 ++
 ...pache.flink.table.sources.TableSourceFactory |  16 --
 ...nk.table.connectors.DiscoverableTableFactory |  16 ++
 ...pache.flink.table.sources.TableSourceFactory |  16 --
 .../kafka/KafkaTableSourceFactory.java          |   7 +-
 .../KafkaJsonTableSourceFactoryTestBase.java    |  10 +-
 flink-libraries/flink-sql-client/pom.xml        |   2 +-
 .../flink/table/client/config/Environment.java  |  54 ++++--
 .../apache/flink/table/client/config/Sink.java  |  53 ++++++
 .../flink/table/client/config/Source.java       |  25 +--
 .../flink/table/client/config/SourceSink.java   |  67 +++++++
 .../flink/table/client/gateway/Executor.java    |   2 +-
 .../client/gateway/local/ExecutionContext.java  |  34 +++-
 .../test/assembly/test-table-source-factory.xml |   2 +-
 .../gateway/utils/TestTableSourceFactory.java   |  12 +-
 .../test/resources/test-sql-client-factory.yaml |   3 +-
 ...nk.table.connectors.DiscoverableTableFactory |  17 ++
 ...pache.flink.table.sources.TableSourceFactory |  16 --
 .../flink/table/api/BatchTableEnvironment.scala | 107 ++++++++++-
 .../table/api/StreamTableEnvironment.scala      | 103 ++++++++++-
 .../flink/table/api/TableEnvironment.scala      |  21 ++-
 .../org/apache/flink/table/api/exceptions.scala |  40 ++--
 .../table/catalog/ExternalTableSourceUtil.scala |  25 ++-
 .../connectors/DiscoverableTableFactory.scala   |  68 +++++++
 .../table/connectors/TableFactoryService.scala  | 160 ++++++++++++++++
 .../table/connectors/TableSinkFactory.scala     |  34 ++++
 .../table/connectors/TableSourceFactory.scala   |  34 ++++
 .../BatchTableSourceDescriptor.scala            |   9 +-
 .../flink/table/descriptors/CsvValidator.scala  |   1 -
 .../descriptors/DescriptorProperties.scala      |   3 +-
 .../table/descriptors/SchemaValidator.scala     |  57 +++++-
 .../StreamTableSourceDescriptor.scala           |   9 +-
 .../descriptors/TableDescriptorValidator.scala  |   5 +-
 .../table/descriptors/TableSinkDescriptor.scala |  30 +++
 .../descriptors/TableSourceDescriptor.scala     |   2 +
 .../plan/nodes/PhysicalTableSourceScan.scala    |  10 +-
 .../logical/FlinkLogicalTableFunctionScan.scala |   1 -
 .../logical/FlinkLogicalTableSourceScan.scala   |  21 ++-
 .../dataSet/BatchTableSourceScanRule.scala      |  17 +-
 .../datastream/StreamTableSourceScanRule.scala  |  17 +-
 .../PushFilterIntoTableSourceScanRule.scala     |   7 +-
 .../plan/schema/StreamTableSourceTable.scala    |   2 +-
 .../table/plan/schema/TableSinkTable.scala      |  14 +-
 .../plan/schema/TableSourceSinkTable.scala      |  67 +++++++
 .../table/plan/schema/TableSourceTable.scala    |  17 +-
 .../flink/table/sinks/CsvTableSinkFactory.scala |  90 +++++++++
 .../table/sources/CsvTableSourceFactory.scala   |   5 +-
 .../table/sources/TableSourceFactory.scala      |  86 ---------
 .../sources/TableSourceFactoryService.scala     | 181 -------------------
 ...nk.table.connectors.DiscoverableTableFactory |  19 ++
 ...pache.flink.table.sources.TableSourceFactory |  17 --
 .../validation/InsertIntoValidationTest.scala   |   8 +-
 .../validation/InsertIntoValidationTest.scala   |   6 +-
 .../validation/InsertIntoValidationTest.scala   |   8 +-
 .../validation/InsertIntoValidationTest.scala   |   6 +-
 .../validation/TableSinksValidationTest.scala   |   2 +-
 .../catalog/ExternalCatalogSchemaTest.scala     |   8 +-
 .../TableSinkFactoryServiceTest.scala           |  85 +++++++++
 .../TableSourceFactoryServiceTest.scala         | 110 +++++++++++
 .../TestFixedFormatTableFactory.scala           |  63 +++++++
 .../table/connectors/TestTableSinkFactory.scala |  75 ++++++++
 .../connectors/TestTableSourceFactory.scala     |  64 +++++++
 .../TestWildcardFormatTableSourceFactory.scala  |  53 ++++++
 .../table/descriptors/SchemaValidatorTest.scala |  36 +++-
 .../descriptors/TableSourceDescriptorTest.scala |   3 +
 .../batch/sql/TableEnvironmentITCase.scala      |   8 +-
 .../batch/table/TableEnvironmentITCase.scala    |   8 +-
 .../runtime/stream/TimeAttributesITCase.scala   |   8 +-
 .../table/runtime/stream/sql/SqlITCase.scala    |  48 ++++-
 .../runtime/stream/table/TableSinkITCase.scala  |   8 +-
 .../sources/TableSourceFactoryServiceTest.scala | 100 ----------
 .../sources/TestFixedFormatTableFactory.scala   |  62 -------
 .../TestWildcardFormatTableSourceFactory.scala  |  56 ------
 .../table/utils/InMemoryTableFactory.scala      | 109 +++++++++++
 .../flink/table/utils/MemoryTableSinkUtil.scala |  89 ---------
 .../table/utils/MemoryTableSourceSinkUtil.scala | 151 ++++++++++++++++
 .../table/utils/MockTableEnvironment.scala      |   2 +
 81 files changed, 1996 insertions(+), 891 deletions(-)
----------------------------------------------------------------------
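
For orientation before the file-by-file diff: this commit replaces the source-only TableSourceFactory SPI with a discovery interface (DiscoverableTableFactory) plus per-kind creation interfaces (TableSourceFactory, TableSinkFactory), all resolved through the new TableFactoryService. The following is a minimal sketch of a sink factory under the new scheme; the connector name "my-system", the "connector.path" key, and the MySystemTableSink class are invented for illustration, while the three methods mirror the signatures used in the diff below.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.table.connectors.DiscoverableTableFactory;
    import org.apache.flink.table.connectors.TableSinkFactory;
    import org.apache.flink.table.sinks.TableSink;
    import org.apache.flink.types.Row;

    public class MySystemTableSinkFactory implements TableSinkFactory<Row>, DiscoverableTableFactory {

        @Override
        public Map<String, String> requiredContext() {
            // Matched against the normalized descriptor properties during factory discovery.
            final Map<String, String> context = new HashMap<>();
            context.put("connector.type", "my-system");
            return context;
        }

        @Override
        public List<String> supportedProperties() {
            // Every non-context property key the factory accepts must be declared here.
            final List<String> properties = new ArrayList<>();
            properties.add("connector.path");
            return properties;
        }

        @Override
        public TableSink<Row> createTableSink(Map<String, String> properties) {
            // Instantiate the sink from the flat string property map.
            return new MySystemTableSink(properties.get("connector.path"));
        }
    }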


http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..21f5707
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory

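The service files above and below are standard java.util.ServiceLoader registrations; moving the entry from META-INF/services/org.apache.flink.table.sources.TableSourceFactory to META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory is what keeps the Kafka factories discoverable under the new SPI. As a rough sketch of the JDK mechanism the new TableFactoryService presumably builds on (the service additionally matches candidates against requiredContext(); classLoader is assumed to be in scope):

    import java.util.ServiceLoader;

    // Instantiates, via its no-arg constructor, every class listed in a
    // META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
    // file on the classpath.
    ServiceLoader<DiscoverableTableFactory> loader =
        ServiceLoader.load(DiscoverableTableFactory.class, classLoader);
    for (DiscoverableTableFactory factory : loader) {
        System.out.println(factory.getClass().getName());
    }
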
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index 21f5707..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..c056097
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index c056097..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..b83bb3f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index b83bb3f..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..fb14ddb
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index fb14ddb..0000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
index d88fc6f..563cd40 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connectors.DiscoverableTableFactory;
+import org.apache.flink.table.connectors.TableSourceFactory;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.KafkaValidator;
 import org.apache.flink.table.descriptors.SchemaValidator;
@@ -30,7 +32,6 @@ import org.apache.flink.table.formats.DeserializationSchemaFactory;
 import org.apache.flink.table.formats.TableFormatFactoryService;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactory;
 import org.apache.flink.types.Row;
 
 import java.util.ArrayList;
@@ -71,7 +72,7 @@ import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
 /**
  * Factory for creating configured instances of {@link KafkaTableSource}.
  */
-public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row> {
+public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row>, DiscoverableTableFactory {
 
 	@Override
 	public Map<String, String> requiredContext() {
@@ -118,7 +119,7 @@ public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row>
 	}
 
 	@Override
-	public TableSource<Row> create(Map<String, String> properties) {
+	public TableSource<Row> createTableSource(Map<String, String> properties) {
 		final DescriptorProperties params = new DescriptorProperties(true);
 		params.putProperties(properties);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
index 84c5bd3..c8b422f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.formats.json.JsonRowSchemaConverter;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connectors.TableFactoryService;
+import org.apache.flink.table.connectors.TableSourceFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.FormatDescriptor;
 import org.apache.flink.table.descriptors.Json;
 import org.apache.flink.table.descriptors.Kafka;
@@ -29,7 +32,6 @@ import org.apache.flink.table.descriptors.Rowtime;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
 import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactoryService;
 import org.apache.flink.table.sources.tsextractors.ExistingField;
 import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
 
@@ -148,7 +150,11 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
 							new Rowtime().timestampsFromField("time").watermarksPeriodicAscending())
 						.field("proc-time", Types.SQL_TIMESTAMP).proctime());
 
-		final TableSource<?> factorySource = TableSourceFactoryService.findAndCreateTableSource(testDesc);
+		DescriptorProperties properties = new DescriptorProperties(true);
+		testDesc.addProperties(properties);
+		final TableSource<?> factorySource =
+				((TableSourceFactory) TableFactoryService.find(TableSourceFactory.class, testDesc))
+						.createTableSource(properties.asMap());
 
 		assertEquals(builderSource, factorySource);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml
index 6bcfc13..b413b6f 100644
--- a/flink-libraries/flink-sql-client/pom.xml
+++ b/flink-libraries/flink-sql-client/pom.xml
@@ -159,7 +159,7 @@ under the License.
 										<include>org.codehaus.commons.compiler.properties</include>
 										<include>org/codehaus/janino/**</include>
 										<include>org/codehaus/commons/**</include>
-										<include>META-INF/services/org.apache.flink.table.sources.TableSourceFactory</include>
+										<include>META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory</include>
 										<!-- flink-sql-client -->
 										<include>org/jline/**</include>
 										<include>com/fasterxml/jackson/**</include>

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index b26c45f..966a581 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -48,6 +48,8 @@ public class Environment {
 
 	private Deployment deployment;
 
+	private static final String NAME = "name";
+
 	public Environment() {
 		this.tables = Collections.emptyMap();
 		this.functions = Collections.emptyMap();
@@ -62,20 +64,21 @@ public class Environment {
 	public void setTables(List<Map<String, Object>> tables) {
 		this.tables = new HashMap<>(tables.size());
 		tables.forEach(config -> {
-			if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
-				throw new SqlClientException("The 'type' attribute of a table is missing.");
+			if (!config.containsKey(NAME)) {
+				throw new SqlClientException("The 'name' attribute of a table is missing.");
+			}
+			final Object nameObject = config.get(NAME);
+			if (nameObject == null || !(nameObject instanceof String) || ((String) nameObject).length() <= 0) {
+				throw new SqlClientException("Invalid table name '" + nameObject + "'.");
 			}
-			if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
-				config.remove(TableDescriptorValidator.TABLE_TYPE());
-				final Source s = Source.create(config);
-				if (this.tables.containsKey(s.getName())) {
-					throw new SqlClientException("Duplicate source name '" + s.getName() + "'.");
-				}
-				this.tables.put(s.getName(), s);
-			} else {
-				throw new SqlClientException(
-						"Invalid table 'type' attribute value, only 'source' is supported");
+			final String name = (String) nameObject;
+			final Map<String, Object> properties = new HashMap<>(config);
+			properties.remove(NAME);
+
+			if (this.tables.containsKey(name)) {
+				throw new SqlClientException("Duplicate table name '" + name + "'.");
 			}
+			this.tables.put(name, createTableDescriptor(name, properties));
 		});
 	}
 
@@ -195,4 +198,31 @@ public class Environment {
 
 		return enrichedEnv;
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a table descriptor from a YAML config map.
+	 *
+	 * @param name name of the table
+	 * @param config YAML config map
+	 * @return table descriptor describing a source, sink, or both
+	 */
+	private static TableDescriptor createTableDescriptor(String name, Map<String, Object> config) {
+		final Object typeObject = config.get(TableDescriptorValidator.TABLE_TYPE());
+		if (typeObject == null || !(typeObject instanceof String)) {
+			throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
+		}
+		final String type = (String) config.get(TableDescriptorValidator.TABLE_TYPE());
+		final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(config);
+		if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+			return new Source(name, normalizedConfig);
+		} else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+			return new Sink(name, normalizedConfig);
+		} else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+			return new SourceSink(name, normalizedConfig);
+		}
+		throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'. " +
+			"Only 'source', 'sink', and 'both' are supported.");
+	}
 }
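
With this change, an entry in the 'tables' list of an SQL Client environment file is dispatched on its 'type' attribute. Based on the three validator values referenced above, a sketch of the accepted forms (schema, connector, and format sections elided):

    tables:
      - name: MySourceTable
        type: source   # registered as a table source only
        # ...
      - name: MySinkTable
        type: sink     # registered as a table sink only
        # ...
      - name: MyTable
        type: both     # registered as both source and sink
        # ...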

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
new file mode 100644
index 0000000..0de65fb
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.TableSinkDescriptor;
+
+import java.util.Map;
+
+/**
+ * Configuration of a table sink.
+ */
+public class Sink extends TableSinkDescriptor {
+
+	private String name;
+	private Map<String, String> properties;
+
+	protected Sink(String name, Map<String, String> properties) {
+		this.name = name;
+		this.properties = properties;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public Map<String, String> getProperties() {
+		return properties;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void addProperties(DescriptorProperties properties) {
+		this.properties.forEach(properties::putString);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
index 2bef257..ef80596 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
@@ -18,25 +18,20 @@
 
 package org.apache.flink.table.client.config;
 
-import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.TableSourceDescriptor;
 
-import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Configuration of a table source. Parses an entry in the `tables` list of an environment
- * file and translates to table descriptor properties.
+ * Configuration of a table source.
  */
 public class Source extends TableSourceDescriptor {
 
 	private String name;
 	private Map<String, String> properties;
 
-	private static final String NAME = "name";
-
-	private Source(String name, Map<String, String> properties) {
+	protected Source(String name, Map<String, String> properties) {
 		this.name = name;
 		this.properties = properties;
 	}
@@ -49,22 +44,6 @@ public class Source extends TableSourceDescriptor {
 		return properties;
 	}
 
-	/**
-	 * Creates a table source descriptor with the given config.
-	 */
-	public static Source create(Map<String, Object> config) {
-		if (!config.containsKey(NAME)) {
-			throw new SqlClientException("The 'name' attribute of a table source is missing.");
-		}
-		final Object name = config.get(NAME);
-		if (name == null || !(name instanceof String) || ((String) name).length() <= 0) {
-			throw new SqlClientException("Invalid table source name '" + name + "'.");
-		}
-		final Map<String, Object> properties = new HashMap<>(config);
-		properties.remove(NAME);
-		return new Source((String) name, ConfigUtil.normalizeYaml(properties));
-	}
-
 	// --------------------------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
new file mode 100644
index 0000000..33fb0f1
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptorValidator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Common class for all descriptors describing a table source and sink together.
+ */
+public class SourceSink extends TableDescriptor {
+
+	private String name;
+	private Map<String, String> properties;
+
+	protected SourceSink(String name, Map<String, String> properties) {
+		this.name = name;
+		this.properties = properties;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public Map<String, String> getProperties() {
+		return properties;
+	}
+
+	@Override
+	public void addProperties(DescriptorProperties properties) {
+		this.properties.forEach(properties::putString);
+	}
+
+	public Source toSource() {
+		final Map<String, String> newProperties = new HashMap<>(properties);
+		newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
+				TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
+		return new Source(name, newProperties);
+	}
+
+	public Sink toSink() {
+		final Map<String, String> newProperties = new HashMap<>(properties);
+		newProperties.replace(TableDescriptorValidator.TABLE_TYPE(),
+				TableDescriptorValidator.TABLE_TYPE_VALUE_SINK());
+		return new Sink(name, newProperties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index f12ff4d..9ace240 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -62,7 +62,7 @@ public interface Executor {
 	String explainStatement(SessionContext session, String statement) throws SqlExecutionException;
 
 	/**
-	 * Submits a Flink job (detached) and returns the result descriptor.
+	 * Submits a Flink SQL query job (detached) and returns the result descriptor.
 	 */
 	ResultDescriptor executeQuery(SessionContext session, String query) throws SqlExecutionException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 9cd3d8d..114d5a6 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -46,16 +46,22 @@ import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.client.config.Deployment;
 import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Sink;
+import org.apache.flink.table.client.config.Source;
+import org.apache.flink.table.client.config.SourceSink;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.descriptors.TableSourceDescriptor;
+import org.apache.flink.table.connectors.TableFactoryService;
+import org.apache.flink.table.connectors.TableSinkFactory;
+import org.apache.flink.table.connectors.TableSourceFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactoryService;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.commons.cli.CommandLine;
@@ -80,6 +86,7 @@ public class ExecutionContext<T> {
 	private final List<URL> dependencies;
 	private final ClassLoader classLoader;
 	private final Map<String, TableSource<?>> tableSources;
+	private final Map<String, TableSink<?>> tableSinks;
 	private final Map<String, UserDefinedFunction> functions;
 	private final Configuration flinkConfig;
 	private final CommandLine commandLine;
@@ -100,14 +107,22 @@ public class ExecutionContext<T> {
 			dependencies.toArray(new URL[dependencies.size()]),
 			this.getClass().getClassLoader());
 
-		// create table sources
+		// create table sources & sinks.
 		tableSources = new HashMap<>();
+		tableSinks = new HashMap<>();
 		mergedEnv.getTables().forEach((name, descriptor) -> {
-			if (descriptor instanceof TableSourceDescriptor) {
-				final TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
-					(TableSourceDescriptor) descriptor,
-					classLoader);
-				tableSources.put(name, tableSource);
+			final DescriptorProperties properties = new DescriptorProperties(true);
+			descriptor.addProperties(properties);
+			final Map<String, String> propertyMap = properties.asMap();
+			if (descriptor instanceof Source || descriptor instanceof SourceSink) {
+				final TableSourceFactory<?> factory = (TableSourceFactory<?>)
+					TableFactoryService.find(TableSourceFactory.class, descriptor, classLoader);
+				tableSources.put(name, factory.createTableSource(propertyMap));
+			}
+			if (descriptor instanceof Sink || descriptor instanceof SourceSink) {
+				final TableSinkFactory<?> factory = (TableSinkFactory<?>)
+					TableFactoryService.find(TableSinkFactory.class, descriptor, classLoader);
+				tableSinks.put(name, factory.createTableSink(propertyMap));
 			}
 		});
 
@@ -224,6 +239,9 @@ public class ExecutionContext<T> {
 			// register table sources
 			tableSources.forEach(tableEnv::registerTableSource);
 
+			// register table sinks
+			tableSinks.forEach(tableEnv::registerTableSink);
+
 			// register user-defined functions
 			if (tableEnv instanceof StreamTableEnvironment) {
 				StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml b/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml
index fb9673c..fce86b0 100644
--- a/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml
+++ b/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml
@@ -40,7 +40,7 @@ under the License.
 		<file>
 			<source>src/test/resources/test-factory-services-file</source>
 			<outputDirectory>META-INF/services</outputDirectory>
-			<destName>org.apache.flink.table.sources.TableSourceFactory</destName>
+			<destName>org.apache.flink.table.connectors.DiscoverableTableFactory</destName>
 			<fileMode>0755</fileMode>
 		</file>
 	</files>

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
index 1b0a30e..408f255 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
@@ -24,14 +24,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.client.gateway.local.DependencyTest;
+import org.apache.flink.table.connectors.DiscoverableTableFactory;
+import org.apache.flink.table.connectors.TableSourceFactory;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactory;
 import org.apache.flink.types.Row;
 
 import java.util.ArrayList;
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
 import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
 import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
@@ -50,7 +51,7 @@ import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
 /**
  * Table source factory for testing the classloading in {@link DependencyTest}.
  */
-public class TestTableSourceFactory implements TableSourceFactory<Row> {
+public class TestTableSourceFactory implements TableSourceFactory<Row>, DiscoverableTableFactory {
 
 	@Override
 	public Map<String, String> requiredContext() {
@@ -66,18 +67,19 @@ public class TestTableSourceFactory implements TableSourceFactory<Row> {
 		properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
 		properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
 		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
+		properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
 		properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
 		return properties;
 	}
 
 	@Override
-	public TableSource<Row> create(Map<String, String> properties) {
+	public TableSource<Row> createTableSource(Map<String, String> properties) {
 		final DescriptorProperties params = new DescriptorProperties(true);
 		params.putProperties(properties);
 		final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
 		final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
 		return new TestTableSource(
-			params.getTableSchema(SCHEMA()),
+			SchemaValidator.deriveTableSourceSchema(params),
 			properties.get("connector.test-property"),
 			proctime.orElse(null),
 			rowtime);

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
index c7b6097..b192479 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
@@ -35,7 +35,8 @@ tables:
         type: TIMESTAMP
         rowtime:
           timestamps:
-            type: from-source
+            type: from-field
+            from: rowtimeField
           watermarks:
             type: from-source
     connector:

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..4cda0ad
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.sources.CsvTableSourceFactory
+org.apache.flink.table.sinks.CsvTableSinkFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index ff43eed..0000000
--- a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.table.sources.CsvTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 043a345..a239ad5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -37,9 +37,9 @@ import org.apache.flink.table.expressions.{Expression, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{BatchTableSourceTable, DataSetTable, RowSchema, TableSinkTable}
+import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.runtime.MapRunner
-import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
+import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
 import org.apache.flink.types.Row
 
@@ -103,11 +103,40 @@ abstract class BatchTableEnvironment(
     : Unit = {
 
     tableSource match {
+
+      // check for proper batch table source
       case batchTableSource: BatchTableSource[_] =>
-        registerTableInternal(name, new BatchTableSourceTable(batchTableSource))
+        // check if a table (source or sink) is registered
+        Option(getTable(name)) match {
+
+          // table source and/or sink is registered
+          case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
+
+            // wrapper contains source
+            case Some(_: TableSourceTable[_]) =>
+              throw new TableException(s"Table '$name' already exists. " +
+                s"Please choose a different name.")
+
+            // wrapper contains only sink (not source)
+            case _ =>
+              val enrichedTable = new TableSourceSinkTable(
+                Some(new BatchTableSourceTable(batchTableSource)),
+                table.tableSinkTable)
+              replaceRegisteredTable(name, enrichedTable)
+          }
+
+          // no table is registered
+          case None =>
+            val newTable = new TableSourceSinkTable(
+              Some(new BatchTableSourceTable(batchTableSource)),
+              None)
+            registerTableInternal(name, newTable)
+        }
+
+      // not a batch table source
       case _ =>
         throw new TableException("Only BatchTableSource can be registered in " +
-            "BatchTableEnvironment")
+            "BatchTableEnvironment.")
     }
   }
 
@@ -180,7 +209,7 @@ abstract class BatchTableEnvironment(
       fieldNames: Array[String],
       fieldTypes: Array[TypeInformation[_]],
       tableSink: TableSink[_]): Unit = {
-
+    // validate
     checkValidTableName(name)
     if (fieldNames == null) throw TableException("fieldNames must not be null.")
     if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
@@ -189,10 +218,70 @@ abstract class BatchTableEnvironment(
       throw new TableException("Same number of field names and types required.")
     }
 
-    tableSink match {
-      case batchTableSink: BatchTableSink[_] =>
-        val configuredSink = batchTableSink.configure(fieldNames, fieldTypes)
-        registerTableInternal(name, new TableSinkTable(configuredSink))
+    // configure and register
+    val configuredSink = tableSink.configure(fieldNames, fieldTypes)
+    registerTableSinkInternal(name, configuredSink)
+  }
+
+  /**
+    * Registers an external [[TableSink]] with already configured field names and field types in
+    * this [[TableEnvironment]]'s catalog.
+    * Registered sink tables can be referenced in SQL DML statements.
+    *
+    * @param name The name under which the [[TableSink]] is registered.
+    * @param configuredSink The configured [[TableSink]] to register.
+    */
+  def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = {
+    registerTableSinkInternal(name, configuredSink)
+  }
+
+  private def registerTableSinkInternal(name: String, configuredSink: TableSink[_]): Unit = {
+    // validate
+    checkValidTableName(name)
+    if (configuredSink.getFieldNames == null || configuredSink.getFieldTypes == null) {
+      throw new TableException("Table sink is not configured.")
+    }
+    if (configuredSink.getFieldNames.length == 0) {
+      throw new TableException("Field names must not be empty.")
+    }
+    if (configuredSink.getFieldNames.length != configuredSink.getFieldTypes.length) {
+      throw new TableException("Same number of field names and types required.")
+    }
+
+    // register
+    configuredSink match {
+
+      // check for proper batch table sink
+      case _: BatchTableSink[_] =>
+
+        // check if a table (source or sink) is registered
+        Option(getTable(name)) match {
+
+          // table source and/or sink is registered
+          case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
+
+            // wrapper contains sink
+            case Some(_: TableSinkTable[_]) =>
+              throw new TableException(s"Table '$name' already exists. " +
+                s"Please choose a different name.")
+
+            // wrapper contains only source (not sink)
+            case _ =>
+              val enrichedTable = new TableSourceSinkTable(
+                table.tableSourceTable,
+                Some(new TableSinkTable(configuredSink)))
+              replaceRegisteredTable(name, enrichedTable)
+          }
+
+          // no table is registered
+          case _ =>
+            val newTable = new TableSourceSinkTable(
+              None,
+              Some(new TableSinkTable(configuredSink)))
+            registerTableInternal(name, newTable)
+        }
+
+      // not a batch table sink
       case _ =>
         throw new TableException("Only BatchTableSink can be registered in BatchTableEnvironment.")
     }
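
  For illustration, a minimal sketch of the new sink registration from a user's
  perspective. It assumes a BatchTableEnvironment tableEnv and an input table
  inputTable are in scope; the CsvTableSink path, delimiter, and field definitions
  are placeholders, not part of this change:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
    import org.apache.flink.types.Row

    val fieldNames = Array("name", "age")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
    val sink: TableSink[Row] = new CsvTableSink("/tmp/output.csv", "|")

    // configure the sink with a schema and register it under a name
    tableEnv.registerTableSink("csvOutputTable", sink.configure(fieldNames, fieldTypes))

    // the registered sink table can then serve as an INSERT INTO target
    tableEnv.sqlUpdate("INSERT INTO csvOutputTable SELECT name, age FROM inputTable")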

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 510fe0d..33b984d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -43,7 +43,7 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
+import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.conversion._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -114,6 +114,8 @@ abstract class StreamTableEnvironment(
     : Unit = {
 
     tableSource match {
+
+      // check for proper stream table source
       case streamTableSource: StreamTableSource[_] =>
         // check that event-time is enabled if table source includes rowtime attributes
         if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) &&
@@ -122,7 +124,35 @@ abstract class StreamTableEnvironment(
               s"A rowtime attribute requires an EventTime time characteristic in stream " +
                 s"environment. But is: ${execEnv.getStreamTimeCharacteristic}")
         }
-        registerTableInternal(name, new StreamTableSourceTable(streamTableSource))
+
+        // register
+        Option(getTable(name)) match {
+
+          // table source and/or sink is registered
+          case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
+
+            // wrapper contains source
+            case Some(_: TableSourceTable[_]) =>
+              throw new TableException(s"Table '$name' already exists. " +
+                s"Please choose a different name.")
+
+            // wrapper contains only sink (not source)
+            case _ =>
+              val enrichedTable = new TableSourceSinkTable(
+                Some(new StreamTableSourceTable(streamTableSource)),
+                table.tableSinkTable)
+              replaceRegisteredTable(name, enrichedTable)
+          }
+
+          // no table is registered
+          case None =>
+            val newTable = new TableSourceSinkTable(
+              Some(new StreamTableSourceTable(streamTableSource)),
+              None)
+            registerTableInternal(name, newTable)
+        }
+
+      // not a stream table source
       case _ =>
         throw new TableException("Only StreamTableSource can be registered in " +
           "StreamTableEnvironment")
@@ -207,14 +237,69 @@ abstract class StreamTableEnvironment(
       throw new TableException("Same number of field names and types required.")
     }
 
-    tableSink match {
-      case streamTableSink@(
-        _: AppendStreamTableSink[_] |
-        _: UpsertStreamTableSink[_] |
-        _: RetractStreamTableSink[_]) =>
+    val configuredSink = tableSink.configure(fieldNames, fieldTypes)
+    registerTableSinkInternal(name, configuredSink)
+  }
+
+  /**
+    * Registers an external [[TableSink]] with already configured field names and field types in
+    * this [[TableEnvironment]]'s catalog.
+    * Registered sink tables can be referenced in SQL DML statements.
+    *
+    * @param name The name under which the [[TableSink]] is registered.
+    * @param configuredSink The configured [[TableSink]] to register.
+    */
+  def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = {
+    registerTableSinkInternal(name, configuredSink)
+  }
+
+  private def registerTableSinkInternal(name: String, configuredSink: TableSink[_]): Unit = {
+    // validate
+    checkValidTableName(name)
+    if (configuredSink.getFieldNames == null || configuredSink.getFieldTypes == null) {
+      throw new TableException("Table sink is not configured.")
+    }
+    if (configuredSink.getFieldNames.length == 0) {
+      throw new TableException("Field names must not be empty.")
+    }
+    if (configuredSink.getFieldNames.length != configuredSink.getFieldTypes.length) {
+      throw new TableException("Same number of field names and types required.")
+    }
+
+    // register
+    configuredSink match {
+
+      // check for proper stream table sink
+      case _: StreamTableSink[_] =>
+
+        // check if a table (source or sink) is registered
+        Option(getTable(name)) match {
+
+          // table source and/or sink is registered
+          case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
+
+            // wrapper contains sink
+            case Some(_: TableSinkTable[_]) =>
+              throw new TableException(s"Table '$name' already exists. " +
+                s"Please choose a different name.")
+
+            // wrapper contains only source (not sink)
+            case _ =>
+              val enrichedTable = new TableSourceSinkTable(
+                table.tableSourceTable,
+                Some(new TableSinkTable(configuredSink)))
+              replaceRegisteredTable(name, enrichedTable)
+          }
+
+          // no table is registered
+          case _ =>
+            val newTable = new TableSourceSinkTable(
+              None,
+              Some(new TableSinkTable(configuredSink)))
+            registerTableInternal(name, newTable)
+        }
 
-        val configuredSink = streamTableSink.configure(fieldNames, fieldTypes)
-        registerTableInternal(name, new TableSinkTable(configuredSink))
+      // not a stream table sink
       case _ =>
         throw new TableException(
           "Only AppendStreamTableSink, UpsertStreamTableSink, and RetractStreamTableSink can be " +

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 88dc1e9..6a299dd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -56,7 +56,7 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSinkTable}
+import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSourceSinkTable}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -465,6 +465,16 @@ abstract class TableEnvironment(val config: TableConfig) {
       tableSink: TableSink[_]): Unit
 
   /**
+    * Registers an external [[TableSink]] with already configured field names and field types in
+    * this [[TableEnvironment]]'s catalog.
+    * Registered sink tables can be referenced in SQL DML statements.
+    *
+    * @param name The name under which the [[TableSink]] is registered.
+    * @param configuredSink The configured [[TableSink]] to register.
+    */
+  def registerTableSink(name: String, configuredSink: TableSink[_]): Unit
+
+  /**
     * Replaces a registered Table with another Table under the same name.
     * We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]]
     * with a [[org.apache.calcite.schema.TranslatableTable]].
@@ -750,8 +760,10 @@ abstract class TableEnvironment(val config: TableConfig) {
     }
 
     getTable(sinkTableName) match {
-      case s: TableSinkTable[_] =>
-        val tableSink = s.tableSink
+
+      // check for registered table that wraps a sink
+      case s: TableSourceSinkTable[_, _] if s.tableSinkTable.isDefined =>
+        val tableSink = s.tableSinkTable.get.tableSink
         // validate schema of source table and table sink
         val srcFieldTypes = table.getSchema.getTypes
         val sinkFieldTypes = tableSink.getFieldTypes
@@ -775,6 +787,7 @@ abstract class TableEnvironment(val config: TableConfig) {
               s"Query result schema: $srcSchema\n" +
               s"TableSink schema:    $sinkSchema")
         }
+
         // emit the table to the configured table sink
         writeToSink(table, tableSink, conf)
       case _ =>
@@ -821,7 +834,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     rootSchema.getTableNames.contains(name)
   }
 
-  private def getTable(name: String): org.apache.calcite.schema.Table = {
+  protected def getTable(name: String): org.apache.calcite.schema.Table = {
     rootSchema.getTable(name)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index e266a47..e311727 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -222,39 +222,35 @@ case class AmbiguousTableFormatException(
 }
 
 /**
-  * Exception for not finding a [[org.apache.flink.table.sources.TableSourceFactory]] for the
-  * given properties.
+  * Exception for not finding a [[org.apache.flink.table.connectors.DiscoverableTableFactory]] for
+  * the given properties.
   *
-  * @param properties properties that describe the table source
+  * @param properties properties that describe the table connector
   * @param cause the cause
   */
-case class NoMatchingTableSourceException(
-    properties: Map[String, String],
-    cause: Throwable)
-    extends RuntimeException(
-      s"Could not find a table source factory in the classpath satisfying the " +
-        s"following properties: \n" +
-        s"${DescriptorProperties.toString(properties)}",
-      cause) {
+case class NoMatchingTableFactoryException(properties: Map[String, String], cause: Throwable)
+  extends RuntimeException(
+    s"Could not find a table factory in the classpath satisfying the " +
+      s"following properties: \n" +
+      s"${DescriptorProperties.toString(properties)}",
+    cause) {
 
   def this(properties: Map[String, String]) = this(properties, null)
 }
 
 /**
-  * Exception for finding more than one [[org.apache.flink.table.sources.TableSourceFactory]] for
-  * the given properties.
+  * Exception for finding more than one
+  * [[org.apache.flink.table.connectors.DiscoverableTableFactory]] for the given properties.
   *
-  * @param properties properties that describe the table source
+  * @param properties properties that describe the table factory
   * @param cause the cause
   */
-case class AmbiguousTableSourceException(
-    properties: Map[String, String],
-    cause: Throwable)
-    extends RuntimeException(
-      s"More than one table source factory in the classpath satisfying the " +
-        s"following properties: \n" +
-        s"${DescriptorProperties.toString(properties)}",
-      cause) {
+case class AmbiguousTableFactoryException(properties: Map[String, String], cause: Throwable)
+  extends RuntimeException(
+    s"More than one table factory in the classpath satisfying the " +
+      s"following properties: \n" +
+      s"${DescriptorProperties.toString(properties)}",
+    cause) {
 
   def this(properties: Map[String, String]) = this(properties, null)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index 1182fc9..7e9ac21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -19,11 +19,15 @@
 package org.apache.flink.table.catalog
 
 import org.apache.flink.table.api._
-import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.connectors.{TableFactoryService, TableSourceFactory}
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceSinkTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSourceFactoryService}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
 import org.apache.flink.table.util.Logging
 
+import _root_.scala.collection.JavaConverters._
+
 /**
   * The utility class is used to convert ExternalCatalogTable to TableSourceTable.
   */
@@ -38,16 +42,20 @@ object ExternalTableSourceUtil extends Logging {
   def fromExternalCatalogTable(
       tableEnv: TableEnvironment,
       externalCatalogTable: ExternalCatalogTable)
-    : TableSourceTable[_] = {
-    val source = TableSourceFactoryService.findAndCreateTableSource(externalCatalogTable)
+    : TableSourceSinkTable[_, _] = {
+    val properties = new DescriptorProperties()
+    externalCatalogTable.addProperties(properties)
+    val source = TableFactoryService.find(classOf[TableSourceFactory[_]], externalCatalogTable)
+      .asInstanceOf[TableSourceFactory[_]]
+      .createTableSource(properties.asMap)
     tableEnv match {
       // check for a batch table source in this batch environment
       case _: BatchTableEnvironment =>
         source match {
           case bts: BatchTableSource[_] =>
-            new BatchTableSourceTable(
+            new TableSourceSinkTable(Some(new BatchTableSourceTable(
               bts,
-              new FlinkStatistic(externalCatalogTable.getTableStats))
+              new FlinkStatistic(externalCatalogTable.getTableStats))), None)
           case _ => throw new TableException(
             s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
               s"in a batch environment.")
@@ -56,9 +64,9 @@ object ExternalTableSourceUtil extends Logging {
       case _: StreamTableEnvironment =>
         source match {
           case sts: StreamTableSource[_] =>
-            new StreamTableSourceTable(
+            new TableSourceSinkTable(Some(new StreamTableSourceTable(
               sts,
-              new FlinkStatistic(externalCatalogTable.getTableStats))
+              new FlinkStatistic(externalCatalogTable.getTableStats))), None)
           case _ => throw new TableException(
             s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
               s"in a streaming environment.")
@@ -66,5 +74,4 @@ object ExternalTableSourceUtil extends Logging {
       case _ => throw new TableException("Unsupported table environment.")
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/DiscoverableTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/DiscoverableTableFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/DiscoverableTableFactory.scala
new file mode 100644
index 0000000..db5a886
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/DiscoverableTableFactory.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connectors
+
+import java.util
+
+/**
+  * Common trait for all properties-based discoverable table factories.
+  */
+trait DiscoverableTableFactory {
+
+  /**
+    * Specifies the context that this factory has been implemented for.
+    *
+    * Typical properties might be:
+    *   - connector.type
+    *   - format.type
+    *
+    * Specified property versions allow the framework to provide backwards compatible properties
+    * in case of string format changes:
+    *   - connector.property-version
+    *   - format.property-version
+    *
+    * An empty context means that the factory matches for all requests.
+    */
+  def requiredContext(): util.Map[String, String]
+
+  /**
+    * List of property keys that this factory can handle. This method will be used for validation.
+    * If a property is passed that this factory cannot handle, an exception will be thrown. The
+    * list must not contain the keys that are specified by the context.
+    *
+    * Example properties might be:
+    *   - format.line-delimiter
+    *   - format.ignore-parse-errors
+    *   - format.fields.#.type
+    *   - format.fields.#.name
+    *
+    * Note: Use "#" to denote an array of values where "#" represents one or more digits. Property
+    * versions like "format.property-version" must not be part of the supported properties.
+    *
+    * In some cases it might be useful to declare wildcards "*". Wildcards can only be declared at
+    * the end of a property key.
+    *
+    * For example, if an arbitrary format should be supported:
+    *   - format.*
+    *
+    * Note: Wildcards should be used with caution as they might swallow unsupported properties
+    * and thus might lead to undesired behavior.
+    */
+  def supportedProperties(): util.List[String]
+}
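
  For illustration, a minimal sketch of a factory implementing this trait. The connector
  type "my-system" and the property keys are made up for the example; a concrete
  connector would additionally mix in the TableSourceFactory or TableSinkFactory trait
  introduced below:

    import java.util

    class MySystemTableFactory extends DiscoverableTableFactory {

      override def requiredContext(): util.Map[String, String] = {
        val context = new util.HashMap[String, String]()
        context.put("connector.type", "my-system")      // match only this connector
        context.put("connector.property-version", "1")  // reserved for future migrations
        context
      }

      override def supportedProperties(): util.List[String] = {
        val properties = new util.ArrayList[String]()
        properties.add("connector.host")
        properties.add("connector.port")
        properties.add("format.*")  // wildcard: accept any format property
        properties
      }
    }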

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala
new file mode 100644
index 0000000..5ad6e70
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connectors
+
+import java.util.{ServiceConfigurationError, ServiceLoader}
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.util.Logging
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
+
+/**
+  * Unified service to search for a [[DiscoverableTableFactory]] of a provided type that
+  * matches the given properties.
+  */
+object TableFactoryService extends Logging {
+
+  private lazy val defaultLoader = ServiceLoader.load(classOf[DiscoverableTableFactory])
+
+  def find(clz: Class[_], descriptor: TableDescriptor): DiscoverableTableFactory = {
+    find(clz, descriptor, null)
+  }
+
+  def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
+  : DiscoverableTableFactory = {
+
+    val properties = new DescriptorProperties()
+    descriptor.addProperties(properties)
+    find(clz, properties.asMap.asScala.toMap, classLoader)
+  }
+
+  def find(clz: Class[_], properties: Map[String, String]): DiscoverableTableFactory = {
+    find(clz, properties, null)
+  }
+
+  def find(clz: Class[_], properties: Map[String, String], classLoader: ClassLoader)
+  : DiscoverableTableFactory = {
+
+    var matchingFactory: Option[(DiscoverableTableFactory, Seq[String])] = None
+    try {
+      val iter = if (classLoader == null) {
+        defaultLoader.iterator()
+      } else {
+        val customLoader = ServiceLoader.load(classOf[DiscoverableTableFactory], classLoader)
+        customLoader.iterator()
+      }
+      while (iter.hasNext) {
+        val factory = iter.next()
+
+        if (clz.isAssignableFrom(factory.getClass)) {
+          val requiredContextJava = try {
+            factory.requiredContext()
+          } catch {
+            case t: Throwable =>
+              throw new TableException(
+                s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+                t)
+          }
+
+          val requiredContext = if (requiredContextJava != null) {
+            // normalize properties
+            requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
+          } else {
+            Map[String, String]()
+          }
+
+          val plainContext = mutable.Map[String, String]()
+          plainContext ++= requiredContext
+          // we remove the versions for now until we have the first backwards compatibility case
+          // with the version we can provide mappings in case the format changes
+          plainContext.remove(CONNECTOR_PROPERTY_VERSION)
+          plainContext.remove(FORMAT_PROPERTY_VERSION)
+          plainContext.remove(METADATA_PROPERTY_VERSION)
+          plainContext.remove(STATISTICS_PROPERTY_VERSION)
+
+          if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
+            matchingFactory match {
+              case Some(_) => throw new AmbiguousTableFactoryException(properties)
+              case None => matchingFactory =
+                Some((factory.asInstanceOf[DiscoverableTableFactory], requiredContext.keys.toSeq))
+            }
+          }
+        }
+      }
+    } catch {
+      case e: ServiceConfigurationError =>
+        LOG.error("Could not load service provider for table factories.", e)
+        throw new TableException("Could not load service provider for table factories.", e)
+    }
+
+    val (factory, context) = matchingFactory
+      .getOrElse(throw new NoMatchingTableFactoryException(properties))
+
+    val plainProperties = mutable.ArrayBuffer[String]()
+    properties.keys.foreach { k =>
+      // replace array indices (e.g. "fields.0.name") with the "#" placeholder
+      val key = k.replaceAll("\\.\\d+", ".#")
+      // ignore context properties and duplicates
+      if (!context.contains(key) && !plainProperties.contains(key)) {
+        plainProperties += key
+      }
+    }
+
+    val supportedPropertiesJava = try {
+      factory.supportedProperties()
+    } catch {
+      case t: Throwable =>
+        throw new TableException(
+          s"Table source factory '${factory.getClass.getCanonicalName}' caused an exception.",
+          t)
+    }
+
+    val supportedProperties = if (supportedPropertiesJava != null) {
+      supportedPropertiesJava.asScala.map(_.toLowerCase)
+    } else {
+      Seq[String]()
+    }
+
+    // check for supported properties
+    plainProperties.foreach { k =>
+      if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && !supportedProperties.contains(k)) {
+        throw new ValidationException(
+          s"Table factory '${factory.getClass.getCanonicalName}' does not support the " +
+            s"property '$k'. Supported properties are: \n" +
+            s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
+      }
+    }
+
+    // all supported properties are present, return the matching factory
+    factory
+  }
+}
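
  Discovery relies on Java's ServiceLoader, so implementations must also be listed in a
  META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory file on
  the classpath. For illustration, a sketch of a lookup with a plain property map; the
  keys mirror the normalized descriptor format and are placeholders:

    import scala.collection.JavaConverters._

    val properties = Map(
      "connector.type" -> "my-system",
      "connector.property-version" -> "1",
      "connector.host" -> "localhost",
      "connector.port" -> "9092")

    val source = TableFactoryService
      .find(classOf[TableSourceFactory[_]], properties)
      .asInstanceOf[TableSourceFactory[_]]
      .createTableSource(properties.asJava)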

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala
new file mode 100644
index 0000000..6346e38
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connectors
+
+import org.apache.flink.table.sinks.TableSink
+
+import java.util
+
+/**
+  * A factory to create configured table sink instances from normalized properties.
+  */
+trait TableSinkFactory[T] {
+  /**
+    * Creates and configures a [[org.apache.flink.table.sinks.TableSink]]
+    * using the given properties.
+    *
+    * @param properties normalized properties describing a table sink.
+    * @return the configured table sink.
+    */
+  def createTableSink(properties: util.Map[String, String]): TableSink[T]
+}
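
  For illustration, a sketch of a sink factory combining this trait with
  DiscoverableTableFactory. The connector type and properties are made up, and the
  actual sink construction is left out:

    import java.util
    import org.apache.flink.table.sinks.TableSink
    import org.apache.flink.types.Row

    class MySystemTableSinkFactory extends TableSinkFactory[Row] with DiscoverableTableFactory {

      override def requiredContext(): util.Map[String, String] =
        util.Collections.singletonMap("connector.type", "my-system")

      override def supportedProperties(): util.List[String] =
        util.Arrays.asList("connector.host", "connector.port")

      override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
        // e.g. read host/port from the normalized properties and build a concrete sink
        ???
      }
    }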

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala
new file mode 100644
index 0000000..bd3130a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connectors
+
+import org.apache.flink.table.sources.TableSource
+
+import java.util
+
+/**
+  * A factory to create configured table source instances from normalized properties.
+  */
+trait TableSourceFactory[T] {
+  /**
+    * Creates and configures a [[org.apache.flink.table.sources.TableSource]]
+    * using the given properties.
+    *
+    * @param properties normalized properties describing a table source.
+    * @return the configured table source.
+    */
+  def createTableSource(properties: util.Map[String, String]): TableSource[T]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
index afdd84c..155153f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.descriptors
 
 import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException, ValidationException}
-import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceFactoryService}
+import org.apache.flink.table.connectors.{TableFactoryService, TableSourceFactory}
+import org.apache.flink.table.sources.{BatchTableSource, TableSource}
 
 class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor)
   extends TableSourceDescriptor {
@@ -43,7 +44,11 @@ class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: Con
     * Searches for the specified table source, configures it accordingly, and returns it.
     */
   def toTableSource: TableSource[_] = {
-    val source = TableSourceFactoryService.findAndCreateTableSource(this)
+    val properties = new DescriptorProperties()
+    addProperties(properties)
+    val source = TableFactoryService.find(classOf[TableSourceFactory[_]], this)
+      .asInstanceOf[TableSourceFactory[_]]
+      .createTableSource(properties.asMap)
     source match {
       case _: BatchTableSource[_] => source
       case _ => throw new TableException(