You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/22 15:03:52 UTC
[flink] branch master updated: [FLINK-13313][table] create
CatalogTableBuilder to support building CatalogTable from descriptors
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b45becf [FLINK-13313][table] create CatalogTableBuilder to support building CatalogTable from descriptors
b45becf is described below
commit b45becf8fc1d656874d9ffd4ad17b1276cf4b43f
Author: bowen.li <bo...@gmail.com>
AuthorDate: Thu Jul 18 20:52:52 2019 -0700
[FLINK-13313][table] create CatalogTableBuilder to support building CatalogTable from descriptors
This PR adds CatalogTableBuilder as a replacement for ExternalCatalogTableBuilder to help users convert table source/sink descriptors to a CatalogTable. The gap was mainly discovered while I was writing tests for HiveCatalog to make sure it persists Flink generic tables as expected.
This closes #9172.
---
flink-connectors/flink-connector-hive/pom.xml | 8 +
.../table/catalog/hive/HiveCatalogITCase.java | 170 ++++++++++++++++++++
.../src/test/resources/csv/test.csv | 3 +
.../flink/table/catalog/CatalogTableBuilder.java | 173 +++++++++++++++++++++
.../flink/table/catalog/config/CatalogConfig.java | 3 +
...criptor.java => ConnectorFormatDescriptor.java} | 9 +-
.../table/descriptors/SchematicDescriptor.java | 6 +-
.../catalog/ExternalCatalogTableBuilder.scala | 3 +
8 files changed, 365 insertions(+), 10 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 02e15c9..5397312 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -455,6 +455,14 @@ under the License.
</exclusions>
</dependency>
+ <!-- TODO: move to flink-connector-hive-test end-to-end test module once it's set up -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-csv</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
new file mode 100644
index 0000000..6f437df
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.batch.connectors.hive.FlinkStandaloneHiveRunner;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableBuilder;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.FileSystem;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.OldCsv;
+import org.apache.flink.types.Row;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * IT case for HiveCatalog.
+ *
+ * <p>Registers generic Flink tables (a CSV file source and a CSV file sink, both built from
+ * descriptors via {@link CatalogTableBuilder}) in a {@link HiveCatalog}, then reads from and
+ * writes to them through a {@link BatchTableEnvironment}.
+ *
+ * <p>TODO: move to flink-connector-hive-test end-to-end test module once it's set up
+ */
+@RunWith(FlinkStandaloneHiveRunner.class)
+public class HiveCatalogITCase {
+
+	// NOTE(review): presumably injected by the runner before @BeforeClass runs;
+	// createCatalog() dereferences it without a null check.
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
+
+	/** Temporary folder that receives the CSV file written by the sink table. */
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static HiveCatalog hiveCatalog;
+	private static HiveConf hiveConf;
+
+	private final String sourceTableName = "csv_source";
+	private final String sinkTableName = "csv_sink";
+
+	/**
+	 * Creates a {@link HiveCatalog} from the Hive configuration of the injected shell and opens it.
+	 */
+	@BeforeClass
+	public static void createCatalog() {
+		hiveConf = hiveShell.getHiveConf();
+		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+		hiveCatalog.open();
+	}
+
+	/**
+	 * Closes the catalog if it was created.
+	 */
+	@AfterClass
+	public static void closeCatalog() {
+		if (hiveCatalog != null) {
+			hiveCatalog.close();
+		}
+	}
+
+	/**
+	 * Stores a CSV source table and a CSV sink table as generic tables in the Hive catalog,
+	 * checks that the source can be queried, then copies source to sink and checks the
+	 * written file line by line.
+	 */
+	@Test
+	public void testGenericTable() throws Exception {
+		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+
+		tableEnv.registerCatalog("myhive", hiveCatalog);
+
+		TableSchema schema = TableSchema.builder()
+			.field("name", DataTypes.STRING())
+			.field("age", DataTypes.INT())
+			.build();
+
+		FormatDescriptor format = new OldCsv()
+			.field("name", Types.STRING())
+			.field("age", Types.INT());
+
+		CatalogTable source =
+			new CatalogTableBuilder(
+				new FileSystem().path(this.getClass().getResource("/csv/test.csv").getPath()),
+				schema)
+			.withFormat(format)
+			.inAppendMode()
+			.withComment(null)
+			.build();
+
+		Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");
+
+		CatalogTable sink =
+			new CatalogTableBuilder(
+				new FileSystem().path(p.toAbsolutePath().toString()),
+				schema)
+			.withFormat(format)
+			.inAppendMode()
+			.withComment(null)
+			.build();
+
+		hiveCatalog.createTable(
+			new ObjectPath(HiveCatalog.DEFAULT_DB, sourceTableName),
+			source,
+			false
+		);
+
+		hiveCatalog.createTable(
+			new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
+			sink,
+			false
+		);
+
+		Table t = tableEnv.sqlQuery(
+			String.format("select * from myhive.`default`.%s", sourceTableName));
+
+		List<Row> result = tableEnv.toDataSet(t, Row.class).collect();
+
+		// assert query result
+		assertEquals(
+			Arrays.asList(
+				Row.of("1", 1),
+				Row.of("2", 2),
+				Row.of("3", 3)),
+			result
+		);
+
+		tableEnv.sqlUpdate(
+			String.format("insert into myhive.`default`.%s select * from myhive.`default`.%s",
+				sinkTableName,
+				sourceTableName));
+		tableEnv.execute("myjob");
+
+		// assert written result
+		File resultFile = new File(p.toAbsolutePath().toString());
+		// try-with-resources: the reader must be closed even when an assertion fails,
+		// otherwise the file handle leaks and can block cleanup of the temporary folder.
+		try (BufferedReader reader = new BufferedReader(new FileReader(resultFile))) {
+			for (int i = 0; i < 3; i++) {
+				assertEquals(String.format("%d,%d", i + 1, i + 1), reader.readLine());
+			}
+
+			// No more line
+			assertNull(reader.readLine());
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/csv/test.csv b/flink-connectors/flink-connector-hive/src/test/resources/csv/test.csv
new file mode 100644
index 0000000..9b5b3ac
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/csv/test.csv
@@ -0,0 +1,3 @@
+1,1
+2,2
+3,3
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
new file mode 100644
index 0000000..01f46bd
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.ConnectorFormatDescriptor;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.Metadata;
+import org.apache.flink.table.descriptors.Statistics;
+import org.apache.flink.table.descriptors.StreamableDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder for creating a {@link CatalogTable}.
+ *
+ * <p>It takes {@link Descriptor}s which allow for declaring the communication to external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table
+ * factories that match the desired configuration.
+ *
+ * <p>Use the provided builder methods to configure the catalog table accordingly.
+ *
+ * <p>The following example shows how to read from a connector using a JSON format and
+ * declaring it as a table source:
+ *
+ * <pre>{@code
+ * CatalogTable table = new CatalogTableBuilder(
+ *         new ExternalSystemXYZ()
+ *             .version("0.11"),
+ *         new TableSchema.Builder()
+ *             .fields(names, dataTypes)
+ *             .build())
+ *     .withFormat(
+ *         new Json()
+ *             .jsonSchema("{...}")
+ *             .failOnMissingField(false))
+ *     .withComment("test comment")
+ *     .build();
+ * }</pre>
+ */
+@PublicEvolving
+public class CatalogTableBuilder
+		extends TableDescriptor
+		implements ConnectorFormatDescriptor<CatalogTableBuilder>, StreamableDescriptor<CatalogTableBuilder> {
+
+	private final ConnectorDescriptor connectorDescriptor;
+	private final TableSchema tableSchema;
+	private final boolean isGeneric;
+
+	private String comment;
+
+	private Optional<FormatDescriptor> formatDescriptor = Optional.empty();
+	// No method of this class assigns the statistics or metadata descriptor yet;
+	// both stay empty and therefore contribute no properties.
+	private Optional<Statistics> statisticsDescriptor = Optional.empty();
+	private Optional<Metadata> metadataDescriptor = Optional.empty();
+	private Optional<String> updateMode = Optional.empty();
+	private Map<String, String> properties = Collections.emptyMap();
+
+	/**
+	 * Creates a builder for a generic catalog table.
+	 *
+	 * @param connectorDescriptor descriptor of the connector, must not be null
+	 * @param tableSchema schema of the table, must not be null
+	 */
+	public CatalogTableBuilder(ConnectorDescriptor connectorDescriptor, TableSchema tableSchema) {
+		this.connectorDescriptor = checkNotNull(connectorDescriptor);
+		this.tableSchema = checkNotNull(tableSchema);
+
+		// We don't support non generic table currently
+		this.isGeneric = true;
+	}
+
+	/**
+	 * Specifies the format that defines how to read data from the connector.
+	 *
+	 * @param format format descriptor, must not be null
+	 * @return this builder
+	 */
+	@Override
+	public CatalogTableBuilder withFormat(FormatDescriptor format) {
+		this.formatDescriptor = Optional.of(checkNotNull(format));
+		return this;
+	}
+
+	/**
+	 * Sets the update mode property to the append value.
+	 *
+	 * @return this builder
+	 */
+	@Override
+	public CatalogTableBuilder inAppendMode() {
+		updateMode = Optional.of(UPDATE_MODE_VALUE_APPEND);
+		return this;
+	}
+
+	/**
+	 * Sets the update mode property to the retract value.
+	 *
+	 * @return this builder
+	 */
+	@Override
+	public CatalogTableBuilder inRetractMode() {
+		updateMode = Optional.of(UPDATE_MODE_VALUE_RETRACT);
+		return this;
+	}
+
+	/**
+	 * Sets the update mode property to the upsert value.
+	 *
+	 * @return this builder
+	 */
+	@Override
+	public CatalogTableBuilder inUpsertMode() {
+		updateMode = Optional.of(UPDATE_MODE_VALUE_UPSERT);
+		return this;
+	}
+
+	/**
+	 * Sets the comment of the table. The value is stored as-is; no null check is performed here.
+	 *
+	 * @param comment comment of the table
+	 * @return this builder
+	 */
+	public CatalogTableBuilder withComment(String comment) {
+		this.comment = comment;
+		return this;
+	}
+
+	/**
+	 * Sets additional properties that are added to the descriptor properties in
+	 * {@link #toProperties()}. The given map is referenced, not copied.
+	 *
+	 * @param properties additional table properties, must not be null
+	 * @return this builder
+	 */
+	public CatalogTableBuilder withProperties(Map<String, String> properties) {
+		this.properties = checkNotNull(properties);
+		return this;
+	}
+
+	/**
+	 * Build a {@link CatalogTable}.
+	 *
+	 * @return catalog table
+	 */
+	public CatalogTable build() {
+		return new CatalogTableImpl(
+			tableSchema,
+			toProperties(),
+			comment);
+	}
+
+	/**
+	 * Collects the properties of the connector, the optional format, statistics and metadata
+	 * descriptors, the optional update mode, the user-supplied properties and the
+	 * {@link CatalogConfig#IS_GENERIC} flag, in that order.
+	 *
+	 * <p>NOTE(review): how {@link DescriptorProperties} treats a key that is put twice is not
+	 * visible here; confirm before relying on user properties overlapping descriptor keys.
+	 *
+	 * @return the combined properties as a map
+	 */
+	@Override
+	public Map<String, String> toProperties() {
+		DescriptorProperties descriptorProperties = new DescriptorProperties();
+		descriptorProperties.putProperties(connectorDescriptor.toProperties());
+
+		formatDescriptor.ifPresent(
+			format -> descriptorProperties.putProperties(format.toProperties()));
+		statisticsDescriptor.ifPresent(
+			statistics -> descriptorProperties.putProperties(statistics.toProperties()));
+		metadataDescriptor.ifPresent(
+			metadata -> descriptorProperties.putProperties(metadata.toProperties()));
+		updateMode.ifPresent(
+			mode -> descriptorProperties.putString(UPDATE_MODE, mode));
+
+		descriptorProperties.putProperties(this.properties);
+		descriptorProperties.putString(CatalogConfig.IS_GENERIC, String.valueOf(isGeneric));
+
+		return descriptorProperties.asMap();
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
index 7a4a624..9f23a7f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
@@ -23,6 +23,9 @@ package org.apache.flink.table.catalog.config;
*/
public class CatalogConfig {
+ /**
+ * Flag to distinguish whether a meta-object is a generic Flink object or not.
+ */
public static final String IS_GENERIC = "is_generic";
// Globally reserved prefix for catalog properties.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
similarity index 80%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
index 2f9a5db..7397d39 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
@@ -21,18 +21,13 @@ package org.apache.flink.table.descriptors;
import org.apache.flink.annotation.PublicEvolving;
/**
- * An interface for descriptors that allow to define a format and schema.
+ * An interface for descriptors that allow defining a format.
*/
@PublicEvolving
-public interface SchematicDescriptor<D extends SchematicDescriptor<D>> extends Descriptor {
+public interface ConnectorFormatDescriptor<D extends ConnectorFormatDescriptor<D>> extends Descriptor {
/**
* Specifies the format that defines how to read data from a connector.
*/
D withFormat(FormatDescriptor format);
-
- /**
- * Specifies the resulting table schema.
- */
- D withSchema(Schema schema);
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
index 2f9a5db..121e49c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
@@ -18,12 +18,12 @@
package org.apache.flink.table.descriptors;
-import org.apache.flink.annotation.PublicEvolving;
-
/**
* An interface for descriptors that allow to define a format and schema.
+ *
+ * @deprecated use {@link ConnectorFormatDescriptor}.
*/
-@PublicEvolving
+@Deprecated
public interface SchematicDescriptor<D extends SchematicDescriptor<D>> extends Descriptor {
/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
index ae5c677..52bfa06 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
@@ -52,7 +52,10 @@ import org.apache.flink.table.descriptors._
* }}}
*
* @param connectorDescriptor Connector descriptor describing the external system
+ *
+ * @deprecated use [[CatalogTableBuilder]]
*/
+@Deprecated
@deprecated
class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDescriptor)
extends TableDescriptor