Posted to commits@flink.apache.org by ku...@apache.org on 2019/08/06 03:07:21 UTC
[flink] branch release-1.9 updated: [FLINK-13192][hive] Add tests for different Hive table formats
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 6c25805 [FLINK-13192][hive] Add tests for different Hive table formats
6c25805 is described below
commit 6c25805df329a11a5ea104672c7a3f0b7cec220c
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jul 29 21:02:52 2019 +0800
[FLINK-13192][hive] Add tests for different Hive table formats
This closes #9264
---
flink-connectors/flink-connector-hive/pom.xml | 6 +---
.../connectors/hive/HiveTableOutputFormat.java | 19 ++++++----
.../flink/table/catalog/hive/HiveCatalog.java | 23 +++++++++---
.../connectors/hive/TableEnvHiveConnectorTest.java | 42 ++++++++++++++++++++++
4 files changed, 74 insertions(+), 16 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 6ce3dc2..4cbe68b 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -416,11 +416,7 @@ under the License.
<artifactId>tez-common</artifactId>
</exclusion>
<exclusion>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-mapreduce</artifactId>
- </exclusion>
- <exclusion>
- <!-- This dependency is not available with java 9.-->
+ <!-- This dependency is no longer shipped with the JDK since Java 9.-->
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index e4caac1..9e1ee46 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -58,9 +58,10 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -122,7 +123,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
// number of non-partitioning columns
private transient int numNonPartitionColumns;
- private transient AbstractSerDe serializer;
+ // SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common base class
+ private transient Serializer recordSerDe;
//StructObjectInspector represents the hive row structure.
private transient StructObjectInspector rowObjectInspector;
private transient Class<? extends Writable> outputClass;
@@ -257,11 +259,14 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
public void open(int taskNumber, int numTasks) throws IOException {
try {
StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
- serializer = (AbstractSerDe) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
- ReflectionUtils.setConf(serializer, jobConf);
+ Object serdeLib = Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
+ Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+ "Expect a SerDe lib implementing both Serializer and Deserializer, but actually got " + serdeLib.getClass().getName());
+ recordSerDe = (Serializer) serdeLib;
+ ReflectionUtils.setConf(recordSerDe, jobConf);
// TODO: support partition properties, for now assume they're same as table properties
- SerDeUtils.initializeSerDe(serializer, jobConf, tableProperties, null);
- outputClass = serializer.getSerializedClass();
+ SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
+ outputClass = recordSerDe.getSerializedClass();
} catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
throw new FlinkRuntimeException("Error initializing Hive serializer", e);
}
@@ -331,7 +336,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
partitionToWriter.put(partName, partitionWriter);
}
}
- partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record), rowObjectInspector));
+ partitionWriter.recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
} catch (IOException | SerDeException e) {
throw new IOException("Could not write Record.", e);
} catch (MetaException e) {
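The hunk above is the core compatibility fix: SerDe implementations in Hive 1.2.1 and Hive 2.3.4 do not share the AbstractSerDe base class, so the output format now programs against the Serializer and Deserializer interfaces and verifies at runtime that the loaded class implements both. A minimal standalone sketch of that loading pattern (the SerDeLoader wrapper class is hypothetical; the check itself mirrors the committed code):

    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.hive.serde2.Serializer;

    public class SerDeLoader {
        // Load a SerDe by its class name and confirm it implements both halves
        // of the SerDe contract before handing it out as a Serializer.
        public static Serializer loadSerDe(String serdeClassName) throws Exception {
            Object serdeLib = Class.forName(serdeClassName).newInstance();
            if (!(serdeLib instanceof Serializer && serdeLib instanceof Deserializer)) {
                throw new IllegalArgumentException(
                        "Expect a SerDe lib implementing both Serializer and Deserializer,"
                                + " but actually got " + serdeLib.getClass().getName());
            }
            return (Serializer) serdeLib;
        }
    }
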
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 261180f..dd50ce2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.thrift.TException;
@@ -323,7 +324,7 @@ public class HiveCatalog extends AbstractCatalog {
checkNotNull(tablePath, "tablePath cannot be null");
Table hiveTable = getHiveTable(tablePath);
- return instantiateCatalogTable(hiveTable);
+ return instantiateCatalogTable(hiveTable, hiveConf);
}
@Override
@@ -394,7 +395,7 @@ public class HiveCatalog extends AbstractCatalog {
return;
}
- CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable);
+ CatalogBaseTable existingTable = instantiateCatalogTable(hiveTable, hiveConf);
if (existingTable.getClass() != newCatalogTable.getClass()) {
throw new CatalogException(
@@ -493,7 +494,7 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- private static CatalogBaseTable instantiateCatalogTable(Table hiveTable) {
+ private static CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveConf) {
boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW;
// Table properties
@@ -506,8 +507,22 @@ public class HiveCatalog extends AbstractCatalog {
String comment = properties.remove(HiveCatalogConfig.COMMENT);
// Table schema
+ List<FieldSchema> fields;
+ if (org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf,
+ hiveTable.getSd().getSerdeInfo().getSerializationLib())) {
+ // get schema from metastore
+ fields = hiveTable.getSd().getCols();
+ } else {
+ // get schema from deserializer
+ try {
+ fields = MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
+ MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
+ } catch (SerDeException | MetaException e) {
+ throw new CatalogException("Failed to get Hive table schema from deserializer", e);
+ }
+ }
TableSchema tableSchema =
- HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+ HiveTableUtil.createTableSchema(fields, hiveTable.getPartitionKeys());
// Partition keys
List<String> partitionKeys = new ArrayList<>();
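The catalog change makes the schema lookup SerDe-aware: for SerDes whose schema lives in the metastore, the columns come straight from the storage descriptor, while for self-describing SerDes they are obtained from the deserializer. A condensed sketch of that branch, assuming Hive's metastore utilities are on the classpath (the SchemaResolver wrapper is hypothetical; the calls match the committed code):

    import java.util.List;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.MetaStoreUtils;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class SchemaResolver {
        // Pick the authoritative source for a Hive table's column schema,
        // following the branch added in instantiateCatalogTable above.
        static List<FieldSchema> resolveFields(HiveConf hiveConf, Table hiveTable) throws Exception {
            String serdeLib = hiveTable.getSd().getSerdeInfo().getSerializationLib();
            if (org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf, serdeLib)) {
                // simple formats: the metastore's column list is the schema
                return hiveTable.getSd().getCols();
            }
            // self-describing formats: ask the deserializer for the fields
            return MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
                    MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
        }
    }
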
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 4ac76cd..e8c402a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -36,6 +36,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -100,10 +102,50 @@ public class TableEnvHiveConnectorTest {
hiveShell.execute("drop database db1 cascade");
}
+ @Test
+ public void testDifferentFormats() throws Exception {
+ String[] formats = new String[]{"orc", "parquet", "sequencefile", "csv"};
+ for (String format : formats) {
+ readWriteFormat(format);
+ }
+ }
+
+ private void readWriteFormat(String format) throws Exception {
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+
+ hiveShell.execute("create database db1");
+
+ // create source and dest tables
+ String suffix;
+ if (format.equals("csv")) {
+ suffix = "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'";
+ } else {
+ suffix = "stored as " + format;
+ }
+ hiveShell.execute("create table db1.src (i int,s string) " + suffix);
+ hiveShell.execute("create table db1.dest (i int,s string) " + suffix);
+
+ // prepare source data with Hive
+ hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
+
+ // populate dest table with source table
+ tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
+ tableEnv.execute("test_" + format);
+
+ // verify data on hive side
+ verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
+
+ hiveShell.execute("drop database db1 cascade");
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
return tableEnv;
}
+
+ private void verifyHiveQueryResult(String query, List<String> expected) {
+ assertEquals(new HashSet<>(expected), new HashSet<>(hiveShell.executeQuery(query)));
+ }
}
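
Unrolled for the csv case, the one format in the new test that cannot use a plain "stored as" clause and therefore falls back to Hive's OpenCSVSerde, the round trip amounts to the following. This is a sketch only; tableEnv and hiveShell are assumed to be wired up exactly as in the test class above, with HiveShell coming from the HiveRunner framework the test already uses:

    import java.util.Arrays;
    import java.util.HashSet;

    import org.apache.flink.table.api.TableEnvironment;

    import com.klarna.hiverunner.HiveShell;

    public class CsvRoundTrip {
        // One iteration of the loop in testDifferentFormats, spelled out for "csv".
        static void roundTripCsv(TableEnvironment tableEnv, HiveShell hiveShell) throws Exception {
            String suffix = "row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'";
            hiveShell.execute("create database db1");
            hiveShell.execute("create table db1.src (i int,s string) " + suffix);
            hiveShell.execute("create table db1.dest (i int,s string) " + suffix);
            hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");  // seed with Hive
            tableEnv.sqlUpdate("insert into db1.dest select * from db1.src"); // copy with Flink
            tableEnv.execute("test_csv");                                     // run the Flink job
            // the embedded Hive shell returns rows as tab-separated strings, unordered
            assert new HashSet<>(hiveShell.executeQuery("select * from db1.dest"))
                    .equals(new HashSet<>(Arrays.asList("1\ta", "2\tb")));
            hiveShell.execute("drop database db1 cascade");
        }
    }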