Posted to commits@flink.apache.org by ku...@apache.org on 2019/08/06 03:07:21 UTC

[flink] branch release-1.9 updated: [FLINK-13192][hive] Add tests for different Hive table formats

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 6c25805  [FLINK-13192][hive] Add tests for different Hive table formats
6c25805 is described below

commit 6c25805df329a11a5ea104672c7a3f0b7cec220c
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jul 29 21:02:52 2019 +0800

    [FLINK-13192][hive] Add tests for different Hive table formats
    
    This closes #9264
---
 flink-connectors/flink-connector-hive/pom.xml      |  6 +---
 .../connectors/hive/HiveTableOutputFormat.java     | 19 ++++++----
 .../flink/table/catalog/hive/HiveCatalog.java      | 23 +++++++++---
 .../connectors/hive/TableEnvHiveConnectorTest.java | 42 ++++++++++++++++++++++
 4 files changed, 74 insertions(+), 16 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 6ce3dc2..4cbe68b 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -416,11 +416,7 @@ under the License.
 					<artifactId>tez-common</artifactId>
 				</exclusion>
 				<exclusion>
-					<groupId>org.apache.tez</groupId>
-					<artifactId>tez-mapreduce</artifactId>
-				</exclusion>
-				<exclusion>
-					<!-- This dependency is not available with java 9.-->
+					<!-- This dependency is no longer shipped with the JDK since Java 9.-->
 					<groupId>jdk.tools</groupId>
 					<artifactId>jdk.tools</artifactId>
 				</exclusion>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index e4caac1..9e1ee46 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -58,9 +58,10 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -122,7 +123,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 	// number of non-partitioning columns
 	private transient int numNonPartitionColumns;
 
-	private transient AbstractSerDe serializer;
+	// SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common base class
+	private transient Serializer recordSerDe;
 	//StructObjectInspector represents the hive row structure.
 	private transient StructObjectInspector rowObjectInspector;
 	private transient Class<? extends Writable> outputClass;
@@ -257,11 +259,14 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 	public void open(int taskNumber, int numTasks) throws IOException {
 		try {
 			StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
-			serializer = (AbstractSerDe) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
-			ReflectionUtils.setConf(serializer, jobConf);
+			Object serdeLib = Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
+			Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+					"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got " + serdeLib.getClass().getName());
+			recordSerDe = (Serializer) serdeLib;
+			ReflectionUtils.setConf(recordSerDe, jobConf);
 			// TODO: support partition properties, for now assume they're same as table properties
-			SerDeUtils.initializeSerDe(serializer, jobConf, tableProperties, null);
-			outputClass = serializer.getSerializedClass();
+			SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
+			outputClass = recordSerDe.getSerializedClass();
 		} catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
 			throw new FlinkRuntimeException("Error initializing Hive serializer", e);
 		}
@@ -331,7 +336,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 					partitionToWriter.put(partName, partitionWriter);
 				}
 			}
-			partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record), rowObjectInspector));
+			partitionWriter.recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
 		} catch (IOException | SerDeException e) {
 			throw new IOException("Could not write Record.", e);
 		} catch (MetaException e) {
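
A note on the HiveTableOutputFormat change above: typing the SerDe as Serializer/Deserializer
instead of AbstractSerDe matters because, as the new comment says, the SerDe classes in
Hive 1.2.1 and Hive 2.3.4 can be different, and the two interfaces are the common denominator.
A minimal standalone sketch of that reflective loading pattern follows. It is illustrative only:
the class name SerDeLoadingSketch, the SerDe chosen in main, and the table properties are
assumptions for the example, not part of this commit.

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.hive.serde2.SerDeUtils;
    import org.apache.hadoop.hive.serde2.Serializer;
    import org.apache.hadoop.util.ReflectionUtils;

    public class SerDeLoadingSketch {

        // Instantiate the SerDe named in a table's storage descriptor and type it by the two
        // interfaces every usable SerDe implements, rather than by AbstractSerDe.
        static Serializer loadSerDe(String serdeClassName, Configuration conf, Properties tableProps)
                throws Exception {
            Object serde = Class.forName(serdeClassName).newInstance();
            if (!(serde instanceof Serializer) || !(serde instanceof Deserializer)) {
                throw new IllegalArgumentException("Expected a SerDe implementing both Serializer"
                        + " and Deserializer, but got " + serde.getClass().getName());
            }
            // Hand the Hadoop configuration to Configurable SerDes, then initialize with the
            // table properties (partition properties assumed identical, as in the commit).
            ReflectionUtils.setConf(serde, conf);
            SerDeUtils.initializeSerDe((Deserializer) serde, conf, tableProps, null);
            return (Serializer) serde;
        }

        public static void main(String[] args) throws Exception {
            // Hypothetical two-column table; "columns"/"columns.types" are standard Hive serde properties.
            Properties props = new Properties();
            props.setProperty("columns", "i,s");
            props.setProperty("columns.types", "int,string");
            Serializer serde = loadSerDe("org.apache.hadoop.hive.serde2.OpenCSVSerde",
                    new Configuration(), props);
            System.out.println("Writable class produced: " + serde.getSerializedClass().getName());
        }
    }
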
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 261180f..dd50ce2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
 import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.thrift.TException;
@@ -323,7 +324,7 @@ public class HiveCatalog extends AbstractCatalog {
 		checkNotNull(tablePath, "tablePath cannot be null");
 
 		Table hiveTable = getHiveTable(tablePath);
-		return instantiateCatalogTable(hiveTable);
+		return instantiateCatalogTable(hiveTable, hiveConf);
 	}
 
 	@Override
@@ -394,7 +395,7 @@ public class HiveCatalog extends AbstractCatalog {
 			return;
 		}
 
-		CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable);
+		CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable, hiveConf);
 
 		if (existingTable.getClass() != newCatalogTable.getClass()) {
 			throw new CatalogException(
@@ -493,7 +494,7 @@ public class HiveCatalog extends AbstractCatalog {
 		}
 	}
 
-	private static CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
+	private static CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveConf) {
 		boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW;
 
 		// Table properties
@@ -506,8 +507,22 @@ public class HiveCatalog extends AbstractCatalog {
 		String comment = properties.remove(HiveCatalogConfig.COMMENT);
 
 		// Table schema
+		List<FieldSchema> fields;
+		if (org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf,
+				hiveTable.getSd().getSerdeInfo().getSerializationLib())) {
+			// get schema from metastore
+			fields = hiveTable.getSd().getCols();
+		} else {
+			// get schema from deserializer
+			try {
+				fields = MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
+						MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
+			} catch (SerDeException | MetaException e) {
+				throw new CatalogException("Failed to get Hive table schema from deserializer", e);
+			}
+		}
 		TableSchema tableSchema =
-			HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+			HiveTableUtil.createTableSchema(fields, hiveTable.getPartitionKeys());
 
 		// Partition keys
 		List<String> partitionKeys = new ArrayList<>();
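
The schema-resolution change above mirrors what Hive itself does: when
org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema returns true for the table's
SerDe, the column list stored in the metastore is authoritative; otherwise the schema has to be
obtained from the deserializer (OpenCSVSerde is the case the new test exercises, since it exposes
every column as string regardless of the declared types). Below is a condensed standalone sketch
of that decision, using only the Hive calls already visible in the hunk; the class and method
names are made up for illustration.

    import java.util.List;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.MetaStoreUtils;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.serde2.SerDeException;

    public class HiveSchemaResolutionSketch {

        // Returns the non-partition columns of a Hive table, taking the metastore copy when it is
        // authoritative for the configured SerDe and asking the deserializer otherwise.
        static List<FieldSchema> resolveFields(HiveConf hiveConf, Table hiveTable)
                throws SerDeException, MetaException {
            String serdeLib = hiveTable.getSd().getSerdeInfo().getSerializationLib();
            if (org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf, serdeLib)) {
                // e.g. ORC, Parquet, SequenceFile tables: the metastore columns are the real schema
                return hiveTable.getSd().getCols();
            }
            // e.g. OpenCSVSerde tables: use the schema the deserializer actually exposes
            return MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
                    MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
        }
    }
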
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 4ac76cd..e8c402a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -36,6 +36,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -100,10 +102,50 @@ public class TableEnvHiveConnectorTest {
 		hiveShell.execute("drop database db1 cascade");
 	}
 
+	@Test
+	public void testDifferentFormats() throws Exception {
+		String[] formats = new String[]{"orc", "parquet", "sequencefile", "csv"};
+		for (String format : formats) {
+			readWriteFormat(format);
+		}
+	}
+
+	private void readWriteFormat(String format) throws Exception {
+		TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+
+		hiveShell.execute("create database db1");
+
+		// create source and dest tables
+		String suffix;
+		if (format.equals("csv")) {
+			suffix = "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'";
+		} else {
+			suffix = "stored as " + format;
+		}
+		hiveShell.execute("create table db1.src (i int,s string) " + suffix);
+		hiveShell.execute("create table db1.dest (i int,s string) " + suffix);
+
+		// prepare source data with Hive
+		hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
+
+		// populate dest table with source table
+		tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
+		tableEnv.execute("test_" + format);
+
+		// verify data on hive side
+		verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
+
+		hiveShell.execute("drop database db1 cascade");
+	}
+
 	private TableEnvironment getTableEnvWithHiveCatalog() {
 		TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
 		tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
 		tableEnv.useCatalog(hiveCatalog.getName());
 		return tableEnv;
 	}
+
+	private void verifyHiveQueryResult(String query, List<String> expected) {
+		assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
+	}
 }
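
For reference, the DDL that readWriteFormat issues differs only in its trailing clause: csv goes
through an explicit SerDe clause because Hive at these versions has no "stored as" shorthand that
maps to OpenCSVSerde, while the other formats use the shorthand. With the orc and csv cases
spelled out (table names as in the test):

    create table db1.src (i int,s string) stored as orc
    create table db1.src (i int,s string) row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'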