You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:58 UTC
[13/13] flink git commit: [FLINK-8866] [table] Create unified
interfaces to configure and instantiate TableSinks
[FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks
This closes #6201.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9597248a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9597248a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9597248a
Branch: refs/heads/master
Commit: 9597248a41b34e126aac6a807651b1d376dc6de1
Parents: ee40335
Author: Shuyi Chen <sh...@uber.com>
Authored: Tue Jun 19 12:00:34 2018 -0700
Committer: Timo Walther <tw...@apache.org>
Committed: Sun Jul 15 09:51:28 2018 +0200
----------------------------------------------------------------------
...nk.table.connectors.DiscoverableTableFactory | 16 ++
...pache.flink.table.sources.TableSourceFactory | 16 --
...nk.table.connectors.DiscoverableTableFactory | 16 ++
...pache.flink.table.sources.TableSourceFactory | 16 --
...nk.table.connectors.DiscoverableTableFactory | 16 ++
...pache.flink.table.sources.TableSourceFactory | 16 --
...nk.table.connectors.DiscoverableTableFactory | 16 ++
...pache.flink.table.sources.TableSourceFactory | 16 --
.../kafka/KafkaTableSourceFactory.java | 7 +-
.../KafkaJsonTableSourceFactoryTestBase.java | 10 +-
flink-libraries/flink-sql-client/pom.xml | 2 +-
.../flink/table/client/config/Environment.java | 54 ++++--
.../apache/flink/table/client/config/Sink.java | 53 ++++++
.../flink/table/client/config/Source.java | 25 +--
.../flink/table/client/config/SourceSink.java | 65 +++++++
.../flink/table/client/gateway/Executor.java | 2 +-
.../client/gateway/local/ExecutionContext.java | 34 +++-
.../test/assembly/test-table-source-factory.xml | 2 +-
.../gateway/utils/TestTableSourceFactory.java | 12 +-
.../test/resources/test-sql-client-factory.yaml | 3 +-
...nk.table.connectors.DiscoverableTableFactory | 17 ++
...pache.flink.table.sources.TableSourceFactory | 16 --
.../flink/table/api/BatchTableEnvironment.scala | 107 ++++++++++-
.../table/api/StreamTableEnvironment.scala | 103 ++++++++++-
.../flink/table/api/TableEnvironment.scala | 21 ++-
.../org/apache/flink/table/api/exceptions.scala | 40 ++--
.../table/catalog/ExternalTableSourceUtil.scala | 25 ++-
.../connectors/DiscoverableTableFactory.scala | 68 +++++++
.../table/connectors/TableFactoryService.scala | 160 ++++++++++++++++
.../table/connectors/TableSinkFactory.scala | 34 ++++
.../table/connectors/TableSourceFactory.scala | 34 ++++
.../BatchTableSourceDescriptor.scala | 9 +-
.../flink/table/descriptors/CsvValidator.scala | 1 -
.../descriptors/DescriptorProperties.scala | 3 +-
.../table/descriptors/SchemaValidator.scala | 57 +++++-
.../StreamTableSourceDescriptor.scala | 9 +-
.../descriptors/TableDescriptorValidator.scala | 5 +-
.../table/descriptors/TableSinkDescriptor.scala | 30 +++
.../descriptors/TableSourceDescriptor.scala | 2 +
.../plan/nodes/PhysicalTableSourceScan.scala | 10 +-
.../logical/FlinkLogicalTableFunctionScan.scala | 1 -
.../logical/FlinkLogicalTableSourceScan.scala | 21 ++-
.../dataSet/BatchTableSourceScanRule.scala | 17 +-
.../datastream/StreamTableSourceScanRule.scala | 17 +-
.../PushFilterIntoTableSourceScanRule.scala | 7 +-
.../plan/schema/StreamTableSourceTable.scala | 2 +-
.../table/plan/schema/TableSinkTable.scala | 14 +-
.../plan/schema/TableSourceSinkTable.scala | 67 +++++++
.../table/plan/schema/TableSourceTable.scala | 17 +-
.../flink/table/sinks/CsvTableSinkFactory.scala | 90 +++++++++
.../table/sources/CsvTableSourceFactory.scala | 5 +-
.../table/sources/TableSourceFactory.scala | 86 ---------
.../sources/TableSourceFactoryService.scala | 181 -------------------
...nk.table.connectors.DiscoverableTableFactory | 19 ++
...pache.flink.table.sources.TableSourceFactory | 17 --
.../validation/InsertIntoValidationTest.scala | 8 +-
.../validation/InsertIntoValidationTest.scala | 6 +-
.../validation/InsertIntoValidationTest.scala | 8 +-
.../validation/InsertIntoValidationTest.scala | 6 +-
.../validation/TableSinksValidationTest.scala | 2 +-
.../catalog/ExternalCatalogSchemaTest.scala | 8 +-
.../TableSinkFactoryServiceTest.scala | 85 +++++++++
.../TableSourceFactoryServiceTest.scala | 110 +++++++++++
.../TestFixedFormatTableFactory.scala | 63 +++++++
.../table/connectors/TestTableSinkFactory.scala | 75 ++++++++
.../connectors/TestTableSourceFactory.scala | 64 +++++++
.../TestWildcardFormatTableSourceFactory.scala | 53 ++++++
.../table/descriptors/SchemaValidatorTest.scala | 36 +++-
.../descriptors/TableSourceDescriptorTest.scala | 3 +
.../batch/sql/TableEnvironmentITCase.scala | 8 +-
.../batch/table/TableEnvironmentITCase.scala | 8 +-
.../runtime/stream/TimeAttributesITCase.scala | 8 +-
.../table/runtime/stream/sql/SqlITCase.scala | 48 ++++-
.../runtime/stream/table/TableSinkITCase.scala | 8 +-
.../sources/TableSourceFactoryServiceTest.scala | 100 ----------
.../sources/TestFixedFormatTableFactory.scala | 62 -------
.../TestWildcardFormatTableSourceFactory.scala | 56 ------
.../table/utils/InMemoryTableFactory.scala | 109 +++++++++++
.../flink/table/utils/MemoryTableSinkUtil.scala | 89 ---------
.../table/utils/MemoryTableSourceSinkUtil.scala | 151 ++++++++++++++++
.../table/utils/MockTableEnvironment.scala | 2 +
81 files changed, 1994 insertions(+), 891 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..21f5707
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index 21f5707..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..c056097
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index c056097..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..b83bb3f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index b83bb3f..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.streaming.connectors.kafka.Kafka08TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..fb14ddb
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index fb14ddb..0000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
index d88fc6f..563cd40 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connectors.DiscoverableTableFactory;
+import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.descriptors.SchemaValidator;
@@ -30,7 +32,6 @@ import org.apache.flink.table.formats.DeserializationSchemaFactory;
import org.apache.flink.table.formats.TableFormatFactoryService;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactory;
import org.apache.flink.types.Row;
import java.util.ArrayList;
@@ -71,7 +72,7 @@ import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
/**
* Factory for creating configured instances of {@link KafkaTableSource}.
*/
-public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row> {
+public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row>, DiscoverableTableFactory {
@Override
public Map<String, String> requiredContext() {
@@ -118,7 +119,7 @@ public abstract class KafkaTableSourceFactory implements TableSourceFactory<Row>
}
@Override
- public TableSource<Row> create(Map<String, String> properties) {
+ public TableSource<Row> createTableSource(Map<String, String> properties) {
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
index 84c5bd3..c8b422f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connectors.TableFactoryService;
+import org.apache.flink.table.connectors.TableSourceFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
@@ -29,7 +32,6 @@ import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactoryService;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
@@ -148,7 +150,11 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
new Rowtime().timestampsFromField("time").watermarksPeriodicAscending())
.field("proc-time", Types.SQL_TIMESTAMP).proctime());
- final TableSource<?> factorySource = TableSourceFactoryService.findAndCreateTableSource(testDesc);
+ DescriptorProperties properties = new DescriptorProperties(true);
+ testDesc.addProperties(properties);
+ final TableSource<?> factorySource =
+ ((TableSourceFactory) TableFactoryService.find(TableSourceFactory.class, testDesc))
+ .createTableSource(properties.asMap());
assertEquals(builderSource, factorySource);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml
index 6bcfc13..b413b6f 100644
--- a/flink-libraries/flink-sql-client/pom.xml
+++ b/flink-libraries/flink-sql-client/pom.xml
@@ -159,7 +159,7 @@ under the License.
<include>org.codehaus.commons.compiler.properties</include>
<include>org/codehaus/janino/**</include>
<include>org/codehaus/commons/**</include>
- <include>META-INF/services/org.apache.flink.table.sources.TableSourceFactory</include>
+ <include>META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory</include>
<!-- flink-sql-client -->
<include>org/jline/**</include>
<include>com/fasterxml/jackson/**</include>
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index b26c45f..966a581 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -48,6 +48,8 @@ public class Environment {
private Deployment deployment;
+ private static final String NAME = "name";
+
public Environment() {
this.tables = Collections.emptyMap();
this.functions = Collections.emptyMap();
@@ -62,20 +64,21 @@ public class Environment {
public void setTables(List<Map<String, Object>> tables) {
this.tables = new HashMap<>(tables.size());
tables.forEach(config -> {
- if (!config.containsKey(TableDescriptorValidator.TABLE_TYPE())) {
- throw new SqlClientException("The 'type' attribute of a table is missing.");
+ if (!config.containsKey(NAME)) {
+ throw new SqlClientException("The 'name' attribute of a table is missing.");
+ }
+ final Object nameObject = config.get(NAME);
+ if (nameObject == null || !(nameObject instanceof String) || ((String) nameObject).length() <= 0) {
+ throw new SqlClientException("Invalid table name '" + nameObject + "'.");
}
- if (config.get(TableDescriptorValidator.TABLE_TYPE()).equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
- config.remove(TableDescriptorValidator.TABLE_TYPE());
- final Source s = Source.create(config);
- if (this.tables.containsKey(s.getName())) {
- throw new SqlClientException("Duplicate source name '" + s.getName() + "'.");
- }
- this.tables.put(s.getName(), s);
- } else {
- throw new SqlClientException(
- "Invalid table 'type' attribute value, only 'source' is supported");
+ final String name = (String) nameObject;
+ final Map<String, Object> properties = new HashMap<>(config);
+ properties.remove(NAME);
+
+ if (this.tables.containsKey(name)) {
+ throw new SqlClientException("Duplicate table name '" + name + "'.");
}
+ this.tables.put(name, createTableDescriptor(name, properties));
});
}
@@ -195,4 +198,31 @@ public class Environment {
return enrichedEnv;
}
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a table descriptor from a YAML config map.
+ *
+ * @param name name of the table
+ * @param config YAML config map
+ * @return table descriptor describing a source, sink, or both
+ */
+ private static TableDescriptor createTableDescriptor(String name, Map<String, Object> config) {
+ final Object typeObject = config.get(TableDescriptorValidator.TABLE_TYPE());
+ if (typeObject == null || !(typeObject instanceof String)) {
+ throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'.");
+ }
+ final String type = (String) config.get(TableDescriptorValidator.TABLE_TYPE());
+ final Map<String, String> normalizedConfig = ConfigUtil.normalizeYaml(config);
+ if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE())) {
+ return new Source(name, normalizedConfig);
+ } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SINK())) {
+ return new Sink(name, normalizedConfig);
+ } else if (type.equals(TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE_SINK())) {
+ return new SourceSink(name, normalizedConfig);
+ }
+ throw new SqlClientException("Invalid 'type' attribute for table '" + name + "'. " +
+ "Only 'source', 'sink', and 'both' are supported.");
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
new file mode 100644
index 0000000..0de65fb
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Sink.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.TableSinkDescriptor;
+
+import java.util.Map;
+
+/**
+ * Configuration of a table sink.
+ */
+public class Sink extends TableSinkDescriptor {
+
+ private String name;
+ private Map<String, String> properties;
+
+ protected Sink(String name, Map<String, String> properties) {
+ this.name = name;
+ this.properties = properties;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void addProperties(DescriptorProperties properties) {
+ this.properties.forEach(properties::putString);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
index 2bef257..ef80596 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Source.java
@@ -18,25 +18,20 @@
package org.apache.flink.table.client.config;
-import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.TableSourceDescriptor;
-import java.util.HashMap;
import java.util.Map;
/**
- * Configuration of a table source. Parses an entry in the `tables` list of an environment
- * file and translates to table descriptor properties.
+ * Configuration of a table source.
*/
public class Source extends TableSourceDescriptor {
private String name;
private Map<String, String> properties;
- private static final String NAME = "name";
-
- private Source(String name, Map<String, String> properties) {
+ protected Source(String name, Map<String, String> properties) {
this.name = name;
this.properties = properties;
}
@@ -49,22 +44,6 @@ public class Source extends TableSourceDescriptor {
return properties;
}
- /**
- * Creates a table source descriptor with the given config.
- */
- public static Source create(Map<String, Object> config) {
- if (!config.containsKey(NAME)) {
- throw new SqlClientException("The 'name' attribute of a table source is missing.");
- }
- final Object name = config.get(NAME);
- if (name == null || !(name instanceof String) || ((String) name).length() <= 0) {
- throw new SqlClientException("Invalid table source name '" + name + "'.");
- }
- final Map<String, Object> properties = new HashMap<>(config);
- properties.remove(NAME);
- return new Source((String) name, ConfigUtil.normalizeYaml(properties));
- }
-
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
new file mode 100644
index 0000000..33fb0f1
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SourceSink.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptorValidator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Common class for all descriptors describing a table source and sink together.
+ */
+public class SourceSink extends TableDescriptor {
+
+ private String name; // passed unchanged to the Source/Sink created below
+ private Map<String, String> properties; // stored by reference; only toSource()/toSink() copy it
+
+ protected SourceSink(String name, Map<String, String> properties) { // not public: package and subclasses only
+ this.name = name;
+ this.properties = properties;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties; // hands out the internal map itself, so caller-side changes are visible here
+ }
+
+ @Override
+ public void addProperties(DescriptorProperties properties) {
+ this.properties.forEach(properties::putString); // copy every stored entry into the given descriptor properties
+ }
+
+ public Source toSource() { // source-only view with the same name
+ final Map<String, String> newProperties = new HashMap<>(properties); // copy, so this descriptor stays unchanged
+ newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), // Map.replace is a no-op when the key is absent
+ TableDescriptorValidator.TABLE_TYPE_VALUE_SOURCE());
+ return new Source(name, newProperties);
+ }
+
+ public Sink toSink() { // sink-only view with the same name
+ final Map<String, String> newProperties = new HashMap<>(properties); // copy, so this descriptor stays unchanged
+ newProperties.replace(TableDescriptorValidator.TABLE_TYPE(), // Map.replace is a no-op when the key is absent
+ TableDescriptorValidator.TABLE_TYPE_VALUE_SINK());
+ return new Sink(name, newProperties);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index f12ff4d..9ace240 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -62,7 +62,7 @@ public interface Executor {
String explainStatement(SessionContext session, String statement) throws SqlExecutionException;
/**
- * Submits a Flink job (detached) and returns the result descriptor.
+ * Submits a Flink SQL query job (detached) and returns the result descriptor.
*/
ResultDescriptor executeQuery(SessionContext session, String query) throws SqlExecutionException;
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 9cd3d8d..114d5a6 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -46,16 +46,22 @@ import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.client.config.Deployment;
import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.Sink;
+import org.apache.flink.table.client.config.Source;
+import org.apache.flink.table.client.config.SourceSink;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.descriptors.TableSourceDescriptor;
+import org.apache.flink.table.connectors.TableFactoryService;
+import org.apache.flink.table.connectors.TableSinkFactory;
+import org.apache.flink.table.connectors.TableSourceFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactoryService;
import org.apache.flink.util.FlinkException;
import org.apache.commons.cli.CommandLine;
@@ -80,6 +86,7 @@ public class ExecutionContext<T> {
private final List<URL> dependencies;
private final ClassLoader classLoader;
private final Map<String, TableSource<?>> tableSources;
+ private final Map<String, TableSink<?>> tableSinks;
private final Map<String, UserDefinedFunction> functions;
private final Configuration flinkConfig;
private final CommandLine commandLine;
@@ -100,14 +107,22 @@ public class ExecutionContext<T> {
dependencies.toArray(new URL[dependencies.size()]),
this.getClass().getClassLoader());
- // create table sources
+ // create table sources & sinks.
tableSources = new HashMap<>();
+ tableSinks = new HashMap<>();
mergedEnv.getTables().forEach((name, descriptor) -> {
- if (descriptor instanceof TableSourceDescriptor) {
- final TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(
- (TableSourceDescriptor) descriptor,
- classLoader);
- tableSources.put(name, tableSource);
+ final DescriptorProperties properties = new DescriptorProperties(true);
+ descriptor.addProperties(properties);
+ final Map<String, String> propertyMap = properties.asMap();
+ if (descriptor instanceof Source || descriptor instanceof SourceSink) {
+ final TableSourceFactory<?> factory = (TableSourceFactory<?>)
+ TableFactoryService.find(TableSourceFactory.class, descriptor, classLoader);
+ tableSources.put(name, factory.createTableSource(propertyMap));
+ }
+ if (descriptor instanceof Sink || descriptor instanceof SourceSink) {
+ final TableSinkFactory<?> factory = (TableSinkFactory<?>)
+ TableFactoryService.find(TableSinkFactory.class, descriptor, classLoader);
+ tableSinks.put(name, factory.createTableSink(propertyMap));
}
});
@@ -224,6 +239,9 @@ public class ExecutionContext<T> {
// register table sources
tableSources.forEach(tableEnv::registerTableSource);
+ // register table sinks
+ tableSinks.forEach(tableEnv::registerTableSink);
+
// register user-defined functions
if (tableEnv instanceof StreamTableEnvironment) {
StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) tableEnv;
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml b/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml
index fb9673c..fce86b0 100644
--- a/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml
+++ b/flink-libraries/flink-sql-client/src/test/assembly/test-table-source-factory.xml
@@ -40,7 +40,7 @@ under the License.
<file>
<source>src/test/resources/test-factory-services-file</source>
<outputDirectory>META-INF/services</outputDirectory>
- <destName>org.apache.flink.table.sources.TableSourceFactory</destName>
+ <destName>org.apache.flink.table.connectors.DiscoverableTableFactory</destName>
<fileMode>0755</fileMode>
</file>
</files>
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
index 1b0a30e..408f255 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactory.java
@@ -24,14 +24,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.client.gateway.local.DependencyTest;
+import org.apache.flink.table.connectors.DiscoverableTableFactory;
+import org.apache.flink.table.connectors.TableSourceFactory;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.table.sources.TableSourceFactory;
import org.apache.flink.types.Row;
import java.util.ArrayList;
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Optional;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
@@ -50,7 +51,7 @@ import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
/**
* Table source factory for testing the classloading in {@link DependencyTest}.
*/
-public class TestTableSourceFactory implements TableSourceFactory<Row> {
+public class TestTableSourceFactory implements TableSourceFactory<Row>, DiscoverableTableFactory {
@Override
public Map<String, String> requiredContext() {
@@ -66,18 +67,19 @@ public class TestTableSourceFactory implements TableSourceFactory<Row> {
properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
+ properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
return properties;
}
@Override
- public TableSource<Row> create(Map<String, String> properties) {
+ public TableSource<Row> createTableSource(Map<String, String> properties) {
final DescriptorProperties params = new DescriptorProperties(true);
params.putProperties(properties);
final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
return new TestTableSource(
- params.getTableSchema(SCHEMA()),
+ SchemaValidator.deriveTableSourceSchema(params),
properties.get("connector.test-property"),
proctime.orElse(null),
rowtime);
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
index c7b6097..b192479 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
@@ -35,7 +35,8 @@ tables:
type: TIMESTAMP
rowtime:
timestamps:
- type: from-source
+ type: from-field
+ from: rowtimeField
watermarks:
type: from-source
connector:
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
new file mode 100644
index 0000000..4cda0ad
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.connectors.DiscoverableTableFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.sources.CsvTableSourceFactory
+org.apache.flink.table.sinks.CsvTableSinkFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
deleted file mode 100644
index ff43eed..0000000
--- a/flink-libraries/flink-table/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.table.sources.CsvTableSourceFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 043a345..a239ad5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -37,9 +37,9 @@ import org.apache.flink.table.expressions.{Expression, TimeAttribute}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{BatchTableSourceTable, DataSetTable, RowSchema, TableSinkTable}
+import org.apache.flink.table.plan.schema._
import org.apache.flink.table.runtime.MapRunner
-import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
+import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{BatchTableSource, TableSource}
import org.apache.flink.types.Row
@@ -103,11 +103,40 @@ abstract class BatchTableEnvironment(
: Unit = {
tableSource match {
+
+ // check for proper batch table source
case batchTableSource: BatchTableSource[_] =>
- registerTableInternal(name, new BatchTableSourceTable(batchTableSource))
+ // look up what is registered under this name; Option(...) maps a null lookup to None
+ Option(getTable(name)) match {
+
+ // table source and/or sink is registered
+ case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
+
+ // wrapper contains source
+ case Some(_: TableSourceTable[_]) =>
+ throw new TableException(s"Table '$name' already exists. " +
+ s"Please choose a different name.")
+
+ // wrapper contains only sink (not source): keep the sink and add the source next to it
+ case _ =>
+ val enrichedTable = new TableSourceSinkTable(
+ Some(new BatchTableSourceTable(batchTableSource)),
+ table.tableSinkTable)
+ replaceRegisteredTable(name, enrichedTable)
+ }
+
+ // no source/sink wrapper is registered (catch-all avoids a MatchError on other table kinds)
+ case _ =>
+ val newTable = new TableSourceSinkTable(
+ Some(new BatchTableSourceTable(batchTableSource)),
+ None)
+ registerTableInternal(name, newTable)
+ }
+
+ // not a batch table source
case _ =>
throw new TableException("Only BatchTableSource can be registered in " +
- "BatchTableEnvironment")
+ "BatchTableEnvironment.")
}
}
@@ -180,7 +209,7 @@ abstract class BatchTableEnvironment(
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]],
tableSink: TableSink[_]): Unit = {
-
+ // validate
checkValidTableName(name)
if (fieldNames == null) throw TableException("fieldNames must not be null.")
if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
@@ -189,10 +218,70 @@ abstract class BatchTableEnvironment(
throw new TableException("Same number of field names and types required.")
}
- tableSink match {
- case batchTableSink: BatchTableSink[_] =>
- val configuredSink = batchTableSink.configure(fieldNames, fieldTypes)
- registerTableInternal(name, new TableSinkTable(configuredSink))
+ // configure and register
+ val configuredSink = tableSink.configure(fieldNames, fieldTypes)
+ registerTableSinkInternal(name, configuredSink)
+ }
+
+ /**
+ * Registers an external [[TableSink]] with already configured field names and field types in
+ * this [[TableEnvironment]]'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * @param name The name under which the [[TableSink]] is registered.
+ * @param configuredSink The configured [[TableSink]] to register.
+ */
+ def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = {
+ registerTableSinkInternal(name, configuredSink) // same validate-then-register path as the overload that configures the sink first
+ }
+
+ private def registerTableSinkInternal(name: String, configuredSink: TableSink[_]): Unit = {
+ // validate
+ checkValidTableName(name)
+ if (configuredSink.getFieldNames == null || configuredSink.getFieldTypes == null) {
+ throw new TableException("Table sink is not configured.")
+ }
+ if (configuredSink.getFieldNames.length == 0) {
+ throw new TableException("Field names must not be empty.")
+ }
+ if (configuredSink.getFieldNames.length != configuredSink.getFieldTypes.length) {
+ throw new TableException("Same number of field names and types required.")
+ }
+
+ // register
+ configuredSink match {
+
+ // check for proper batch table sink
+ case _: BatchTableSink[_] =>
+
+ // check if a table (source or sink) is registered
+ Option(getTable(name)) match {
+
+ // table source and/or sink is registered
+ case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
+
+ // wrapper contains sink
+ case Some(_: TableSinkTable[_]) =>
+ throw new TableException(s"Table '$name' already exists. " +
+ s"Please choose a different name.")
+
+ // wrapper contains only source (not sink)
+ case _ =>
+ val enrichedTable = new TableSourceSinkTable(
+ table.tableSourceTable,
+ Some(new TableSinkTable(configuredSink)))
+ replaceRegisteredTable(name, enrichedTable)
+ }
+
+ // no table is registered
+ case _ =>
+ val newTable = new TableSourceSinkTable(
+ None,
+ Some(new TableSinkTable(configuredSink)))
+ registerTableInternal(name, newTable)
+ }
+
+ // not a batch table sink
case _ =>
throw new TableException("Only BatchTableSink can be registered in BatchTableEnvironment.")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 510fe0d..33b984d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -43,7 +43,7 @@ import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
+import org.apache.flink.table.plan.schema._
import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.conversion._
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -114,6 +114,8 @@ abstract class StreamTableEnvironment(
: Unit = {
tableSource match {
+
+ // check for proper stream table source
case streamTableSource: StreamTableSource[_] =>
// check that event-time is enabled if table source includes rowtime attributes
if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) &&
@@ -122,7 +124,35 @@ abstract class StreamTableEnvironment(
s"A rowtime attribute requires an EventTime time characteristic in stream " +
s"environment. But is: ${execEnv.getStreamTimeCharacteristic}")
}
- registerTableInternal(name, new StreamTableSourceTable(streamTableSource))
+
+ // register: look up what is registered under this name; Option(...) maps null to None
+ Option(getTable(name)) match {
+
+ // table source and/or sink is registered
+ case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
+
+ // wrapper contains source
+ case Some(_: TableSourceTable[_]) =>
+ throw new TableException(s"Table '$name' already exists. " +
+ s"Please choose a different name.")
+
+ // wrapper contains only sink (not source): keep the sink and add the source next to it
+ case _ =>
+ val enrichedTable = new TableSourceSinkTable(
+ Some(new StreamTableSourceTable(streamTableSource)),
+ table.tableSinkTable)
+ replaceRegisteredTable(name, enrichedTable)
+ }
+
+ // no source/sink wrapper is registered (catch-all avoids a MatchError on other table kinds)
+ case _ =>
+ val newTable = new TableSourceSinkTable(
+ Some(new StreamTableSourceTable(streamTableSource)),
+ None)
+ registerTableInternal(name, newTable)
+ }
+
+ // not a stream table source
case _ =>
throw new TableException("Only StreamTableSource can be registered in " +
"StreamTableEnvironment")
@@ -207,14 +237,69 @@ abstract class StreamTableEnvironment(
throw new TableException("Same number of field names and types required.")
}
- tableSink match {
- case streamTableSink@(
- _: AppendStreamTableSink[_] |
- _: UpsertStreamTableSink[_] |
- _: RetractStreamTableSink[_]) =>
+ val configuredSink = tableSink.configure(fieldNames, fieldTypes)
+ registerTableSinkInternal(name, configuredSink)
+ }
+
+ /**
+ * Registers an external [[TableSink]] with already configured field names and field types in
+ * this [[TableEnvironment]]'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * @param name The name under which the [[TableSink]] is registered.
+ * @param configuredSink The configured [[TableSink]] to register.
+ */
+ def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = {
+ registerTableSinkInternal(name, configuredSink) // same validate-then-register path as the overload that configures the sink first
+ }
+
+ private def registerTableSinkInternal(name: String, configuredSink: TableSink[_]): Unit = {
+ // validate
+ checkValidTableName(name)
+ if (configuredSink.getFieldNames == null || configuredSink.getFieldTypes == null) {
+ throw new TableException("Table sink is not configured.")
+ }
+ if (configuredSink.getFieldNames.length == 0) {
+ throw new TableException("Field names must not be empty.")
+ }
+ if (configuredSink.getFieldNames.length != configuredSink.getFieldTypes.length) {
+ throw new TableException("Same number of field names and types required.")
+ }
+
+ // register
+ configuredSink match {
+
+ // check for proper stream table sink
+ case _: StreamTableSink[_] =>
+
+ // check if a table (source or sink) is registered
+ Option(getTable(name)) match {
+
+ // table source and/or sink is registered
+ case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
+
+ // wrapper contains sink
+ case Some(_: TableSinkTable[_]) =>
+ throw new TableException(s"Table '$name' already exists. " +
+ s"Please choose a different name.")
+
+ // wrapper contains only source (not sink)
+ case _ =>
+ val enrichedTable = new TableSourceSinkTable(
+ table.tableSourceTable,
+ Some(new TableSinkTable(configuredSink)))
+ replaceRegisteredTable(name, enrichedTable)
+ }
+
+ // no table is registered
+ case _ =>
+ val newTable = new TableSourceSinkTable(
+ None,
+ Some(new TableSinkTable(configuredSink)))
+ registerTableInternal(name, newTable)
+ }
- val configuredSink = streamTableSink.configure(fieldNames, fieldTypes)
- registerTableInternal(name, new TableSinkTable(configuredSink))
+ // not a stream table sink
case _ =>
throw new TableException(
"Only AppendStreamTableSink, UpsertStreamTableSink, and RetractStreamTableSink can be " +
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 88dc1e9..6a299dd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -56,7 +56,7 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSinkTable}
+import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSourceSinkTable}
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -465,6 +465,16 @@ abstract class TableEnvironment(val config: TableConfig) {
tableSink: TableSink[_]): Unit
/**
+ * Registers an external [[TableSink]] with already configured field names and field types in
+ * this [[TableEnvironment]]'s catalog.
+ * Registered sink tables can be referenced in SQL DML statements.
+ *
+ * @param name The name under which the [[TableSink]] is registered.
+ * @param configuredSink The configured [[TableSink]] to register.
+ */
+ def registerTableSink(name: String, configuredSink: TableSink[_]): Unit
+
+ /**
* Replaces a registered Table with another Table under the same name.
* We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]]
* with a [[org.apache.calcite.schema.TranslatableTable]].
@@ -750,8 +760,10 @@ abstract class TableEnvironment(val config: TableConfig) {
}
getTable(sinkTableName) match {
- case s: TableSinkTable[_] =>
- val tableSink = s.tableSink
+
+ // check for registered table that wraps a sink
+ case s: TableSourceSinkTable[_, _] if s.tableSinkTable.isDefined =>
+ val tableSink = s.tableSinkTable.get.tableSink
// validate schema of source table and table sink
val srcFieldTypes = table.getSchema.getTypes
val sinkFieldTypes = tableSink.getFieldTypes
@@ -775,6 +787,7 @@ abstract class TableEnvironment(val config: TableConfig) {
s"Query result schema: $srcSchema\n" +
s"TableSink schema: $sinkSchema")
}
+
// emit the table to the configured table sink
writeToSink(table, tableSink, conf)
case _ =>
@@ -821,7 +834,7 @@ abstract class TableEnvironment(val config: TableConfig) {
rootSchema.getTableNames.contains(name)
}
- private def getTable(name: String): org.apache.calcite.schema.Table = {
+ protected def getTable(name: String): org.apache.calcite.schema.Table = {
rootSchema.getTable(name)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index e266a47..e311727 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -222,39 +222,35 @@ case class AmbiguousTableFormatException(
}
/**
- * Exception for not finding a [[org.apache.flink.table.sources.TableSourceFactory]] for the
- * given properties.
+ * Exception for not finding a [[org.apache.flink.table.connectors.DiscoverableTableFactory]] for
+ * the given properties.
*
- * @param properties properties that describe the table source
+ * @param properties properties that describe the table connector
* @param cause the cause
*/
-case class NoMatchingTableSourceException(
- properties: Map[String, String],
- cause: Throwable)
- extends RuntimeException(
- s"Could not find a table source factory in the classpath satisfying the " +
- s"following properties: \n" +
- s"${DescriptorProperties.toString(properties)}",
- cause) {
+case class NoMatchingTableFactoryException(properties: Map[String, String], cause: Throwable)
+ extends RuntimeException(
+ s"Could not find a table factory in the classpath satisfying the " +
+ s"following properties: \n" +
+ s"${DescriptorProperties.toString(properties)}",
+ cause) {
def this(properties: Map[String, String]) = this(properties, null)
}
/**
- * Exception for finding more than one [[org.apache.flink.table.sources.TableSourceFactory]] for
- * the given properties.
+ * Exception for finding more than one
+ * [[org.apache.flink.table.connectors.DiscoverableTableFactory]] for the given properties.
*
- * @param properties properties that describe the table source
+ * @param properties properties that describe the table connector
* @param cause the cause
*/
-case class AmbiguousTableSourceException(
- properties: Map[String, String],
- cause: Throwable)
- extends RuntimeException(
- s"More than one table source factory in the classpath satisfying the " +
- s"following properties: \n" +
- s"${DescriptorProperties.toString(properties)}",
- cause) {
+case class AmbiguousTableFactoryException(properties: Map[String, String], cause: Throwable)
+ extends RuntimeException(
+ s"More than one table factory in the classpath satisfying the " +
+ s"following properties: \n" +
+ s"${DescriptorProperties.toString(properties)}",
+ cause) {
def this(properties: Map[String, String]) = this(properties, null)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index 1182fc9..7e9ac21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -19,11 +19,15 @@
package org.apache.flink.table.catalog
import org.apache.flink.table.api._
-import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.connectors.{TableFactoryService, TableSourceFactory}
+import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceSinkTable, TableSourceTable}
import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSourceFactoryService}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
import org.apache.flink.table.util.Logging
+import _root_.scala.collection.JavaConverters._
+
/**
* The utility class is used to convert ExternalCatalogTable to TableSourceTable.
*/
@@ -38,16 +42,20 @@ object ExternalTableSourceUtil extends Logging {
def fromExternalCatalogTable(
tableEnv: TableEnvironment,
externalCatalogTable: ExternalCatalogTable)
- : TableSourceTable[_] = {
- val source = TableSourceFactoryService.findAndCreateTableSource(externalCatalogTable)
+ : TableSourceSinkTable[_, _] = {
+ val properties = new DescriptorProperties()
+ externalCatalogTable.addProperties(properties)
+ val source = TableFactoryService.find(classOf[TableSourceFactory[_]], externalCatalogTable)
+ .asInstanceOf[TableSourceFactory[_]]
+ .createTableSource(properties.asMap)
tableEnv match {
// check for a batch table source in this batch environment
case _: BatchTableEnvironment =>
source match {
case bts: BatchTableSource[_] =>
- new BatchTableSourceTable(
+ new TableSourceSinkTable(Some(new BatchTableSourceTable(
bts,
- new FlinkStatistic(externalCatalogTable.getTableStats))
+ new FlinkStatistic(externalCatalogTable.getTableStats))), None)
case _ => throw new TableException(
s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
s"in a batch environment.")
@@ -56,9 +64,9 @@ object ExternalTableSourceUtil extends Logging {
case _: StreamTableEnvironment =>
source match {
case sts: StreamTableSource[_] =>
- new StreamTableSourceTable(
+ new TableSourceSinkTable(Some(new StreamTableSourceTable(
sts,
- new FlinkStatistic(externalCatalogTable.getTableStats))
+ new FlinkStatistic(externalCatalogTable.getTableStats))), None)
case _ => throw new TableException(
s"Found table source '${source.getClass.getCanonicalName}' is not applicable " +
s"in a streaming environment.")
@@ -66,5 +74,4 @@ object ExternalTableSourceUtil extends Logging {
case _ => throw new TableException("Unsupported table environment.")
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/DiscoverableTableFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/DiscoverableTableFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/DiscoverableTableFactory.scala
new file mode 100644
index 0000000..db5a886
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/DiscoverableTableFactory.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connectors
+
+import java.util
+
+/**
+ * Common trait for all properties-based discoverable table factories.
+ */
+trait DiscoverableTableFactory {
+
+ /**
+ * Specifies the context that this factory has been implemented for.
+ *
+ * Typical properties might be:
+ * - connector.type
+ * - format.type
+ *
+ * Specified property versions allow the framework to provide backwards compatible properties
+ * in case of string format changes:
+ * - connector.property-version
+ * - format.property-version
+ *
+ * An empty context means that the factory matches for all requests.
+ */
+ def requiredContext(): util.Map[String, String]
+
+ /**
+ * List of property keys that this factory can handle. This method will be used for validation.
+ * If a property is passed that this factory cannot handle, an exception will be thrown. The
+ * list must not contain the keys that are specified by the context.
+ *
+ * Example properties might be:
+ * - format.line-delimiter
+ * - format.ignore-parse-errors
+ * - format.fields.#.type
+ * - format.fields.#.name
+ *
+ * Note: Use "#" to denote an array of values where "#" represents one or more digits. Property
+ * versions like "format.property-version" must not be part of the supported properties.
+ *
+ * In some cases it might be useful to declare wildcards "*". Wildcards can only be declared at
+ * the end of a property key.
+ *
+ * For example, if an arbitrary format should be supported:
+ * - format.*
+ *
+ * Note: Wildcards should be used with caution as they might swallow unsupported properties
+ * and thus might lead to undesired behavior.
+ */
+ def supportedProperties(): util.List[String]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala
new file mode 100644
index 0000000..5ad6e70
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableFactoryService.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connectors
+
+import java.util.{ServiceConfigurationError, ServiceLoader}
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._
+import org.apache.flink.table.descriptors.FormatDescriptorValidator._
+import org.apache.flink.table.descriptors.MetadataValidator._
+import org.apache.flink.table.descriptors.StatisticsValidator._
+import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator}
+import org.apache.flink.table.util.Logging
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
+
+/**
+ * Unified interface to search for a [[DiscoverableTableFactory]] of the given type and properties.
+ */
+object TableFactoryService extends Logging {
+
+ private lazy val defaultLoader = ServiceLoader.load(classOf[DiscoverableTableFactory])
+
+ /** Finds a factory of type `clz` for the descriptor using the default service loader. */
+ def find(clz: Class[_], descriptor: TableDescriptor): DiscoverableTableFactory = {
+ find(clz, descriptor, null)
+ }
+
+ /** Finds a factory of type `clz` for the descriptor; a `null` loader selects the default. */
+ def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader)
+ : DiscoverableTableFactory = {
+ val properties = new DescriptorProperties()
+ descriptor.addProperties(properties)
+ find(clz, properties.asMap.asScala.toMap, classLoader)
+ }
+
+ /** Finds a factory of type `clz` for the properties using the default service loader. */
+ def find(clz: Class[_], properties: Map[String, String]): DiscoverableTableFactory = {
+ find(clz, properties, null)
+ }
+
+ /**
+ * Returns the single discovered factory that is assignable to `clz` and whose required
+ * context is contained in `properties`. Fails if no or several factories match, or if a
+ * property is not supported by the match. A `null` class loader selects the default loader.
+ */
+ def find(clz: Class[_], properties: Map[String, String],
+ classLoader: ClassLoader): DiscoverableTableFactory = {
+
+ var matchingFactory: Option[(DiscoverableTableFactory, Seq[String])] = None
+ try {
+ val iter = if (classLoader == null) {
+ defaultLoader.iterator()
+ } else {
+ val customLoader = ServiceLoader.load(classOf[DiscoverableTableFactory], classLoader)
+ customLoader.iterator()
+ }
+ while (iter.hasNext) {
+ val factory = iter.next()
+
+ if (clz.isAssignableFrom(factory.getClass)) {
+ val requiredContextJava = try {
+ factory.requiredContext()
+ } catch {
+ case t: Throwable =>
+ throw new TableException(
+ s"Table factory '${factory.getClass.getCanonicalName}' caused an exception.",
+ t)
+ }
+
+ val requiredContext = if (requiredContextJava != null) {
+ // normalize properties
+ requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2))
+ } else {
+ Map[String, String]()
+ }
+
+ val plainContext = mutable.Map[String, String]()
+ plainContext ++= requiredContext
+ // we remove the versions for now until we have the first backwards compatibility case
+ // with the version we can provide mappings in case the format changes
+ plainContext.remove(CONNECTOR_PROPERTY_VERSION)
+ plainContext.remove(FORMAT_PROPERTY_VERSION)
+ plainContext.remove(METADATA_PROPERTY_VERSION)
+ plainContext.remove(STATISTICS_PROPERTY_VERSION)
+
+ if (plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)) {
+ matchingFactory match {
+ case Some(_) => throw new AmbiguousTableFactoryException(properties)
+ case None => matchingFactory =
+ Some((factory, requiredContext.keys.toSeq))
+ }
+ }
+ }
+ }
+ } catch {
+ case e: ServiceConfigurationError =>
+ LOG.error("Could not load service provider for table factories.", e)
+ throw new TableException("Could not load service provider for table factories.", e)
+ }
+
+ val (factory, context) = matchingFactory
+ .getOrElse(throw new NoMatchingTableFactoryException(properties))
+
+ val plainProperties = mutable.ArrayBuffer[String]()
+ properties.keys.foreach { k =>
+ // replace array indices with a wildcard; the dot is escaped to match only '.<digits>'
+ val key = k.replaceAll("\\.\\d+", ".#")
+ // ignore context properties and duplicates
+ if (!context.contains(key) && !plainProperties.contains(key)) {
+ plainProperties += key
+ }
+ }
+
+ val supportedPropertiesJava = try {
+ factory.supportedProperties()
+ } catch {
+ case t: Throwable =>
+ throw new TableException(
+ s"Table factory '${factory.getClass.getCanonicalName}' caused an exception.",
+ t)
+ }
+
+ val supportedProperties = if (supportedPropertiesJava != null) {
+ supportedPropertiesJava.asScala.map(_.toLowerCase)
+ } else {
+ Seq[String]()
+ }
+
+ // check for supported properties
+ plainProperties.foreach { k =>
+ if (!k.equals(TableDescriptorValidator.TABLE_TYPE) && !supportedProperties.contains(k)) {
+ throw new ValidationException(
+ s"Table factory '${factory.getClass.getCanonicalName}' does not support the " +
+ s"property '$k'. Supported properties are: \n" +
+ s"${supportedProperties.map(DescriptorProperties.toString).mkString("\n")}")
+ }
+ }
+
+ // every property passed validation, so the matching factory can be handed out
+ factory
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala
new file mode 100644
index 0000000..6346e38
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSinkFactory.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connectors
+
+import org.apache.flink.table.sinks.TableSink
+
+import java.util
+
+trait TableSinkFactory[T] {
+ /**
+ * Creates and configures a [[org.apache.flink.table.sinks.TableSink]]
+ * using the given properties.
+ *
+ * @param properties normalized properties describing a table sink.
+ * @return the configured table sink.
+ */
+ def createTableSink(properties: util.Map[String, String]): TableSink[T]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala
new file mode 100644
index 0000000..bd3130a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/connectors/TableSourceFactory.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connectors
+
+import org.apache.flink.table.sources.TableSource
+
+import java.util
+
+trait TableSourceFactory[T] {
+ /**
+ * Creates and configures a [[org.apache.flink.table.sources.TableSource]]
+ * using the given properties.
+ *
+ * @param properties normalized properties describing a table source.
+ * @return the configured table source.
+ */
+ def createTableSource(properties: util.Map[String, String]): TableSource[T]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9597248a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
index afdd84c..155153f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
@@ -19,7 +19,8 @@
package org.apache.flink.table.descriptors
import org.apache.flink.table.api.{BatchTableEnvironment, Table, TableException, ValidationException}
-import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceFactoryService}
+import org.apache.flink.table.connectors.{TableFactoryService, TableSourceFactory}
+import org.apache.flink.table.sources.{BatchTableSource, TableSource}
class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: ConnectorDescriptor)
extends TableSourceDescriptor {
@@ -43,7 +44,11 @@ class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: Con
* Searches for the specified table source, configures it accordingly, and returns it.
*/
def toTableSource: TableSource[_] = {
- val source = TableSourceFactoryService.findAndCreateTableSource(this)
+ val properties = new DescriptorProperties()
+ addProperties(properties)
+ val source = TableFactoryService.find(classOf[TableSourceFactory[_]], this)
+ .asInstanceOf[TableSourceFactory[_]]
+ .createTableSource(properties.asMap)
source match {
case _: BatchTableSource[_] => source
case _ => throw new TableException(