Posted to commits@flink.apache.org by lz...@apache.org on 2020/08/13 04:14:45 UTC

[flink] branch release-1.11 updated: [FLINK-18867][hive] Generic table stored in Hive catalog is incompatible for 1.10

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 38cb920  [FLINK-18867][hive] Generic table stored in Hive catalog is incompatible for 1.10
38cb920 is described below

commit 38cb9201ff6319b4fa1f8aa4721219fb35c26bf3
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Aug 13 12:11:01 2020 +0800

    [FLINK-18867][hive] Generic table stored in Hive catalog is incompatible for 1.10
    
    This closes #13101
---
 .../flink/table/catalog/hive/HiveCatalog.java      |   7 +-
 .../hive/HiveCatalogGenericMetadataTest.java       | 145 +++++++++++++++++++++
 2 files changed, 150 insertions(+), 2 deletions(-)
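
For context, the incompatibility: Flink 1.10 serialized a generic table's schema into the Hive table parameters under the legacy "generic.table.schema" key prefix, whereas 1.11 writes it under the standard "schema" prefix (Schema.SCHEMA), so a 1.11 HiveCatalog could not read back the schema of a generic table created with 1.10. The sketch below illustrates the two key layouts with DescriptorProperties; it is not part of the commit, the class name is illustrative, and the exact set of emitted keys is an assumption based on the property names that appear in the test further down (when persisted in the metastore they additionally carry a "flink." prefix).

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.Schema;

public class SchemaKeyLayoutSketch {
	public static void main(String[] args) {
		TableSchema schema = TableSchema.builder()
				.field("i", DataTypes.INT())
				.build();

		// 1.11 layout: keys like "schema.0.name" / "schema.0.data-type"
		DescriptorProperties newProps = new DescriptorProperties(true);
		newProps.putTableSchema(Schema.SCHEMA, schema);
		System.out.println(newProps.asMap());

		// 1.10 layout: keys like "generic.table.schema.0.name" / "generic.table.schema.0.data-type"
		DescriptorProperties oldProps = new DescriptorProperties(true);
		oldProps.putTableSchema("generic.table.schema", schema);
		System.out.println(oldProps.asMap());
	}
}

Running this prints two property maps whose keys start with "schema." and "generic.table.schema." respectively, matching the two formats the patched read path now accepts.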

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index d10127f..a994c5f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -147,7 +147,8 @@ public class HiveCatalog extends AbstractCatalog {
 	private final String hiveVersion;
 	private final HiveShim hiveShim;
 
-	private HiveMetastoreClientWrapper client;
+	@VisibleForTesting
+	HiveMetastoreClientWrapper client;
 
 	public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir) {
 		this(catalogName, defaultDatabase, hiveConfDir, HiveShimLoader.getHiveVersion());
@@ -614,8 +615,10 @@ public class HiveCatalog extends AbstractCatalog {
 			DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
 			tableSchemaProps.putProperties(properties);
 			ObjectPath tablePath = new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName());
+			// try to get the table schema with both the new and the old (1.10) keys, in order to support tables created by the old version
 			tableSchema = tableSchemaProps.getOptionalTableSchema(Schema.SCHEMA)
-					.orElseThrow(() -> new CatalogException("Failed to get table schema from properties for generic table " + tablePath));
+					.orElseGet(() -> tableSchemaProps.getOptionalTableSchema("generic.table.schema")
+							.orElseThrow(() -> new CatalogException("Failed to get table schema from properties for generic table " + tablePath)));
 			partitionKeys = tableSchemaProps.getPartitionKeys();
 			// remove the schema from properties
 			properties = CatalogTableImpl.removeRedundant(properties, tableSchema, partitionKeys);
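
The change above first tries the 1.11 key and only falls back to the 1.10 key when it is absent. Below is a minimal, self-contained sketch of that read path outside the catalog (class name and sample keys are illustrative, not part of the commit; it assumes, as in HiveCatalog, that the "flink." parameter prefix has already been stripped before the properties reach DescriptorProperties):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.Schema;

public class LegacySchemaKeyFallbackSketch {
	public static void main(String[] args) {
		// Properties of a generic table created by 1.10: only the legacy prefix is present.
		Map<String, String> properties = new HashMap<>();
		properties.put("generic.table.schema.0.name", "i");
		properties.put("generic.table.schema.0.data-type", "INT");

		DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
		tableSchemaProps.putProperties(properties);

		// First try the 1.11 key ("schema"), then fall back to the 1.10 key.
		TableSchema tableSchema = tableSchemaProps.getOptionalTableSchema(Schema.SCHEMA)
				.orElseGet(() -> tableSchemaProps.getOptionalTableSchema("generic.table.schema")
						.orElseThrow(() -> new IllegalStateException(
								"Failed to get table schema from properties")));

		System.out.println(tableSchema);
	}
}

A table created by 1.11 would instead carry "schema.0.name" / "schema.0.data-type", so the first lookup would succeed without reaching the fallback.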
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index bb83d94..ec389a5 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.types.DataType;
 
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test for HiveCatalog on generic metadata.
@@ -65,6 +69,147 @@ public class HiveCatalogGenericMetadataTest extends HiveCatalogMetadataTestBase
 		}
 	}
 
+	@Test
+	// NOTE: Be careful when modifying this test; it is important for backward compatibility
+	public void testTableSchemaCompatibility() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		try {
+			// table with numeric types
+			ObjectPath tablePath = new ObjectPath(db1, "generic1");
+			Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
+					tablePath.getObjectName());
+			hiveTable.setDbName(tablePath.getDatabaseName());
+			hiveTable.setTableName(tablePath.getObjectName());
+			hiveTable.getParameters().putAll(getBatchTableProperties());
+			hiveTable.getParameters().put("flink.generic.table.schema.0.name", "ti");
+			hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "TINYINT");
+			hiveTable.getParameters().put("flink.generic.table.schema.1.name", "si");
+			hiveTable.getParameters().put("flink.generic.table.schema.1.data-type", "SMALLINT");
+			hiveTable.getParameters().put("flink.generic.table.schema.2.name", "i");
+			hiveTable.getParameters().put("flink.generic.table.schema.2.data-type", "INT");
+			hiveTable.getParameters().put("flink.generic.table.schema.3.name", "bi");
+			hiveTable.getParameters().put("flink.generic.table.schema.3.data-type", "BIGINT");
+			hiveTable.getParameters().put("flink.generic.table.schema.4.name", "f");
+			hiveTable.getParameters().put("flink.generic.table.schema.4.data-type", "FLOAT");
+			hiveTable.getParameters().put("flink.generic.table.schema.5.name", "d");
+			hiveTable.getParameters().put("flink.generic.table.schema.5.data-type", "DOUBLE");
+			hiveTable.getParameters().put("flink.generic.table.schema.6.name", "de");
+			hiveTable.getParameters().put("flink.generic.table.schema.6.data-type", "DECIMAL(10, 5)");
+			hiveTable.getParameters().put("flink.generic.table.schema.7.name", "cost");
+			hiveTable.getParameters().put("flink.generic.table.schema.7.expr", "`d` * `bi`");
+			hiveTable.getParameters().put("flink.generic.table.schema.7.data-type", "DOUBLE");
+			((HiveCatalog) catalog).client.createTable(hiveTable);
+			CatalogBaseTable catalogBaseTable = catalog.getTable(tablePath);
+			assertTrue(Boolean.parseBoolean(catalogBaseTable.getOptions().get(CatalogConfig.IS_GENERIC)));
+			TableSchema expectedSchema = TableSchema.builder()
+					.fields(new String[]{"ti", "si", "i", "bi", "f", "d", "de"},
+							new DataType[]{DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.BIGINT(),
+									DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.DECIMAL(10, 5)})
+					.field("cost", DataTypes.DOUBLE(), "`d` * `bi`")
+					.build();
+			assertEquals(expectedSchema, catalogBaseTable.getSchema());
+
+			// table with character types
+			tablePath = new ObjectPath(db1, "generic2");
+			hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
+					tablePath.getObjectName());
+			hiveTable.setDbName(tablePath.getDatabaseName());
+			hiveTable.setTableName(tablePath.getObjectName());
+			hiveTable.getParameters().putAll(getBatchTableProperties());
+			hiveTable.setTableName(tablePath.getObjectName());
+			hiveTable.getParameters().put("flink.generic.table.schema.0.name", "c");
+			hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "CHAR(265)");
+			hiveTable.getParameters().put("flink.generic.table.schema.1.name", "vc");
+			hiveTable.getParameters().put("flink.generic.table.schema.1.data-type", "VARCHAR(65536)");
+			hiveTable.getParameters().put("flink.generic.table.schema.2.name", "s");
+			hiveTable.getParameters().put("flink.generic.table.schema.2.data-type", "VARCHAR(2147483647)");
+			hiveTable.getParameters().put("flink.generic.table.schema.3.name", "b");
+			hiveTable.getParameters().put("flink.generic.table.schema.3.data-type", "BINARY(1)");
+			hiveTable.getParameters().put("flink.generic.table.schema.4.name", "vb");
+			hiveTable.getParameters().put("flink.generic.table.schema.4.data-type", "VARBINARY(255)");
+			hiveTable.getParameters().put("flink.generic.table.schema.5.name", "bs");
+			hiveTable.getParameters().put("flink.generic.table.schema.5.data-type", "VARBINARY(2147483647)");
+			hiveTable.getParameters().put("flink.generic.table.schema.6.name", "len");
+			hiveTable.getParameters().put("flink.generic.table.schema.6.expr", "CHAR_LENGTH(`s`)");
+			hiveTable.getParameters().put("flink.generic.table.schema.6.data-type", "INT");
+			((HiveCatalog) catalog).client.createTable(hiveTable);
+			catalogBaseTable = catalog.getTable(tablePath);
+			expectedSchema = TableSchema.builder()
+					.fields(new String[]{"c", "vc", "s", "b", "vb", "bs"},
+							new DataType[]{DataTypes.CHAR(265), DataTypes.VARCHAR(65536), DataTypes.STRING(), DataTypes.BINARY(1),
+									DataTypes.VARBINARY(255), DataTypes.BYTES()})
+					.field("len", DataTypes.INT(), "CHAR_LENGTH(`s`)")
+					.build();
+			assertEquals(expectedSchema, catalogBaseTable.getSchema());
+
+			// table with date/time types
+			tablePath = new ObjectPath(db1, "generic3");
+			hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
+					tablePath.getObjectName());
+			hiveTable.setDbName(tablePath.getDatabaseName());
+			hiveTable.setTableName(tablePath.getObjectName());
+			hiveTable.getParameters().putAll(getBatchTableProperties());
+			hiveTable.setTableName(tablePath.getObjectName());
+			hiveTable.getParameters().put("flink.generic.table.schema.0.name", "dt");
+			hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "DATE");
+			hiveTable.getParameters().put("flink.generic.table.schema.1.name", "t");
+			hiveTable.getParameters().put("flink.generic.table.schema.1.data-type", "TIME(0)");
+			hiveTable.getParameters().put("flink.generic.table.schema.2.name", "ts");
+			hiveTable.getParameters().put("flink.generic.table.schema.2.data-type", "TIMESTAMP(3)");
+			hiveTable.getParameters().put("flink.generic.table.schema.3.name", "tstz");
+			hiveTable.getParameters().put("flink.generic.table.schema.3.data-type", "TIMESTAMP(6) WITH LOCAL TIME ZONE");
+			hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.rowtime", "ts");
+			hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
+			hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.strategy.expr", "ts");
+			((HiveCatalog) catalog).client.createTable(hiveTable);
+			catalogBaseTable = catalog.getTable(tablePath);
+			expectedSchema = TableSchema.builder()
+					.fields(new String[]{"dt", "t", "ts", "tstz"},
+							new DataType[]{DataTypes.DATE(), DataTypes.TIME(), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()})
+					.watermark("ts", "ts", DataTypes.TIMESTAMP(3))
+					.build();
+			assertEquals(expectedSchema, catalogBaseTable.getSchema());
+
+			// table with complex/misc types
+			tablePath = new ObjectPath(db1, "generic4");
+			hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
+					tablePath.getObjectName());
+			hiveTable.setDbName(tablePath.getDatabaseName());
+			hiveTable.setTableName(tablePath.getObjectName());
+			hiveTable.getParameters().putAll(getBatchTableProperties());
+			hiveTable.setTableName(tablePath.getObjectName());
+			hiveTable.getParameters().put("flink.generic.table.schema.0.name", "a");
+			hiveTable.getParameters().put("flink.generic.table.schema.0.data-type", "ARRAY<INT>");
+			hiveTable.getParameters().put("flink.generic.table.schema.1.name", "m");
+			hiveTable.getParameters().put("flink.generic.table.schema.1.data-type", "MAP<BIGINT, TIMESTAMP(6)>");
+			hiveTable.getParameters().put("flink.generic.table.schema.2.name", "mul");
+			hiveTable.getParameters().put("flink.generic.table.schema.2.data-type", "MULTISET<DOUBLE>");
+			hiveTable.getParameters().put("flink.generic.table.schema.3.name", "r");
+			hiveTable.getParameters().put("flink.generic.table.schema.3.data-type", "ROW<`f1` INT, `f2` VARCHAR(2147483647)>");
+			hiveTable.getParameters().put("flink.generic.table.schema.4.name", "b");
+			hiveTable.getParameters().put("flink.generic.table.schema.4.data-type", "BOOLEAN");
+			hiveTable.getParameters().put("flink.generic.table.schema.5.name", "ts");
+			hiveTable.getParameters().put("flink.generic.table.schema.5.data-type", "TIMESTAMP(3)");
+			hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.rowtime", "ts");
+			hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
+			hiveTable.getParameters().put("flink.generic.table.schema.watermark.0.strategy.expr", "`ts` - INTERVAL '5' SECOND");
+			((HiveCatalog) catalog).client.createTable(hiveTable);
+			catalogBaseTable = catalog.getTable(tablePath);
+			expectedSchema = TableSchema.builder()
+					.fields(new String[]{"a", "m", "mul", "r", "b", "ts"},
+							new DataType[]{DataTypes.ARRAY(DataTypes.INT()),
+									DataTypes.MAP(DataTypes.BIGINT(), DataTypes.TIMESTAMP()),
+									DataTypes.MULTISET(DataTypes.DOUBLE()),
+									DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING())),
+									DataTypes.BOOLEAN(), DataTypes.TIMESTAMP(3)})
+					.watermark("ts", "`ts` - INTERVAL '5' SECOND", DataTypes.TIMESTAMP(3))
+					.build();
+			assertEquals(expectedSchema, catalogBaseTable.getSchema());
+		} finally {
+			catalog.dropDatabase(db1, true, true);
+		}
+	}
+
 	// ------ partitions ------
 
 	@Test