Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/01 23:26:19 UTC

[flink] branch master updated: [FLINK-13424][hive] HiveCatalog should add hive version in conf

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b20ac47  [FLINK-13424][hive] HiveCatalog should add hive version in conf
b20ac47 is described below

commit b20ac47258228aed51be0d0e6fc958ac303ce4a1
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Jul 26 11:16:49 2019 +0800

    [FLINK-13424][hive] HiveCatalog should add hive version in conf
    
    This avoids overriding the Hive version that users specify in the yaml file.
    
    This closes #9232.
---
 .../java/org/apache/flink/connectors/hive/HiveTableFactory.java     | 5 +++--
 .../org/apache/flink/connectors/hive/HiveTableOutputFormat.java     | 3 ++-
 .../main/java/org/apache/flink/connectors/hive/HiveTableSink.java   | 4 ++--
 .../main/java/org/apache/flink/connectors/hive/HiveTableSource.java | 4 ++--
 .../main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java  | 3 +++
 .../flink/table/catalog/hive/client/HiveMetastoreClientFactory.java | 6 +++---
 .../java/org/apache/flink/connectors/hive/HiveInputFormatTest.java  | 5 +++--
 .../org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java | 3 ++-
 8 files changed, 20 insertions(+), 13 deletions(-)
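
The mechanism is easiest to see outside the diff. Below is a minimal, hedged
sketch of the version hand-off this commit establishes: only the property key
and the checkNotNull pattern come from the diff itself; the scaffolding class
and the literal version string are illustrative assumptions.

    import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
    import org.apache.flink.util.Preconditions;
    import org.apache.hadoop.hive.conf.HiveConf;

    class HiveVersionHandOffSketch {
        // Consumer side (HiveTableFactory / HiveTableSource / HiveTableSink):
        // fail fast if the version is missing, instead of silently falling
        // back to HiveShimLoader.getHiveVersion() as before this commit.
        static String resolve(HiveConf hiveConf) {
            return Preconditions.checkNotNull(
                    hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
                    "Hive version is not defined");
        }

        public static void main(String[] args) {
            HiveConf hiveConf = new HiveConf();
            // Producer side (HiveCatalog constructor): record the version the
            // catalog was created with, e.g. the value from the user's yaml
            // file ("2.3.4" is an example value, not from the commit).
            hiveConf.set(HiveCatalogValidator.CATALOG_HIVE_VERSION, "2.3.4");
            System.out.println(resolve(hiveConf)); // prints 2.3.4
        }
    }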

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index 4127bf3..73626d0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
@@ -76,7 +75,9 @@ public class HiveTableFactory
 	public HiveTableFactory(HiveConf hiveConf) {
 		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
 
-		this.hiveVersion = new JobConf(hiveConf).get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+		// this has to come from hiveConf, otherwise we may lose what the user specifies in the yaml file
+		this.hiveVersion = checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+				"Hive version is not defined");
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 52436cc..e4caac1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -163,7 +163,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		this.overwrite = overwrite;
 		isPartitioned = partitionColumns != null && !partitionColumns.isEmpty();
 		isDynamicPartition = isPartitioned && partitionColumns.size() > hiveTablePartition.getPartitionSpec().size();
-		hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+		hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+				"Hive version is not defined");
 	}
 
 	//  Custom serialization methods
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 7d8a85a..a36479d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.OverwritableTableSink;
@@ -76,7 +75,8 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 		this.jobConf = jobConf;
 		this.tablePath = tablePath;
 		this.catalogTable = table;
-		hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+		hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+				"Hive version is not defined");
 		TableSchema tableSchema = table.getSchema();
 		rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
 	}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 2df10c5..a5a670e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.sources.PartitionableTableSource;
@@ -72,7 +71,8 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 		this.jobConf = Preconditions.checkNotNull(jobConf);
 		this.tablePath = Preconditions.checkNotNull(tablePath);
 		this.catalogTable = Preconditions.checkNotNull(catalogTable);
-		this.hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+		this.hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+				"Hive version is not defined");
 		initAllPartitions = false;
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 7036196..ba0921c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.catalog.hive.util.HiveStatsUtil;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -135,6 +136,8 @@ public class HiveCatalog extends AbstractCatalog {
 		this.hiveConf = hiveConf == null ? createHiveConf(null) : hiveConf;
 		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion), "hiveVersion cannot be null or empty");
 		this.hiveVersion = hiveVersion;
+		// add this to hiveConf to make sure table factory and source/sink see the same Hive version as HiveCatalog
+		this.hiveConf.set(HiveCatalogValidator.CATALOG_HIVE_VERSION, hiveVersion);
 
 		LOG.info("Created HiveCatalog '{}'", catalogName);
 	}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
index ccdca1b..dc4c6da 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.catalog.hive.client;
 
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 
@@ -31,7 +31,7 @@ public class HiveMetastoreClientFactory {
 	}
 
 	public static HiveMetastoreClientWrapper create(HiveConf hiveConf, String hiveVersion) {
-		return new HiveMetastoreClientWrapper(hiveConf,
-				StringUtils.isNullOrWhitespaceOnly(hiveVersion) ? HiveShimLoader.getHiveVersion() : hiveVersion);
+		Preconditions.checkNotNull(hiveVersion, "Hive version cannot be null");
+		return new HiveMetastoreClientWrapper(hiveConf, hiveVersion);
 	}
 }
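
With the fallback removed from HiveMetastoreClientFactory, passing null is no
longer valid; callers must resolve a concrete version first. A short sketch of
the updated call contract, mirroring the test changes below (hiveConf is
assumed to be in scope):

    // Before this commit, create(hiveConf, null) silently substituted
    // HiveShimLoader.getHiveVersion(); now a null version fails fast, so the
    // caller resolves the version explicitly.
    String version = HiveShimLoader.getHiveVersion();
    HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, version);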
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
index 19d6da2..b9a1527 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
@@ -92,7 +93,7 @@ public class HiveInputFormatTest {
 		);
 		//Use the metastore client to create the hive table rather than hiveCatalog, because hiveCatalog
 		//doesn't support setting a serde yet.
-		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
 		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
 		tbl.setDbName(dbName);
 		tbl.setTableName(tblName);
@@ -142,7 +143,7 @@ public class HiveInputFormatTest {
 
 		//Use the metastore client to create the hive table rather than hiveCatalog, because hiveCatalog
 		//doesn't support setting a serde yet.
-		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
 		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
 		tbl.setDbName(dbName);
 		tbl.setTableName(tblName);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index aa02eb6..4ac76cd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
@@ -56,7 +57,7 @@ public class TableEnvHiveConnectorTest {
 		HiveConf hiveConf = hiveShell.getHiveConf();
 		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
 		hiveCatalog.open();
-		hmsClient = HiveMetastoreClientFactory.create(hiveConf, null);
+		hmsClient = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
 	}
 
 	@Test
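
For context, the version normally reaches HiveCatalog through its constructor.
A hedged usage sketch follows; the argument list is an assumption based on the
fields visible in the HiveCatalog hunk above, and all values are examples
rather than anything from this commit:

    // Hypothetical usage: create and open a HiveCatalog with an explicit
    // Hive version. After this commit, the constructor also copies the
    // version into its HiveConf, so HiveTableFactory / HiveTableSource /
    // HiveTableSink resolve the same version instead of whatever
    // HiveShimLoader.getHiveVersion() would default to.
    HiveCatalog catalog = new HiveCatalog(
            "myhive",             // catalog name (example)
            "default",            // default database (assumed parameter)
            hiveConf,             // HiveConf, as in the constructor hunk above
            "2.3.4");             // hive version, e.g. from the yaml file
    catalog.open();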