You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this text version.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/05/28 17:06:19 UTC
[flink] branch master updated: [FLINK-12418][hive] Add input/output format and SerDeLib information when creating Hive table in HiveCatalog and add 'hive-exec' as provided dependency
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 97510aa [FLINK-12418][hive] Add input/output format and SerDeLib information when creating Hive table in HiveCatalog and add 'hive-exec' as provided dependency
97510aa is described below
commit 97510aaa8444895a0cc4df7461889fb66d5ffc01
Author: Rui Li <li...@apache.org>
AuthorDate: Mon May 27 19:13:21 2019 +0800
[FLINK-12418][hive] Add input/output format and SerDeLib information when creating Hive table in HiveCatalog and add 'hive-exec' as provided dependency
This change sets the input/output formats and the SerDe library when creating Hive tables in HiveCatalog, so that these tables can be accessed later. It also adds 'hive-exec' as a provided dependency.
This closes #8553.
---
flink-connectors/flink-connector-hive/pom.xml | 6 ++--
.../flink/table/catalog/hive/HiveCatalog.java | 34 +++++++++++++++-------
.../catalog/hive/HiveCatalogHiveMetadataTest.java | 19 ++++++++++++
3 files changed, 46 insertions(+), 13 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 1b672d4..25e475e 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -239,13 +239,11 @@ under the License.
</exclusions>
</dependency>
- <!-- test dependencies -->
-
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
@@ -334,6 +332,8 @@ under the License.
</exclusions>
</dependency>
+ <!-- test dependencies -->
+
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index d2387f0..159499c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalogTable;
import org.apache.flink.table.catalog.AbstractCatalogView;
@@ -67,10 +68,12 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +93,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class HiveCatalog implements Catalog {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
private static final String DEFAULT_DB = "default";
+ private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
+ private static final String DEFAULT_HIVE_TABLE_STORAGE_FORMAT = "TextFile";
// Prefix used to distinguish properties created by Hive and Flink,
// as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
@@ -474,7 +479,8 @@ public class HiveCatalog implements Catalog {
}
}
- private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+ @VisibleForTesting
+ Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
try {
return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
} catch (NoSuchObjectException e) {
@@ -534,9 +540,9 @@ public class HiveCatalog implements Catalog {
}
private static Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
- Table hiveTable = new Table();
- hiveTable.setDbName(tablePath.getDatabaseName());
- hiveTable.setTableName(tablePath.getObjectName());
+ // let Hive set default parameters for us, e.g. serialization.format
+ Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(tablePath.getDatabaseName(),
+ tablePath.getObjectName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
Map<String, String> properties = new HashMap<>(table.getProperties());
@@ -549,11 +555,8 @@ public class HiveCatalog implements Catalog {
hiveTable.setParameters(properties);
// Hive table's StorageDescriptor
- // TODO: This is very basic Hive table.
- // [FLINK-11479] Add input/output format and SerDeLib information for Hive tables.
- StorageDescriptor sd = new StorageDescriptor();
- hiveTable.setSd(sd);
- sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+ StorageDescriptor sd = hiveTable.getSd();
+ setStorageFormat(sd, properties);
List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
@@ -590,6 +593,17 @@ public class HiveCatalog implements Catalog {
return hiveTable;
}
+ private static void setStorageFormat(StorageDescriptor sd, Map<String, String> properties) {
+ // TODO: allow user to specify storage format. Simply use text format for now
+ String storageFormatName = DEFAULT_HIVE_TABLE_STORAGE_FORMAT;
+ StorageFormatDescriptor storageFormatDescriptor = storageFormatFactory.get(storageFormatName);
+ checkArgument(storageFormatDescriptor != null, "Unknown storage format " + storageFormatName);
+ sd.setInputFormat(storageFormatDescriptor.getInputFormat());
+ sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
+ String serdeLib = storageFormatDescriptor.getSerde();
+ sd.getSerdeInfo().setSerializationLib(serdeLib != null ? serdeLib : LazySimpleSerDe.class.getName());
+ }
+
/**
* Filter out Hive-created properties, and return Flink-created properties.
*/
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index e2d95a3..5a80d85 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -24,13 +24,17 @@ import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.util.StringUtils;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.BeforeClass;
+import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@@ -51,6 +55,21 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
public void testCreateTable_Streaming() throws Exception {
}
+ @Test
+ // verifies that input/output formats and SerDe are set for Hive tables
+ public void testCreateTable_StorageFormatSet() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ Table hiveTable = ((HiveCatalog) catalog).getHiveTable(path1);
+ String inputFormat = hiveTable.getSd().getInputFormat();
+ String outputFormat = hiveTable.getSd().getOutputFormat();
+ String serde = hiveTable.getSd().getSerdeInfo().getSerializationLib();
+ assertFalse(StringUtils.isNullOrWhitespaceOnly(inputFormat));
+ assertFalse(StringUtils.isNullOrWhitespaceOnly(outputFormat));
+ assertFalse(StringUtils.isNullOrWhitespaceOnly(serde));
+ }
+
// ------ utils ------
@Override