You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/29 07:22:34 UTC
[flink] branch release-1.9 updated: [FLINK-13012][hive] Handle
default partition name of Hive table
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 25e6143 [FLINK-13012][hive] Handle default partition name of Hive table
25e6143 is described below
commit 25e6143a3c425c47851cbd69a41d68fe53f62f91
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Jul 11 21:08:22 2019 +0800
[FLINK-13012][hive] Handle default partition name of Hive table
This closes #9088
---
.../connectors/hive/HiveTableOutputFormat.java | 15 +++-
.../batch/connectors/hive/HiveTableSource.java | 11 ++-
.../connectors/hive/TableEnvHiveConnectorTest.java | 88 ++++++++++++++++++++++
3 files changed, 109 insertions(+), 5 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index acb1bf9..ade5830 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -141,6 +141,9 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
// to convert Flink object to Hive object
private transient HiveObjectConversion[] hiveConversions;
+ // used when the partition value is null or empty
+ private transient String defaultPartitionName;
+
public HiveTableOutputFormat(JobConf jobConf, ObjectPath tablePath, CatalogTable table, HiveTablePartition hiveTablePartition,
Properties tableProperties, boolean overwrite) {
super(jobConf.getCredentials());
@@ -298,6 +301,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
objectInspectors);
+ defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+ HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
}
}
@@ -310,10 +315,12 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
// only need to check the dynamic partitions
final int numStaticPart = hiveTablePartition.getPartitionSpec().size();
for (int i = dynamicPartitionOffset; i < record.getArity(); i++) {
- // TODO: seems Hive also just calls toString(), need further investigation to confirm
- // TODO: validate partition value
- String partVal = record.getField(i).toString();
- dynPartSpec.put(partitionColumns.get(i - dynamicPartitionOffset + numStaticPart), partVal);
+ Object field = record.getField(i);
+ String partitionValue = field != null ? field.toString() : null;
+ if (partitionValue == null || partitionValue.isEmpty()) {
+ partitionValue = defaultPartitionName;
+ }
+ dynPartSpec.put(partitionColumns.get(i - dynamicPartitionOffset + numStaticPart), partitionValue);
}
String partName = Warehouse.makePartPath(dynPartSpec);
partitionWriter = partitionToWriter.get(partName);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 706442d..e0734f4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -157,6 +157,8 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
String tableName = tablePath.getObjectName();
List<String> partitionColNames = catalogTable.getPartitionKeys();
if (partitionColNames != null && partitionColNames.size() > 0) {
+ final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+ HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
List<Partition> partitions =
client.listPartitions(dbName, tableName, (short) -1);
for (Partition partition : partitions) {
@@ -168,7 +170,14 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
String partitionValue = partition.getValues().get(i);
partitionSpec.put(partitionColName, partitionValue);
DataType type = catalogTable.getSchema().getFieldDataType(partitionColName).get();
- Object partitionObject = restorePartitionValueFromFromType(partitionValue, type);
+ Object partitionObject;
+ if (defaultPartitionName.equals(partitionValue)) {
+ LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot();
+ // while this is in line with Hive, seems it should be null for string columns as well
+ partitionObject = typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR ? defaultPartitionName : null;
+ } else {
+ partitionObject = restorePartitionValueFromFromType(partitionValue, type);
+ }
partitionColValues.put(partitionColName, partitionObject);
}
HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java
new file mode 100644
index 0000000..2f910a9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test hive connector with table API.
+ */
+@RunWith(FlinkStandaloneHiveRunner.class)
+public class TableEnvHiveConnectorTest {
+
+ @HiveSQL(files = {})
+ private static HiveShell hiveShell;
+
+ private static HiveCatalog hiveCatalog;
+ private static HiveMetastoreClientWrapper hmsClient;
+
+ @BeforeClass
+ public static void setup() {
+ HiveConf hiveConf = hiveShell.getHiveConf();
+ hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+ hiveCatalog.open();
+ hmsClient = HiveMetastoreClientFactory.create(hiveConf, null);
+ }
+
+ @Test
+ public void testDefaultPartitionName() throws Exception {
+ hiveShell.execute("create database db1");
+ hiveShell.execute("create table db1.src (x int, y int)");
+ hiveShell.execute("create table db1.part (x int) partitioned by (y int)");
+ hiveShell.insertInto("db1", "src").addRow(1, 1).addRow(2, null).commit();
+
+ TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
+ tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ tableEnv.useCatalog(hiveCatalog.getName());
+
+ // test generating partitions with default name
+ tableEnv.sqlUpdate("insert into db1.part select * from db1.src");
+ tableEnv.execute("mytest");
+ HiveConf hiveConf = hiveShell.getHiveConf();
+ String defaultPartName = hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+ Table hiveTable = hmsClient.getTable("db1", "part");
+ Path defaultPartPath = new Path(hiveTable.getSd().getLocation(), "y=" + defaultPartName);
+ FileSystem fs = defaultPartPath.getFileSystem(hiveConf);
+ assertTrue(fs.exists(defaultPartPath));
+
+ // TODO: test reading from flink when https://issues.apache.org/jira/browse/FLINK-13279 is fixed
+ assertEquals(Arrays.asList("1\t1", "2\tNULL"), hiveShell.executeQuery("select * from db1.part"));
+
+ hiveShell.execute("drop database db1 cascade");
+ }
+}