Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/22 15:03:52 UTC

[flink] branch master updated: [FLINK-13313][table] create CatalogTableBuilder to support building CatalogTable from descriptors

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b45becf  [FLINK-13313][table] create CatalogTableBuilder to support building CatalogTable from descriptors
b45becf is described below

commit b45becf8fc1d656874d9ffd4ad17b1276cf4b43f
Author: bowen.li <bo...@gmail.com>
AuthorDate: Thu Jul 18 20:52:52 2019 -0700

    [FLINK-13313][table] create CatalogTableBuilder to support building CatalogTable from descriptors
    
    This PR adds CatalogTableBuilder as a replacement for ExternalCatalogTableBuilder, helping users convert table source/sink descriptors to CatalogTable. The gap was discovered mainly while writing tests for HiveCatalog to verify that it persists Flink generic tables as expected.
    
    This closes #9172.
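
For illustration, a minimal sketch of the intended usage, based on the test added in this
commit. The path and table name are illustrative, `catalog` stands in for any open Catalog
instance (e.g. HiveCatalog), and imports match those in HiveCatalogITCase below:

    TableSchema schema = TableSchema.builder()
        .field("name", DataTypes.STRING())
        .field("age", DataTypes.INT())
        .build();

    CatalogTable table = new CatalogTableBuilder(
            new FileSystem().path("/path/to/test.csv"),
            schema)
        .withFormat(new OldCsv()
            .field("name", Types.STRING())
            .field("age", Types.INT()))
        .inAppendMode()
        .build();

    // Persist through any Catalog implementation.
    catalog.createTable(new ObjectPath("default", "csv_source"), table, false);
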
---
 flink-connectors/flink-connector-hive/pom.xml      |   8 +
 .../table/catalog/hive/HiveCatalogITCase.java      | 170 ++++++++++++++++++++
 .../src/test/resources/csv/test.csv                |   3 +
 .../flink/table/catalog/CatalogTableBuilder.java   | 173 +++++++++++++++++++++
 .../flink/table/catalog/config/CatalogConfig.java  |   3 +
 ...criptor.java => ConnectorFormatDescriptor.java} |   9 +-
 .../table/descriptors/SchematicDescriptor.java     |   6 +-
 .../catalog/ExternalCatalogTableBuilder.scala      |   3 +
 8 files changed, 365 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 02e15c9..5397312 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -455,6 +455,14 @@ under the License.
 			</exclusions>
         </dependency>
 
+		<!-- TODO: move to flink-connector-hive-test end-to-end test module once it's set up -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
new file mode 100644
index 0000000..6f437df
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.batch.connectors.hive.FlinkStandaloneHiveRunner;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableBuilder;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.FileSystem;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.OldCsv;
+import org.apache.flink.types.Row;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * IT case for HiveCatalog.
+ * TODO: move to flink-connector-hive-test end-to-end test module once it's set up
+ */
+@RunWith(FlinkStandaloneHiveRunner.class)
+public class HiveCatalogITCase {
+
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static HiveCatalog hiveCatalog;
+	private static HiveConf hiveConf;
+
+	private String sourceTableName = "csv_source";
+	private String sinkTableName = "csv_sink";
+
+	@BeforeClass
+	public static void createCatalog() {
+		hiveConf = hiveShell.getHiveConf();
+		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+		hiveCatalog.open();
+	}
+
+	@AfterClass
+	public static void closeCatalog() {
+		if (hiveCatalog != null) {
+			hiveCatalog.close();
+		}
+	}
+
+	@Test
+	public void testGenericTable() throws Exception {
+		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+
+		tableEnv.registerCatalog("myhive", hiveCatalog);
+
+		TableSchema schema = TableSchema.builder()
+			.field("name", DataTypes.STRING())
+			.field("age", DataTypes.INT())
+			.build();
+
+		FormatDescriptor format = new OldCsv()
+			.field("name", Types.STRING())
+			.field("age", Types.INT());
+
+		CatalogTable source =
+			new CatalogTableBuilder(
+				new FileSystem().path(this.getClass().getResource("/csv/test.csv").getPath()),
+				schema)
+			.withFormat(format)
+			.inAppendMode()
+			.withComment(null)
+			.build();
+
+		Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");
+
+		CatalogTable sink =
+			new CatalogTableBuilder(
+				new FileSystem().path(p.toAbsolutePath().toString()),
+				schema)
+				.withFormat(format)
+				.inAppendMode()
+				.withComment(null)
+				.build();
+
+		hiveCatalog.createTable(
+			new ObjectPath(HiveCatalog.DEFAULT_DB, sourceTableName),
+			source,
+			false
+		);
+
+		hiveCatalog.createTable(
+			new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
+			sink,
+			false
+		);
+
+		Table t = tableEnv.sqlQuery(
+			String.format("select * from myhive.`default`.%s", sourceTableName));
+
+		List<Row> result = tableEnv.toDataSet(t, Row.class).collect();
+
+		// assert query result
+		assertEquals(
+			Arrays.asList(
+				Row.of("1", 1),
+				Row.of("2", 2),
+				Row.of("3", 3)),
+			result
+		);
+
+		tableEnv.sqlUpdate(
+			String.format("insert into myhive.`default`.%s select * from myhive.`default`.%s",
+				sinkTableName,
+				sourceTableName));
+		tableEnv.execute("myjob");
+
+		// assert written result
+		File resultFile = new File(p.toAbsolutePath().toString());
+		BufferedReader reader = new BufferedReader(new FileReader(resultFile));
+		String readLine;
+		for (int i = 0; i < 3; i++) {
+			readLine = reader.readLine();
+			assertEquals(String.format("%d,%d", i + 1, i + 1), readLine);
+		}
+
+		// No more line
+		assertNull(reader.readLine());
+	}
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/csv/test.csv b/flink-connectors/flink-connector-hive/src/test/resources/csv/test.csv
new file mode 100644
index 0000000..9b5b3ac
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/csv/test.csv
@@ -0,0 +1,3 @@
+1,1
+2,2
+3,3
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
new file mode 100644
index 0000000..01f46bd
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.ConnectorFormatDescriptor;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.Metadata;
+import org.apache.flink.table.descriptors.Statistics;
+import org.apache.flink.table.descriptors.StreamableDescriptor;
+import org.apache.flink.table.descriptors.TableDescriptor;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT;
+import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A builder for creating a {@link CatalogTable}.
+ *
+ * <p>It takes {@link Descriptor}s that declare the communication with external
+ * systems in an implementation-agnostic way. The classpath is scanned for suitable table
+ * factories that match the desired configuration.
+ *
+ * <p>Use the provided builder methods to configure the catalog table accordingly.
+ *
+ * <p>The following example shows how to declare a table that reads from an external
+ * system in a JSON format:
+ *
+ * <pre>{@code
+ * CatalogTable table = new CatalogTableBuilder(
+ *       new ExternalSystemXYZ()
+ *         .version("0.11"),
+ *       new TableSchema.Builder()
+ *         .fields(names, dataTypes)
+ *         .build())
+ *   .withFormat(
+ *     new Json()
+ *       .jsonSchema("{...}")
+ *       .failOnMissingField(false))
+ *   .withComment("test comment")
+ *   .build();
+ * }</pre>
+ */
+@PublicEvolving
+public class CatalogTableBuilder
+		extends TableDescriptor
+		implements ConnectorFormatDescriptor<CatalogTableBuilder>, StreamableDescriptor<CatalogTableBuilder> {
+
+	private final ConnectorDescriptor connectorDescriptor;
+	private final TableSchema tableSchema;
+	private final boolean isGeneric;
+
+	private String comment;
+
+	private Optional<FormatDescriptor> formatDescriptor = Optional.empty();
+	private Optional<Statistics> statisticsDescriptor = Optional.empty();
+	private Optional<Metadata> metadataDescriptor = Optional.empty();
+	private Optional<String> updateMode = Optional.empty();
+	private Map<String, String> properties = Collections.emptyMap();
+
+	/**
+	 * Creates a builder for a {@link CatalogTable} with the given connector descriptor and table schema.
+	 *
+	 * @param connectorDescriptor descriptor of the connector
+	 * @param tableSchema schema of the table
+	 */
+	public CatalogTableBuilder(ConnectorDescriptor connectorDescriptor, TableSchema tableSchema) {
+		this.connectorDescriptor = checkNotNull(connectorDescriptor);
+		this.tableSchema = checkNotNull(tableSchema);
+
+		// We don't support non-generic tables currently
+		this.isGeneric = true;
+	}
+
+	@Override
+	public CatalogTableBuilder withFormat(FormatDescriptor format) {
+		this.formatDescriptor = Optional.of(checkNotNull(format));
+		return this;
+	}
+
+	@Override
+	public CatalogTableBuilder inAppendMode() {
+		updateMode = Optional.of(UPDATE_MODE_VALUE_APPEND);
+		return this;
+	}
+
+	@Override
+	public CatalogTableBuilder inRetractMode() {
+		updateMode = Optional.of(UPDATE_MODE_VALUE_RETRACT);
+		return this;
+	}
+
+	@Override
+	public CatalogTableBuilder inUpsertMode() {
+		updateMode = Optional.of(UPDATE_MODE_VALUE_UPSERT);
+		return this;
+	}
+
+	public CatalogTableBuilder withComment(String comment) {
+		this.comment = comment;
+		return this;
+	}
+
+	public CatalogTableBuilder withProperties(Map<String, String> properties) {
+		this.properties = checkNotNull(properties);
+		return this;
+	}
+
+	/**
+	 * Build a {@link CatalogTable}.
+	 *
+	 * @return catalog table
+	 */
+	public CatalogTable build() {
+		return new CatalogTableImpl(
+			tableSchema,
+			toProperties(),
+			comment);
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		DescriptorProperties descriptorProperties = new DescriptorProperties();
+		descriptorProperties.putProperties(connectorDescriptor.toProperties());
+
+		if (formatDescriptor.isPresent()) {
+			descriptorProperties.putProperties(formatDescriptor.get().toProperties());
+		}
+
+		if (statisticsDescriptor.isPresent()) {
+			descriptorProperties.putProperties(statisticsDescriptor.get().toProperties());
+		}
+
+		if (metadataDescriptor.isPresent()) {
+			descriptorProperties.putProperties(metadataDescriptor.get().toProperties());
+		}
+
+		if (updateMode.isPresent()) {
+			descriptorProperties.putString(UPDATE_MODE, updateMode.get());
+		}
+
+		descriptorProperties.putProperties(this.properties);
+		descriptorProperties.putString(CatalogConfig.IS_GENERIC, String.valueOf(isGeneric));
+
+		return descriptorProperties.asMap();
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
index 7a4a624..9f23a7f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
@@ -23,6 +23,9 @@ package org.apache.flink.table.catalog.config;
  */
 public class CatalogConfig {
 
+	/**
+	 * Flag to distinguish if a meta-object is a generic Flink object or not.
+	 */
 	public static final String IS_GENERIC = "is_generic";
 
 	// Globally reserved prefix for catalog properties.
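
Note: CatalogTableBuilder#toProperties() always writes this flag, so a catalog that stores
both Flink-generic and native objects can branch on it. A minimal sketch, assuming the
standard CatalogBaseTable#getProperties() accessor:

    boolean isGeneric = Boolean.parseBoolean(
        table.getProperties().getOrDefault(CatalogConfig.IS_GENERIC, "false"));
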
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
similarity index 80%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
copy to flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
index 2f9a5db..7397d39 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
@@ -21,18 +21,13 @@ package org.apache.flink.table.descriptors;
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * An interface for descriptors that allow to define a format and schema.
+ * An interface for descriptors that allow defining a format.
  */
 @PublicEvolving
-public interface SchematicDescriptor<D extends SchematicDescriptor<D>> extends Descriptor {
+public interface ConnectorFormatDescriptor<D extends ConnectorFormatDescriptor<D>> extends Descriptor {
 
 	/**
 	 * Specifies the format that defines how to read data from a connector.
 	 */
 	D withFormat(FormatDescriptor format);
-
-	/**
-	 * Specifies the resulting table schema.
-	 */
-	D withSchema(Schema schema);
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
index 2f9a5db..121e49c 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.table.descriptors;
 
-import org.apache.flink.annotation.PublicEvolving;
-
 /**
  * An interface for descriptors that allow to define a format and schema.
+ *
+ * @deprecated use {@link ConnectorFormatDescriptor}.
  */
-@PublicEvolving
+@Deprecated
 public interface SchematicDescriptor<D extends SchematicDescriptor<D>> extends Descriptor {
 
 	/**
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
index ae5c677..52bfa06 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
@@ -52,7 +52,10 @@ import org.apache.flink.table.descriptors._
   * }}}
   *
   * @param connectorDescriptor Connector descriptor describing the external system
+  *
+  * @deprecated use [[CatalogTableBuilder]]
   */
+@Deprecated
 @deprecated
 class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDescriptor)
   extends TableDescriptor
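
For users migrating off the deprecated builder, the mapping is roughly as follows. This is
a sketch, not part of the patch; withSchema() and asTableSource() are assumed to be the old
builder's methods, and the schema now moves into the new builder's constructor:

    // Before (deprecated Scala builder):
    //   ExternalCatalogTable table = new ExternalCatalogTableBuilder(connector)
    //       .withFormat(format)
    //       .withSchema(schema)
    //       .inAppendMode()
    //       .asTableSource();

    // After: the schema is a constructor argument and build() returns a CatalogTable.
    CatalogTable table = new CatalogTableBuilder(connector, tableSchema)
        .withFormat(format)
        .inAppendMode()
        .build();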