You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/01 18:33:44 UTC
[flink] branch master updated: [FLINK-13005][hive] HiveCatalog
should not add 'flink.is_generic' key for Hive table
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8b68ca7 [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table
8b68ca7 is described below
commit 8b68ca769967f6c2035ceba6ebc25fe6b9988250
Author: bowen.li <bo...@gmail.com>
AuthorDate: Thu Jun 27 10:43:31 2019 -0700
[FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table
This PR fixes a bug where the 'flink.is_generic' key was added to the metadata of a non-generic catalog table (a.k.a. a Hive table), which should not happen.
This closes #8904.
---
.../apache/flink/table/catalog/hive/HiveCatalog.java | 20 +++++++++++---------
.../flink/table/catalog/config/CatalogConfig.java | 6 ++++++
.../apache/flink/table/catalog/CatalogTestUtil.java | 3 +++
3 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 237b105..8659a80 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.batch.connectors.hive.HiveTableFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
@@ -93,6 +94,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -107,11 +109,6 @@ public class HiveCatalog extends AbstractCatalog {
private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT = "TextFile";
- // Prefix used to distinguish properties created by Hive and Flink,
- // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
- private static final String FLINK_PROPERTY_PREFIX = "flink.";
- private static final String FLINK_PROPERTY_IS_GENERIC = FLINK_PROPERTY_PREFIX + CatalogConfig.IS_GENERIC;
-
// Prefix used to distinguish Flink functions from Hive functions.
// It's appended to Flink function's class name
// because Hive's Function object doesn't have properties or other place to store the flag for Flink functions.
@@ -482,7 +479,7 @@ public class HiveCatalog extends AbstractCatalog {
// Table properties
Map<String, String> properties = hiveTable.getParameters();
- boolean isGeneric = Boolean.valueOf(properties.computeIfAbsent(FLINK_PROPERTY_IS_GENERIC, k -> String.valueOf(false)));
+ boolean isGeneric = Boolean.valueOf(properties.get(CatalogConfig.IS_GENERIC));
if (isGeneric) {
properties = retrieveFlinkProperties(properties);
}
@@ -580,20 +577,25 @@ public class HiveCatalog extends AbstractCatalog {
/**
* Filter out Hive-created properties, and return Flink-created properties.
+ * Note that 'is_generic' is a special key and this method will leave it as-is.
*/
private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
return hiveTableParams.entrySet().stream()
- .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+ .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX) || e.getKey().equals(CatalogConfig.IS_GENERIC))
.collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
}
/**
* Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+ * Note that 'is_generic' is a special key and this method will leave it as-is.
*/
private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
return properties.entrySet().stream()
.filter(e -> e.getKey() != null && e.getValue() != null)
- .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
+ .map(e -> new Tuple2<>(
+ e.getKey().equals(CatalogConfig.IS_GENERIC) ? e.getKey() : FLINK_PROPERTY_PREFIX + e.getKey(),
+ e.getValue()))
+ .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
}
// ------ partitions ------
@@ -769,7 +771,7 @@ public class HiveCatalog extends AbstractCatalog {
// make sure both table and partition are generic, or neither is
private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) {
- boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC));
+ boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(CatalogConfig.IS_GENERIC));
if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) ||
(!isGeneric && catalogPartition instanceof GenericCatalogPartition)) {
throw new CatalogException(String.format("Cannot handle %s partition for %s table",
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
index 05afa32..7a4a624 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/config/CatalogConfig.java
@@ -24,4 +24,10 @@ package org.apache.flink.table.catalog.config;
public class CatalogConfig {
public static final String IS_GENERIC = "is_generic";
+
+ // Globally reserved prefix for catalog properties.
// User defined properties should not start with this prefix.
+ // Used to distinguish properties created by Hive and Flink,
+ // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+ public static final String FLINK_PROPERTY_PREFIX = "flink.";
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 905a44c..1c64025 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.plan.stats.TableStats;
import java.util.Map;
+import static org.apache.flink.table.catalog.config.CatalogConfig.FLINK_PROPERTY_PREFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -57,6 +58,7 @@ public class CatalogTestUtil {
if (Boolean.valueOf(t1.getProperties().get(CatalogConfig.IS_GENERIC))) {
assertEquals(t1.getProperties(), t2.getProperties());
} else {
+ assertTrue(t2.getProperties().keySet().stream().noneMatch(k -> k.startsWith(FLINK_PROPERTY_PREFIX)));
assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
}
}
@@ -73,6 +75,7 @@ public class CatalogTestUtil {
if (Boolean.valueOf(v1.getProperties().get(CatalogConfig.IS_GENERIC))) {
assertEquals(v1.getProperties(), v2.getProperties());
} else {
+ assertTrue(v2.getProperties().keySet().stream().noneMatch(k -> k.startsWith(FLINK_PROPERTY_PREFIX)));
assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
}
}