Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/28 17:06:19 UTC

[flink] branch master updated: [FLINK-12418][hive] Add input/output format and SerDeLib information when creating Hive table in HiveCatalog and add 'hive-exec' as provided dependency

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 97510aa  [FLINK-12418][hive] Add input/output format and SerDeLib information when creating Hive table in HiveCatalog and add 'hive-exec' as provided dependency
97510aa is described below

commit 97510aaa8444895a0cc4df7461889fb66d5ffc01
Author: Rui Li <li...@apache.org>
AuthorDate: Mon May 27 19:13:21 2019 +0800

    [FLINK-12418][hive] Add input/output format and SerDeLib information when creating Hive table in HiveCatalog and add 'hive-exec' as provided dependency
    
    Set input/output formats and the SerDe lib when creating Hive tables in HiveCatalog, so that these tables can be accessed later. Also added 'hive-exec' as a provided dependency.
    
    This closes #8553.
---
 flink-connectors/flink-connector-hive/pom.xml      |  6 ++--
 .../flink/table/catalog/hive/HiveCatalog.java      | 34 +++++++++++++++-------
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  | 19 ++++++++++++
 3 files changed, 46 insertions(+), 13 deletions(-)
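
Before the diff, a minimal standalone sketch of the storage-format defaults this patch applies. It uses only the Hive classes the patch itself calls (StorageFormatFactory, StorageFormatDescriptor, StorageDescriptor, SerDeInfo, LazySimpleSerDe); the class names noted in the print comments are what Hive's "TextFile" descriptor is expected to return, not values asserted by this commit.

    import org.apache.hadoop.hive.metastore.api.SerDeInfo;
    import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
    import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
    import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;

    import java.util.HashMap;

    public class DefaultStorageFormatSketch {
        public static void main(String[] args) {
            // "TextFile" mirrors DEFAULT_HIVE_TABLE_STORAGE_FORMAT in the patch below.
            StorageFormatFactory factory = new StorageFormatFactory();
            StorageFormatDescriptor descriptor = factory.get("TextFile");

            // Stand-alone StorageDescriptor; in the patch it comes from the Hive table itself.
            StorageDescriptor sd = new StorageDescriptor();
            sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));

            sd.setInputFormat(descriptor.getInputFormat());
            sd.setOutputFormat(descriptor.getOutputFormat());
            // The TextFile descriptor may not name a SerDe, hence the LazySimpleSerDe fallback.
            String serdeLib = descriptor.getSerde();
            sd.getSerdeInfo().setSerializationLib(serdeLib != null ? serdeLib : LazySimpleSerDe.class.getName());

            System.out.println(sd.getInputFormat());   // expected: org.apache.hadoop.mapred.TextInputFormat
            System.out.println(sd.getOutputFormat());  // expected: ...hive.ql.io.HiveIgnoreKeyTextOutputFormat
            System.out.println(sd.getSerdeInfo().getSerializationLib());
        }
    }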

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 1b672d4..25e475e 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -239,13 +239,11 @@ under the License.
 			</exclusions>
 		</dependency>
 
-		<!-- test dependencies -->
-
 		<dependency>
 			<groupId>org.apache.hive</groupId>
 			<artifactId>hive-exec</artifactId>
 			<version>${hive.version}</version>
-			<scope>test</scope>
+			<scope>provided</scope>
 			<exclusions>
 				<exclusion>
 					<groupId>org.apache.hive</groupId>
@@ -334,6 +332,8 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<!-- test dependencies -->
+
 		<dependency>
 			<groupId>org.apache.derby</groupId>
 			<artifactId>derby</artifactId>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index d2387f0..159499c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.AbstractCatalogTable;
 import org.apache.flink.table.catalog.AbstractCatalogView;
@@ -67,10 +68,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +93,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class HiveCatalog implements Catalog {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 	private static final String DEFAULT_DB = "default";
+	private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
+	private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT = "TextFile";
 
 	// Prefix used to distinguish properties created by Hive and Flink,
 	// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
@@ -474,7 +479,8 @@ public class HiveCatalog implements Catalog {
 		}
 	}
 
-	private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+	@VisibleForTesting
+	Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
 		try {
 			return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
 		} catch (NoSuchObjectException e) {
@@ -534,9 +540,9 @@ public class HiveCatalog implements Catalog {
 	}
 
 	private  static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
-		Table hiveTable = new Table();
-		hiveTable.setDbName(tablePath.getDatabaseName());
-		hiveTable.setTableName(tablePath.getObjectName());
+		// let Hive set default parameters for us, e.g. serialization.format
+		Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
+			tablePath.getObjectName());
 		hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
 
 		Map<String, String> properties = new HashMap<>(table.getProperties());
@@ -549,11 +555,8 @@ public class HiveCatalog implements Catalog {
 		hiveTable.setParameters(properties);
 
 		// Hive table's StorageDescriptor
-		// TODO: This is very basic Hive table.
-		//  [FLINK-11479] Add input/output format and SerDeLib information for Hive tables.
-		StorageDescriptor sd = new StorageDescriptor();
-		hiveTable.setSd(sd);
-		sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+		StorageDescriptor sd = hiveTable.getSd();
+		setStorageFormat(sd, properties);
 
 		List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
 
@@ -590,6 +593,17 @@ public class HiveCatalog implements Catalog {
 		return hiveTable;
 	}
 
+	private static void setStorageFormat(StorageDescriptor sd, Map<String, String> properties) {
+		// TODO: allow user to specify storage format. Simply use text format for now
+		String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT;
+		StorageFormatDescriptor storageFormatDescriptor = storageFormatFactory.get(storageFormatName);
+		checkArgument(storageFormatDescriptor != null, "Unknown storage format " + storageFormatName);
+		sd.setInputFormat(storageFormatDescriptor.getInputFormat());
+		sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
+		String serdeLib = storageFormatDescriptor.getSerde();
+		sd.getSerdeInfo().setSerializationLib(serdeLib != null ? serdeLib : LazySimpleSerDe.class.getName());
+	}
+
 	/**
 	 * Filter out Hive-created properties, and return Flink-created properties.
 	 */
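
A side note on the instantiateHiveTable change above: switching to org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(...) means the returned metastore Table already carries a StorageDescriptor and SerDeInfo pre-populated with Hive's defaults (e.g. serialization.format), which is why the patch can call hiveTable.getSd() directly instead of building these objects by hand. A minimal sketch of that assumption ("db1"/"tbl1" are placeholder names; the expected outputs are not asserted by this commit):

    import org.apache.hadoop.hive.metastore.api.Table;

    public class GetEmptyTableSketch {
        public static void main(String[] args) {
            // getEmptyTable returns a metastore-api Table with default storage metadata filled in.
            Table t = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable("db1", "tbl1");

            // Expected: sd and serdeInfo are non-null, and serdeInfo carries Hive's
            // default parameters such as serialization.format.
            System.out.println(t.getSd() != null);
            System.out.println(t.getSd().getSerdeInfo() != null);
            System.out.println(t.getSd().getSerdeInfo().getParameters());
        }
    }
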
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index e2d95a3..5a80d85 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -24,13 +24,17 @@ import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.util.StringUtils;
 
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -51,6 +55,21 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
 	public void testCreateTable_Streaming() throws Exception {
 	}
 
+	@Test
+	// verifies that input/output formats and SerDe are set for Hive tables
+	public void testCreateTable_StorageFormatSet() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createTable(path1, createTable(), false);
+
+		Table hiveTable = ((HiveCatalog) catalog).getHiveTable(path1);
+		String inputFormat = hiveTable.getSd().getInputFormat();
+		String outputFormat = hiveTable.getSd().getOutputFormat();
+		String serde = hiveTable.getSd().getSerdeInfo().getSerializationLib();
+		assertFalse(StringUtils.isNullOrWhitespaceOnly(inputFormat));
+		assertFalse(StringUtils.isNullOrWhitespaceOnly(outputFormat));
+		assertFalse(StringUtils.isNullOrWhitespaceOnly(serde));
+	}
+
 	// ------ utils ------
 
 	@Override