Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/07 21:58:15 UTC

[flink] branch release-1.9 updated: [FLINK-13610][hive]Refactor HiveTableSource Test use sql query and remove HiveInputFormatTest

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new a2f5a3b  [FLINK-13610][hive]Refactor HiveTableSource Test use sql query and remove HiveInputFormatTest
a2f5a3b is described below

commit a2f5a3b244bfc4f67730e21a7468a5acb6da0875
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Wed Aug 7 17:32:29 2019 +0800

    [FLINK-13610][hive]Refactor HiveTableSource Test use sql query and remove HiveInputFormatTest
    
    Refactor HiveTableSourceTest to use SQL queries and remove HiveInputFormatTest
    
    This closes #9379
---
 .../flink/connectors/hive/HiveInputFormatTest.java | 181 ---------------------
 .../flink/connectors/hive/HiveTableSourceTest.java |  71 +++++---
 .../src/test/resources/complex_test/1.txt          |   1 -
 .../src/test/resources/test/test.txt               |   4 -
 4 files changed, 48 insertions(+), 209 deletions(-)
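
For reference, the query-based pattern that the refactored tests in HiveTableSourceTest now follow looks roughly like this. This is a minimal sketch distilled from the hunks below, not the verbatim committed code; it assumes the HiveCatalog instance (hiveCatalog) and the populated table source_db.test that the test fixture sets up elsewhere in the class:

    // Register the HiveCatalog under a name and read the table with plain SQL,
    // instead of constructing a HiveTableSource/HiveTableInputFormat by hand.
    TableEnvironment tEnv = HiveTestUtils.createTableEnv();
    tEnv.registerCatalog("hive", hiveCatalog);
    Table src = tEnv.sqlQuery("select * from hive.source_db.test");

    // Collect the result through the planner test utilities and assert on the rows.
    List<Row> rows = JavaConverters.seqAsJavaListConverter(
            TableUtil.collect((TableImpl) src)).asJava();
    Assert.assertEquals(4, rows.size());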

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
deleted file mode 100644
index b9a1527..0000000
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.hive;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * Tests {@link HiveTableInputFormat}.
- */
-public class HiveInputFormatTest {
-
-	public static final String DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
-	public static final String DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS = org.apache.hadoop.mapred.TextInputFormat.class.getName();
-	public static final String DEFAULT_OUTPUT_FORMAT_CLASS = org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
-
-	private static HiveCatalog hiveCatalog;
-	private static HiveConf hiveConf;
-
-	@BeforeClass
-	public static void createCatalog() throws IOException {
-		hiveConf = HiveTestUtils.createHiveConf();
-		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
-		hiveCatalog.open();
-	}
-
-	@AfterClass
-	public static void closeCatalog() {
-		if (null != hiveCatalog) {
-			hiveCatalog.close();
-		}
-	}
-
-	@Test
-	public void testReadFromHiveInputFormat() throws Exception {
-		final String dbName = "default";
-		final String tblName = "test";
-		TableSchema tableSchema = new TableSchema(
-				new String[]{"a", "b", "c", "d", "e"},
-				new TypeInformation[]{
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.DOUBLE_TYPE_INFO}
-		);
-		//Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-		//serDe temporarily.
-		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
-		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-		tbl.setDbName(dbName);
-		tbl.setTableName(tblName);
-		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-		tbl.setParameters(new HashMap<>());
-		StorageDescriptor sd = new StorageDescriptor();
-		String location = HiveInputFormatTest.class.getResource("/test").getPath();
-		sd.setLocation(location);
-		sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
-		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-		sd.setSerdeInfo(new SerDeInfo());
-		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
-		sd.getSerdeInfo().setParameters(new HashMap<>());
-		sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
-		sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ",");
-		sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
-		tbl.setSd(sd);
-		tbl.setPartitionKeys(new ArrayList<>());
-
-		client.createTable(tbl);
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-		RowTypeInfo rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
-		List<HiveTablePartition> partitions = new ArrayList<>();
-		partitions.add(new HiveTablePartition(sd, new HashMap<>()));
-		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(new ObjectPath(dbName, tblName));
-		HiveTableInputFormat hiveTableInputFormat = new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
-		DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
-		List<Row> rows = rowDataSet.collect();
-		Assert.assertEquals(4, rows.size());
-		Assert.assertEquals("1,1,a,1000,1.11", rows.get(0).toString());
-		Assert.assertEquals("2,2,a,2000,2.22", rows.get(1).toString());
-		Assert.assertEquals("3,3,a,3000,3.33", rows.get(2).toString());
-		Assert.assertEquals("4,4,a,4000,4.44", rows.get(3).toString());
-	}
-
-	@Test
-	public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
-		final String dbName = "default";
-		final String tblName = "complext_test";
-
-		TableSchema.Builder builder = new TableSchema.Builder();
-		builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
-				DataTypes.ARRAY(DataTypes.INT()),
-				DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
-				DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
-
-		//Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-		//serDe temporarily.
-		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
-		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-		tbl.setDbName(dbName);
-		tbl.setTableName(tblName);
-		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-		tbl.setParameters(new HashMap<>());
-		StorageDescriptor sd = new StorageDescriptor();
-		String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
-		sd.setLocation(location);
-		sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
-		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-		sd.setSerdeInfo(new SerDeInfo());
-		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
-		sd.getSerdeInfo().setParameters(new HashMap<>());
-		sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
-		sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ";");
-		sd.getSerdeInfo().getParameters().put(serdeConstants.COLLECTION_DELIM, ",");
-		sd.getSerdeInfo().getParameters().put(serdeConstants.MAPKEY_DELIM, ":");
-		sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
-		tbl.setSd(sd);
-		tbl.setPartitionKeys(new ArrayList<>());
-
-		client.createTable(tbl);
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-		List<HiveTablePartition> partitions = new ArrayList<>();
-		partitions.add(new HiveTablePartition(sd, new HashMap<>()));
-		CatalogTable catalogTable = new CatalogTableImpl(builder.build(), new HashMap<>(), "TEST_TABLE");
-		HiveTableInputFormat hiveTableInputFormat =
-			new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
-		DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
-		List<Row> rows = rowDataSet.collect();
-		Assert.assertEquals(1, rows.size());
-		Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
-	}
-}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 04070e8..3920d0f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.connectors.hive;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableImpl;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.planner.runtime.utils.TableUtil;
@@ -31,7 +29,6 @@ import org.apache.flink.types.Row;
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -40,7 +37,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.io.IOException;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import scala.collection.JavaConverters;
 
@@ -81,6 +80,7 @@ public class HiveTableSourceTest {
 
 	@Test
 	public void testReadNonPartitionedTable() throws Exception {
+		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test";
 		hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
@@ -93,17 +93,41 @@ public class HiveTableSourceTest {
 				.commit();
 
 		TableEnvironment tEnv = HiveTestUtils.createTableEnv();
-		ObjectPath tablePath = new ObjectPath(dbName, tblName);
-		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
-		HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
-		Table src = tEnv.fromTableSource(hiveTableSource);
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		Table src = tEnv.sqlQuery("select * from hive.source_db.test");
 		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
 
 		Assert.assertEquals(4, rows.size());
-		Assert.assertEquals(1, rows.get(0).getField(0));
-		Assert.assertEquals(2, rows.get(1).getField(0));
-		Assert.assertEquals(3, rows.get(2).getField(0));
-		Assert.assertEquals(4, rows.get(3).getField(0));
+		Assert.assertEquals("1,1,a,1000,1.11", rows.get(0).toString());
+		Assert.assertEquals("2,2,b,2000,2.22", rows.get(1).toString());
+		Assert.assertEquals("3,3,c,3000,3.33", rows.get(2).toString());
+		Assert.assertEquals("4,4,d,4000,4.44", rows.get(3).toString());
+	}
+
+	@Test
+	public void testReadComplexDataType() throws Exception {
+		final String catalogName = "hive";
+		final String dbName = "source_db";
+		final String tblName = "complex_test";
+		hiveShell.execute("create table source_db.complex_test(" +
+						"a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>)");
+		Integer[] array = new Integer[]{1, 2, 3};
+		Map<Integer, String> map = new LinkedHashMap<>();
+		map.put(1, "a");
+		map.put(2, "b");
+		Object[] struct = new Object[]{3, 3L};
+		hiveShell.insertInto(dbName, tblName)
+				.withAllColumns()
+				.addRow(array, map, struct)
+				.commit();
+		TableEnvironment tEnv = HiveTestUtils.createTableEnv();
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		Table src = tEnv.sqlQuery("select * from hive.source_db.complex_test");
+		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
+		Assert.assertEquals(1, rows.size());
+		assertArrayEquals(array, (Integer[]) rows.get(0).getField(0));
+		assertEquals(map, rows.get(0).getField(1));
+		assertEquals(Row.of(struct[0], struct[1]), rows.get(0).getField(2));
 	}
 
 	/**
@@ -112,11 +136,12 @@ public class HiveTableSourceTest {
 	 */
 	@Test
 	public void testReadPartitionTable() throws Exception {
+		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_table_pt";
 		hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
 						"(year STRING, value INT) partitioned by (pt int);");
-		hiveShell.insertInto("source_db", "test_table_pt")
+		hiveShell.insertInto(dbName, tblName)
 				.withColumns("year", "value", "pt")
 				.addRow("2014", 3, 0)
 				.addRow("2014", 4, 0)
@@ -124,11 +149,10 @@ public class HiveTableSourceTest {
 				.addRow("2015", 5, 1)
 				.commit();
 		TableEnvironment tEnv = HiveTestUtils.createTableEnv();
-		ObjectPath tablePath = new ObjectPath(dbName, tblName);
-		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
-		HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
-		Table src = tEnv.fromTableSource(hiveTableSource);
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt");
 		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
+
 		assertEquals(4, rows.size());
 		Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
 		assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
@@ -136,11 +160,12 @@ public class HiveTableSourceTest {
 
 	@Test
 	public void testPartitionPrunning() throws Exception {
+		final String catalogName = "hive";
 		final String dbName = "source_db";
 		final String tblName = "test_table_pt_1";
 		hiveShell.execute("CREATE TABLE source_db.test_table_pt_1 " +
 						"(year STRING, value INT) partitioned by (pt int);");
-		hiveShell.insertInto("source_db", "test_table_pt_1")
+		hiveShell.insertInto(dbName, tblName)
 				.withColumns("year", "value", "pt")
 				.addRow("2014", 3, 0)
 				.addRow("2014", 4, 0)
@@ -148,11 +173,10 @@ public class HiveTableSourceTest {
 				.addRow("2015", 5, 1)
 				.commit();
 		TableEnvironment tEnv = HiveTestUtils.createTableEnv();
-		ObjectPath tablePath = new ObjectPath(dbName, tblName);
-		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
-		tEnv.registerTableSource("src", new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable));
-		Table table = tEnv.sqlQuery("select * from src where pt = 0");
-		String[] explain = tEnv.explain(table).split("==.*==\n");
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt_1 where pt = 0");
+		// first check execution plan to ensure partition prunning works
+		String[] explain = tEnv.explain(src).split("==.*==\n");
 		assertEquals(4, explain.length);
 		String abstractSyntaxTree = explain[1];
 		String optimizedLogicalPlan = explain[2];
@@ -160,7 +184,8 @@ public class HiveTableSourceTest {
 		assertTrue(abstractSyntaxTree.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: false, PartitionNums: 2]"));
 		assertTrue(optimizedLogicalPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
 		assertTrue(physicalExecutionPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
-		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) table)).asJava();
+		// second check execute results
+		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
 		assertEquals(2, rows.size());
 		Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
 		assertArrayEquals(new String[]{"2014,3,0", "2014,4,0"}, rowStrings);
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt b/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt
deleted file mode 100644
index bcb199b..0000000
--- a/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt
+++ /dev/null
@@ -1 +0,0 @@
-1,2,3;1:a,2:b;3,c
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/test/test.txt b/flink-connectors/flink-connector-hive/src/test/resources/test/test.txt
deleted file mode 100644
index 672650e..0000000
--- a/flink-connectors/flink-connector-hive/src/test/resources/test/test.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-1,1,a,1000,1.11
-2,2,a,2000,2.22
-3,3,a,3000,3.33
-4,4,a,4000,4.44
\ No newline at end of file