You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/10 21:04:16 UTC
[flink] branch master updated: [FLINK-13157] Re-enable unit test reading
complex type of HiveInputFormatTest
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0e59a9d [FLINK-13157] Re-enable unit test reading complex type of HiveInputFormatTest
0e59a9d is described below
commit 0e59a9d5f6585999f36e222ed38b63d82e81ce38
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Tue Jul 9 18:30:07 2019 +0800
[FLINK-13157] Re-enable unit test reading complex type of HiveInputFormatTest
This closes #9037.
---
.../functions/hive/conversion/HiveInspectors.java | 8 +-
.../batch/connectors/hive/HiveInputFormatTest.java | 106 +++++++++++----------
2 files changed, 58 insertions(+), 56 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 2ef5ee0..6d82d38 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
@@ -338,18 +339,17 @@ public class HiveInspectors {
Row row = new Row(fields.size());
// StandardStructObjectInspector.getStructFieldData in Hive-1.2.1 only accepts array or list as data
- if (!data.getClass().isArray() && !(data instanceof List)) {
+ if (!data.getClass().isArray() && !(data instanceof List) && (inspector instanceof StandardStructObjectInspector)) {
data = new Object[]{data};
}
for (int i = 0; i < row.getArity(); i++) {
row.setField(
i,
toFlinkObject(
- fields.get(i).getFieldObjectInspector(),
- structInspector.getStructFieldData(data, fields.get(i)))
+ fields.get(i).getFieldObjectInspector(),
+ structInspector.getStructFieldData(data, fields.get(i)))
);
}
-
return row;
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 5d4d302..fea1469 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -23,19 +23,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.JobConf;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -102,8 +106,8 @@ public class HiveInputFormatTest {
sd.setSerdeInfo(new SerDeInfo());
sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
sd.getSerdeInfo().setParameters(new HashMap<>());
- sd.getSerdeInfo().getParameters().put("serialization.format", "1");
- sd.getSerdeInfo().getParameters().put("field.delim", ",");
+ sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ",");
sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
tbl.setSd(sd);
tbl.setPartitionKeys(new ArrayList<>());
@@ -125,54 +129,52 @@ public class HiveInputFormatTest {
Assert.assertEquals("4,4,a,4000,4.44", rows.get(3).toString());
}
-// @Test
-// public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
-// final String dbName = "default";
-// final String tblName = "complext_test";
-//
-// TableSchema.Builder builder = new TableSchema.Builder();
-// builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
-// DataTypes.ARRAY(DataTypes.INT()),
-// DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
-// DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
-//
-// //Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-// //serDe temporarily.
-// HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
-// org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-// tbl.setDbName(dbName);
-// tbl.setTableName(tblName);
-// tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-// tbl.setParameters(new HashMap<>());
-// StorageDescriptor sd = new StorageDescriptor();
-// String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
-// sd.setLocation(location);
-// sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
-// sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-// sd.setSerdeInfo(new SerDeInfo());
-// sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
-// sd.getSerdeInfo().setParameters(new HashMap<>());
-// sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-// sd.getSerdeInfo().getParameters().put("field.delim", ";");
-// //org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe use 'colelction.delim' as a delimiter config key
-// // it may be a typo of this class
-// sd.getSerdeInfo().getParameters().put("colelction.delim", ",");
-// sd.getSerdeInfo().getParameters().put("mapkey.delim", ":");
-// sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
-// tbl.setSd(sd);
-// tbl.setPartitionKeys(new ArrayList<>());
-//
-// client.createTable(tbl);
-// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-// env.setParallelism(1);
-// RowTypeInfo rowTypeInfo = new RowTypeInfo(builder.build().getFieldTypes(), builder.build().getFieldNames());
-// List<HiveTablePartition> partitions = new ArrayList<>();
-// partitions.add(new HiveTablePartition(sd, new HashMap<>()));
-// HiveTableInputFormat hiveTableInputFormat =
-// new HiveTableInputFormat(new JobConf(hiveConf), hiveCatalog., partitions);
-// DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
-// List<Row> rows = rowDataSet.collect();
-// Assert.assertEquals(1, rows.size());
-// Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
-// }
+ @Test
+ public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
+ final String dbName = "default";
+ final String tblName = "complext_test";
+
+ TableSchema.Builder builder = new TableSchema.Builder();
+ builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+ DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
+
+ //Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
+ //serDe temporarily.
+ HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+ org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
+ tbl.setDbName(dbName);
+ tbl.setTableName(tblName);
+ tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
+ tbl.setParameters(new HashMap<>());
+ StorageDescriptor sd = new StorageDescriptor();
+ String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
+ sd.setLocation(location);
+ sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
+ sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
+ sd.getSerdeInfo().setParameters(new HashMap<>());
+ sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ";");
+ sd.getSerdeInfo().getParameters().put(serdeConstants.COLLECTION_DELIM, ",");
+ sd.getSerdeInfo().getParameters().put(serdeConstants.MAPKEY_DELIM, ":");
+ sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
+ tbl.setSd(sd);
+ tbl.setPartitionKeys(new ArrayList<>());
+
+ client.createTable(tbl);
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ List<HiveTablePartition> partitions = new ArrayList<>();
+ partitions.add(new HiveTablePartition(sd, new HashMap<>()));
+ CatalogTable catalogTable = new CatalogTableImpl(builder.build(), new HashMap<>(), "TEST_TABLE");
+ HiveTableInputFormat hiveTableInputFormat =
+ new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
+ DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
+ List<Row> rows = rowDataSet.collect();
+ Assert.assertEquals(1, rows.size());
+ Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
+ }
}