Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/01 23:30:57 UTC
[flink] branch release-1.9 updated: [FLINK-13424][hive] HiveCatalog should add hive version in conf
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 4b8d35c [FLINK-13424][hive] HiveCatalog should add hive version in conf
4b8d35c is described below
commit 4b8d35c7f9eb6573e3b099fd2ca8d4ca42ccae72
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Jul 26 11:16:49 2019 +0800
[FLINK-13424][hive] HiveCatalog should add hive version in conf
This avoids overriding the Hive version that users specify in the yaml file.
This closes #9232.
---
.../java/org/apache/flink/connectors/hive/HiveTableFactory.java | 5 +++--
.../org/apache/flink/connectors/hive/HiveTableOutputFormat.java | 3 ++-
.../main/java/org/apache/flink/connectors/hive/HiveTableSink.java | 4 ++--
.../main/java/org/apache/flink/connectors/hive/HiveTableSource.java | 4 ++--
.../main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java | 3 +++
.../flink/table/catalog/hive/client/HiveMetastoreClientFactory.java | 6 +++---
.../java/org/apache/flink/connectors/hive/HiveInputFormatTest.java | 5 +++--
.../org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java | 3 ++-
8 files changed, 20 insertions(+), 13 deletions(-)
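In miniature, the bug and the fix (a hedged sketch, not code from the patch: the key literal stands in for HiveCatalogValidator.CATALOG_HIVE_VERSION, and the version strings are made up):

    import org.apache.flink.util.Preconditions;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class HiveVersionResolutionSketch {
        // stand-in for HiveCatalogValidator.CATALOG_HIVE_VERSION
        static final String CATALOG_HIVE_VERSION = "hive-version";

        public static void main(String[] args) {
            HiveConf hiveConf = new HiveConf();

            // Before: the catalog never wrote the version into the conf, so this
            // lookup fell back to the classpath-detected version and the value
            // from the yaml file was silently lost.
            String before = hiveConf.get(CATALOG_HIVE_VERSION, "2.3.4" /* shim-detected */);

            // After: HiveCatalog writes the user's version into the conf, and
            // every consumer fails fast when it is missing instead of guessing.
            hiveConf.set(CATALOG_HIVE_VERSION, "1.2.1"); // version from the yaml file
            String after = Preconditions.checkNotNull(
                    hiveConf.get(CATALOG_HIVE_VERSION), "Hive version is not defined");

            System.out.println(before + " -> " + after); // prints 2.3.4 -> 1.2.1
        }
    }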
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index 4127bf3..73626d0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactoryUtil;
@@ -76,7 +75,9 @@ public class HiveTableFactory
public HiveTableFactory(HiveConf hiveConf) {
this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
- this.hiveVersion = new JobConf(hiveConf).get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+ // this has to come from hiveConf, otherwise we may lose what the user specifies in the yaml file
+ this.hiveVersion = checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+ "Hive version is not defined");
}
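Note that the old code read from a fresh JobConf(hiveConf) rather than from hiveConf itself. The JobConf(Configuration) constructor copies the source entries, so for keys that are present the two lookups agree; what the patch actually changes is the missing-key case, which now fails fast instead of silently taking HiveShimLoader.getHiveVersion(). A small sketch of the copy semantics (key and value are illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public class JobConfCopySketch {
        public static void main(String[] args) {
            HiveConf hiveConf = new HiveConf();
            hiveConf.set("hive-version", "1.2.1");

            // JobConf(Configuration) copies the entries of the source conf, so
            // both lookups return "1.2.1"; reading hiveConf directly just skips
            // the extra allocation.
            JobConf copy = new JobConf(hiveConf);
            System.out.println(copy.get("hive-version").equals(hiveConf.get("hive-version")));
        }
    }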
@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 52436cc..e4caac1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -163,7 +163,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
this.overwrite = overwrite;
isPartitioned = partitionColumns != null && !partitionColumns.isEmpty();
isDynamicPartition = isPartitioned && partitionColumns.size() > hiveTablePartition.getPartitionSpec().size();
- hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+ hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+ "Hive version is not defined");
}
// Custom serialization methods
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 7d8a85a..a36479d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
@@ -76,7 +75,8 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
this.jobConf = jobConf;
this.tablePath = tablePath;
this.catalogTable = table;
- hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+ hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+ "Hive version is not defined");
TableSchema tableSchema = table.getSchema();
rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 2df10c5..a5a670e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.sources.PartitionableTableSource;
@@ -72,7 +71,8 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
this.jobConf = Preconditions.checkNotNull(jobConf);
this.tablePath = Preconditions.checkNotNull(tablePath);
this.catalogTable = Preconditions.checkNotNull(catalogTable);
- this.hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
+ this.hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
+ "Hive version is not defined");
initAllPartitions = false;
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 7036196..ba0921c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.catalog.hive.util.HiveStatsUtil;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -135,6 +136,8 @@ public class HiveCatalog extends AbstractCatalog {
this.hiveConf = hiveConf == null ? createHiveConf(null) : hiveConf;
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion), "hiveVersion cannot be null or empty");
this.hiveVersion = hiveVersion;
+ // add this to hiveConf to make sure table factory and source/sink see the same Hive version as HiveCatalog
+ this.hiveConf.set(HiveCatalogValidator.CATALOG_HIVE_VERSION, hiveVersion);
LOG.info("Created HiveCatalog '{}'", catalogName);
}
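Once this constructor has run, the version travels with the conf, so a HiveTableFactory or a source/sink built from that conf resolves exactly the catalog's version. A hedged sketch of the guarantee (the HiveConf-taking constructor is marked visible-for-testing in Flink, and the names and version here are placeholders):

    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class VersionPropagationSketch {
        public static void main(String[] args) {
            HiveConf hiveConf = new HiveConf();
            HiveCatalog catalog = new HiveCatalog("myhive", "default", hiveConf, "1.2.1");

            // the conf now carries the catalog's version, so downstream components
            // see "1.2.1" rather than whatever the classpath would suggest
            System.out.println(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION));
        }
    }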
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
index ccdca1b..dc4c6da 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientFactory.java
@@ -18,7 +18,7 @@
package org.apache.flink.table.catalog.hive.client;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -31,7 +31,7 @@ public class HiveMetastoreClientFactory {
}
public static HiveMetastoreClientWrapper create(HiveConf hiveConf, String hiveVersion) {
- return new HiveMetastoreClientWrapper(hiveConf,
- StringUtils.isNullOrWhitespaceOnly(hiveVersion) ? HiveShimLoader.getHiveVersion() : hiveVersion);
+ Preconditions.checkNotNull(hiveVersion, "Hive version cannot be null");
+ return new HiveMetastoreClientWrapper(hiveConf, hiveVersion);
}
}
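Callers that used to pass null to get the classpath-detected version must now resolve it explicitly, which is exactly what the test updates below do. The usual pattern (requires a reachable metastore configured in hiveConf):

    import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
    import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
    import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class ClientFactoryUsageSketch {
        public static void main(String[] args) {
            HiveConf hiveConf = new HiveConf();
            // the silent null -> shim-version fallback is gone; spell the version out
            HiveMetastoreClientWrapper client =
                    HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
        }
    }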
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
index 19d6da2..b9a1527 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
@@ -92,7 +93,7 @@ public class HiveInputFormatTest {
);
//We use the metastore client to create the Hive table instead of HiveCatalog, because HiveCatalog
//doesn't support setting serDe yet.
- HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+ HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
tbl.setDbName(dbName);
tbl.setTableName(tblName);
@@ -142,7 +143,7 @@ public class HiveInputFormatTest {
//We use the metastore client to create the Hive table instead of HiveCatalog, because HiveCatalog
//doesn't support setting serDe yet.
- HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+ HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
tbl.setDbName(dbName);
tbl.setTableName(tblName);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index aa02eb6..4ac76cd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
@@ -56,7 +57,7 @@ public class TableEnvHiveConnectorTest {
HiveConf hiveConf = hiveShell.getHiveConf();
hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
hiveCatalog.open();
- hmsClient = HiveMetastoreClientFactory.create(hiveConf, null);
+ hmsClient = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
}
@Test