You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/29 07:22:34 UTC

[flink] branch release-1.9 updated: [FLINK-13012][hive] Handle default partition name of Hive table

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 25e6143  [FLINK-13012][hive] Handle default partition name of Hive table
25e6143 is described below

commit 25e6143a3c425c47851cbd69a41d68fe53f62f91
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Jul 11 21:08:22 2019 +0800

    [FLINK-13012][hive] Handle default partition name of Hive table
    
    This closes #9088
---
 .../connectors/hive/HiveTableOutputFormat.java     | 15 +++-
 .../batch/connectors/hive/HiveTableSource.java     | 11 ++-
 .../connectors/hive/TableEnvHiveConnectorTest.java | 88 ++++++++++++++++++++++
 3 files changed, 109 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index acb1bf9..ade5830 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -141,6 +141,9 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 	// to convert Flink object to Hive object
 	private transient HiveObjectConversion[] hiveConversions;
 
+	// used when partition values is null or empty
+	private transient String defaultPartitionName;
+
 	public HiveTableOutputFormat(JobConf jobConf, ObjectPath tablePath, CatalogTable table, HiveTablePartition hiveTablePartition,
 								Properties tableProperties, boolean overwrite) {
 		super(jobConf.getCredentials());
@@ -298,6 +301,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 			rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
 				Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
 				objectInspectors);
+			defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+					HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
 		}
 	}
 
@@ -310,10 +315,12 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 				// only need to check the dynamic partitions
 				final int numStaticPart = hiveTablePartition.getPartitionSpec().size();
 				for (int i = dynamicPartitionOffset; i < record.getArity(); i++) {
-					// TODO: seems Hive also just calls toString(), need further investigation to confirm
-					// TODO: validate partition value
-					String partVal = record.getField(i).toString();
-					dynPartSpec.put(partitionColumns.get(i - dynamicPartitionOffset + numStaticPart), partVal);
+					Object field = record.getField(i);
+					String partitionValue = field != null ? field.toString() : null;
+					if (partitionValue == null || partitionValue.isEmpty()) {
+						partitionValue = defaultPartitionName;
+					}
+					dynPartSpec.put(partitionColumns.get(i - dynamicPartitionOffset + numStaticPart), partitionValue);
 				}
 				String partName = Warehouse.makePartPath(dynPartSpec);
 				partitionWriter = partitionToWriter.get(partName);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 706442d..e0734f4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -157,6 +157,8 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 			String tableName = tablePath.getObjectName();
 			List<String> partitionColNames = catalogTable.getPartitionKeys();
 			if (partitionColNames != null && partitionColNames.size() > 0) {
+				final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+						HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
 				List<Partition> partitions =
 						client.listPartitions(dbName, tableName, (short) -1);
 				for (Partition partition : partitions) {
@@ -168,7 +170,14 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 						String partitionValue = partition.getValues().get(i);
 						partitionSpec.put(partitionColName, partitionValue);
 						DataType type = catalogTable.getSchema().getFieldDataType(partitionColName).get();
-						Object partitionObject = restorePartitionValueFromFromType(partitionValue, type);
+						Object partitionObject;
+						if (defaultPartitionName.equals(partitionValue)) {
+							LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot();
+							// while this is inline with Hive, seems it should be null for string columns as well
+							partitionObject = typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR ? defaultPartitionName : null;
+						} else {
+							partitionObject = restorePartitionValueFromFromType(partitionValue, type);
+						}
 						partitionColValues.put(partitionColName, partitionObject);
 					}
 					HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java
new file mode 100644
index 0000000..2f910a9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/TableEnvHiveConnectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test hive connector with table API.
+ */
+@RunWith(FlinkStandaloneHiveRunner.class)
+public class TableEnvHiveConnectorTest {
+
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
+
+	private static HiveCatalog hiveCatalog;
+	private static HiveMetastoreClientWrapper hmsClient;
+
+	@BeforeClass
+	public static void setup() {
+		HiveConf hiveConf = hiveShell.getHiveConf();
+		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+		hiveCatalog.open();
+		hmsClient = HiveMetastoreClientFactory.create(hiveConf, null);
+	}
+
+	@Test
+	public void testDefaultPartitionName() throws Exception {
+		hiveShell.execute("create database db1");
+		hiveShell.execute("create table db1.src (x int, y int)");
+		hiveShell.execute("create table db1.part (x int) partitioned by (y int)");
+		hiveShell.insertInto("db1", "src").addRow(1, 1).addRow(2, null).commit();
+
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
+		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		tableEnv.useCatalog(hiveCatalog.getName());
+
+		// test generating partitions with default name
+		tableEnv.sqlUpdate("insert into db1.part select * from db1.src");
+		tableEnv.execute("mytest");
+		HiveConf hiveConf = hiveShell.getHiveConf();
+		String defaultPartName = hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+		Table hiveTable = hmsClient.getTable("db1", "part");
+		Path defaultPartPath = new Path(hiveTable.getSd().getLocation(), "y=" + defaultPartName);
+		FileSystem fs = defaultPartPath.getFileSystem(hiveConf);
+		assertTrue(fs.exists(defaultPartPath));
+
+		// TODO: test reading from flink when https://issues.apache.org/jira/browse/FLINK-13279 is fixed
+		assertEquals(Arrays.asList("1\t1", "2\tNULL"), hiveShell.executeQuery("select * from db1.part"));
+
+		hiveShell.execute("drop database db1 cascade");
+	}
+}