Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/10 21:04:16 UTC

[flink] branch master updated: [FLINK-13157] Reenable unit test for reading complex types in HiveInputFormatTest

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e59a9d  [FLINK-13157] Reenable unit test for reading complex types in HiveInputFormatTest
0e59a9d is described below

commit 0e59a9d5f6585999f36e222ed38b63d82e81ce38
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Tue Jul 9 18:30:07 2019 +0800

    [FLINK-13157] Reenable unit test for reading complex types in HiveInputFormatTest
    
    This closes #9037.
---
 .../functions/hive/conversion/HiveInspectors.java  |   8 +-
 .../batch/connectors/hive/HiveInputFormatTest.java | 106 +++++++++++----------
 2 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 2ef5ee0..6d82d38 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
@@ -338,18 +339,17 @@ public class HiveInspectors {
 
 			Row row = new Row(fields.size());
 			// StandardStructObjectInspector.getStructFieldData in Hive-1.2.1 only accepts array or list as data
-			if (!data.getClass().isArray() && !(data instanceof List)) {
+			if (!data.getClass().isArray() && !(data instanceof List) && (inspector instanceof StandardStructObjectInspector)) {
 				data = new Object[]{data};
 			}
 			for (int i = 0; i < row.getArity(); i++) {
 				row.setField(
 					i,
 					toFlinkObject(
-						fields.get(i).getFieldObjectInspector(),
-						structInspector.getStructFieldData(data, fields.get(i)))
+							fields.get(i).getFieldObjectInspector(),
+							structInspector.getStructFieldData(data, fields.get(i)))
 				);
 			}
-
 			return row;
 		}
 
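The guard added above is the crux of the fix: in Hive 1.2.1, StandardStructObjectInspector.getStructFieldData only accepts an array or a List, so a scalar value must be wrapped, while other struct inspectors (such as the lazy ones produced by LazySimpleSerDe) expect their own raw representation and must be passed through untouched. A minimal sketch of that condition in isolation (the helper class and method name are hypothetical; the inspector types are real Hive APIs):

    import java.util.List;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;

    class StructDataNormalizer {
        // Hypothetical helper mirroring the fixed condition in HiveInspectors#toFlinkObject.
        // In Hive 1.2.1, only StandardStructObjectInspector.getStructFieldData insists on
        // array- or List-shaped data; wrapping the raw data of other struct inspectors
        // (e.g. the lazy ones used by LazySimpleSerDe) would break their field extraction.
        static Object normalize(Object data, ObjectInspector inspector) {
            if (!data.getClass().isArray()
                    && !(data instanceof List)
                    && inspector instanceof StandardStructObjectInspector) {
                return new Object[] {data};
            }
            return data;
        }
    }
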
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 5d4d302..fea1469 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -23,19 +23,23 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -102,8 +106,8 @@ public class HiveInputFormatTest {
 		sd.setSerdeInfo(new SerDeInfo());
 		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
 		sd.getSerdeInfo().setParameters(new HashMap<>());
-		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-		sd.getSerdeInfo().getParameters().put("field.delim", ",");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ",");
 		sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
 		tbl.setSd(sd);
 		tbl.setPartitionKeys(new ArrayList<>());
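
Replacing the hard-coded strings with serdeConstants ties the test to Hive's own key definitions. A quick standalone check of what the constants resolve to (values as defined in Hive 1.2.1's serdeConstants; the class name here is made up for the example):

    import org.apache.hadoop.hive.serde.serdeConstants;

    public class SerdeKeyCheck {
        public static void main(String[] args) {
            // Both constants resolve to the exact keys the test previously hard-coded.
            System.out.println(serdeConstants.SERIALIZATION_FORMAT); // serialization.format
            System.out.println(serdeConstants.FIELD_DELIM);          // field.delim
        }
    }
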
@@ -125,54 +129,52 @@ public class HiveInputFormatTest {
 		Assert.assertEquals("4,4,a,4000,4.44", rows.get(3).toString());
 	}
 
-//	@Test
-//	public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
-//		final String dbName = "default";
-//		final String tblName = "complext_test";
-//
-//		TableSchema.Builder builder = new TableSchema.Builder();
-//		builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
-//				DataTypes.ARRAY(DataTypes.INT()),
-//				DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
-//				DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
-//
-//		//Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-//		//serDe temporarily.
-//		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
-//		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-//		tbl.setDbName(dbName);
-//		tbl.setTableName(tblName);
-//		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-//		tbl.setParameters(new HashMap<>());
-//		StorageDescriptor sd = new StorageDescriptor();
-//		String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
-//		sd.setLocation(location);
-//		sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
-//		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-//		sd.setSerdeInfo(new SerDeInfo());
-//		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
-//		sd.getSerdeInfo().setParameters(new HashMap<>());
-//		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-//		sd.getSerdeInfo().getParameters().put("field.delim", ";");
-//		//org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe use 'colelction.delim' as a delimiter config key
-//		// it may be a typo of this class
-//		sd.getSerdeInfo().getParameters().put("colelction.delim", ",");
-//		sd.getSerdeInfo().getParameters().put("mapkey.delim", ":");
-//		sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
-//		tbl.setSd(sd);
-//		tbl.setPartitionKeys(new ArrayList<>());
-//
-//		client.createTable(tbl);
-//		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-//		env.setParallelism(1);
-//		RowTypeInfo rowTypeInfo = new RowTypeInfo(builder.build().getFieldTypes(), builder.build().getFieldNames());
-//		List<HiveTablePartition> partitions = new ArrayList<>();
-//		partitions.add(new HiveTablePartition(sd, new HashMap<>()));
-//		HiveTableInputFormat hiveTableInputFormat =
-//			new HiveTableInputFormat(new JobConf(hiveConf), hiveCatalog., partitions);
-//		DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
-//		List<Row> rows = rowDataSet.collect();
-//		Assert.assertEquals(1, rows.size());
-//		Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
-//	}
+	@Test
+	public void testReadComplexDataTypeFromHiveInputFormat() throws Exception {
+		final String dbName = "default";
+		final String tblName = "complex_test";
+
+		TableSchema.Builder builder = new TableSchema.Builder();
+		builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
+				DataTypes.ARRAY(DataTypes.INT()),
+				DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+				DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
+
+		// For now we use the metastore client to create the Hive table, because HiveCatalog
+		// doesn't yet support setting a custom serDe.
+		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
+		tbl.setDbName(dbName);
+		tbl.setTableName(tblName);
+		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
+		tbl.setParameters(new HashMap<>());
+		StorageDescriptor sd = new StorageDescriptor();
+		String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
+		sd.setLocation(location);
+		sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
+		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
+		sd.setSerdeInfo(new SerDeInfo());
+		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
+		sd.getSerdeInfo().setParameters(new HashMap<>());
+		sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ";");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.COLLECTION_DELIM, ",");
+		sd.getSerdeInfo().getParameters().put(serdeConstants.MAPKEY_DELIM, ":");
+		sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
+		tbl.setSd(sd);
+		tbl.setPartitionKeys(new ArrayList<>());
+
+		client.createTable(tbl);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		List<HiveTablePartition> partitions = new ArrayList<>();
+		partitions.add(new HiveTablePartition(sd, new HashMap<>()));
+		CatalogTable catalogTable = new CatalogTableImpl(builder.build(), new HashMap<>(), "TEST_TABLE");
+		HiveTableInputFormat hiveTableInputFormat =
+			new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
+		DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
+		List<Row> rows = rowDataSet.collect();
+		Assert.assertEquals(1, rows.size());
+		Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
+	}
 }
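
One subtlety from the deleted comments is worth keeping in mind: LazySimpleSerDe reads its collection delimiter under the misspelled key "colelction.delim", and in Hive 1.2.1 serdeConstants.COLLECTION_DELIM is defined with exactly that misspelling, so switching to the constant preserves the behavior the old code configured by hand. A standalone sketch, assuming a pre-HIVE-16922 Hive version such as the 1.2.1 these tests run against (class name hypothetical):

    import org.apache.hadoop.hive.serde.serdeConstants;

    public class CollectionDelimCheck {
        public static void main(String[] args) {
            // Prints the (historically) misspelled key that LazySimpleSerDe actually
            // looks up; using the constant avoids hand-writing either spelling.
            System.out.println(serdeConstants.COLLECTION_DELIM); // colelction.delim
            System.out.println(serdeConstants.MAPKEY_DELIM);     // mapkey.delim
        }
    }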