You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/01 18:33:44 UTC

[flink] branch master updated: [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b68ca7  [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table
8b68ca7 is described below

commit 8b68ca769967f6c2035ceba6ebc25fe6b9988250
Author: bowen.li <bo...@gmail.com>
AuthorDate: Thu Jun 27 10:43:31 2019 -0700

    [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table
    
    This PR fixes a bug where the 'flink.is_generic' key was added to the metadata of a non-generic catalog table (a.k.a. a Hive table); that key should not be added for such tables.
    
    This closes #8904.
---
 .../apache/flink/table/catalog/hive/HiveCatalog.java | 20 +++++++++++---------
 .../flink/table/catalog/config/CatalogConfig.java    |  6 ++++++
 .../apache/flink/table/catalog/CatalogTestUtil.java  |  3 +++
 3 files changed, 20 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 237b105..8659a80 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.batch.connectors.hive.HiveTableFactory;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.AbstractCatalog;
@@ -93,6 +94,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -107,11 +109,6 @@ public class HiveCatalog extends AbstractCatalog {
 	private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
 	private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT = "TextFile";
 
-	// Prefix used to distinguish properties created by Hive and Flink,
-	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
-	private static final String FLINK_PROPERTY_PREFIX = "flink.";
-	private static final String FLINK_PROPERTY_IS_GENERIC = FLINK_PROPERTY_PREFIX + CatalogConfig.IS_GENERIC;
-
 	// Prefix used to distinguish Flink functions from Hive functions.
 	// It's appended to Flink function's class name
 	// because Hive's Function object doesn't have properties or other place to store the flag for Flink functions.
@@ -482,7 +479,7 @@ public class HiveCatalog extends AbstractCatalog {
 		// Table properties
 		Map<String, String> properties = hiveTable.getParameters();
 
-		boolean isGeneric = Boolean.valueOf(properties.computeIfAbsent(FLINK_PROPERTY_IS_GENERIC, k -> String.valueOf(false)));
+		boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC));
 		if (isGeneric) {
 			properties = retrieveFlinkProperties(properties);
 		}
@@ -580,20 +577,25 @@ public class HiveCatalog extends AbstractCatalog {
 
 	/**
 	 * Filter out Hive-created properties, and return Flink-created properties.
+	 * Note that 'is_generic' is a special key and this method will leave it as-is.
 	 */
 	private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
 		return hiveTableParams.entrySet().stream()
-			.filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+			.filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX) || e.getKey().equals(CatalogConfig.IS_GENERIC))
 			.collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
 	}
 
 	/**
 	 * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+	 * Note that 'is_generic' is a special key and this method will leave it as-is.
 	 */
 	private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
 		return properties.entrySet().stream()
 			.filter(e -> e.getKey() != null && e.getValue() != null)
-			.collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
+			.map(e -> new Tuple2<>(
+				e.getKey().equals(CatalogConfig.IS_GENERIC) ? e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(),
+				e.getValue()))
+			.collect(Collectors.toMap(t -> t.f0, t -> t.f1));
 	}
 
 	// ------ partitions ------
@@ -769,7 +771,7 @@ public class HiveCatalog extends AbstractCatalog {
 
 	// make sure both table and partition are generic, or neither is
 	private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) {
-		boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC));
+		boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
 		if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) ||
 			(!isGeneric && catalogPartition instanceof GenericCatalogPartition)) {
 			throw new CatalogException(String.format("Cannot handle %s partition for %s table",
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
index 05afa32..7a4a624 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
@@ -24,4 +24,10 @@ package org.apache.flink.table.catalog.config;
 public class CatalogConfig {
 
 	public static final String IS_GENERIC = "is_generic";
+
+	// Globally reserved prefix for catalog properties.
+	// User-defined properties should not start with this prefix.
+	// It is used to distinguish properties created by Flink from those created by Hive,
+	// as the Hive metastore adds its own properties on table creation and when migrating between metastore versions.
+	public static final String FLINK_PROPERTY_PREFIX = "flink.";
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 905a44c..1c64025 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.plan.stats.TableStats;
 
 import java.util.Map;
 
+import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -57,6 +58,7 @@ public class CatalogTestUtil {
 		if (Boolean.valueOf(t1.getProperties().get(CatalogConfig.IS_GENERIC))) {
 			assertEquals(t1.getProperties(), t2.getProperties());
 		} else {
+			assertTrue(t2.getProperties().keySet().stream().noneMatch(k -> k.startsWith(FLINK_PROPERTY_PREFIX)));
 			assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
 		}
 	}
@@ -73,6 +75,7 @@ public class CatalogTestUtil {
 		if (Boolean.valueOf(v1.getProperties().get(CatalogConfig.IS_GENERIC))) {
 			assertEquals(v1.getProperties(), v2.getProperties());
 		} else {
+			assertTrue(v2.getProperties().keySet().stream().noneMatch(k -> k.startsWith(FLINK_PROPERTY_PREFIX)));
 			assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
 		}
 	}