You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/07 21:58:15 UTC
[flink] branch release-1.9 updated: [FLINK-13610][hive]Refactor
HiveTableSource Test use sql query and remove HiveInputFormatTest
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new a2f5a3b [FLINK-13610][hive]Refactor HiveTableSource Test use sql query and remove HiveInputFormatTest
a2f5a3b is described below
commit a2f5a3b244bfc4f67730e21a7468a5acb6da0875
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Wed Aug 7 17:32:29 2019 +0800
[FLINK-13610][hive]Refactor HiveTableSource Test use sql query and remove HiveInputFormatTest
Refactor HiveTableSourceTest to use SQL queries and remove HiveInputFormatTest
This closes #9379
---
.../flink/connectors/hive/HiveInputFormatTest.java | 181 ---------------------
.../flink/connectors/hive/HiveTableSourceTest.java | 71 +++++---
.../src/test/resources/complex_test/1.txt | 1 -
.../src/test/resources/test/test.txt | 4 -
4 files changed, 48 insertions(+), 209 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
deleted file mode 100644
index b9a1527..0000000
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveInputFormatTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.hive;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * Tests {@link HiveTableInputFormat}.
- */
-public class HiveInputFormatTest {
-
- public static final String DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
- public static final String DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS = org.apache.hadoop.mapred.TextInputFormat.class.getName();
- public static final String DEFAULT_OUTPUT_FORMAT_CLASS = org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
-
- private static HiveCatalog hiveCatalog;
- private static HiveConf hiveConf;
-
- @BeforeClass
- public static void createCatalog() throws IOException {
- hiveConf = HiveTestUtils.createHiveConf();
- hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
- hiveCatalog.open();
- }
-
- @AfterClass
- public static void closeCatalog() {
- if (null != hiveCatalog) {
- hiveCatalog.close();
- }
- }
-
- @Test
- public void testReadFromHiveInputFormat() throws Exception {
- final String dbName = "default";
- final String tblName = "test";
- TableSchema tableSchema = new TableSchema(
- new String[]{"a", "b", "c", "d", "e"},
- new TypeInformation[]{
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO}
- );
- //Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
- //serDe temporarily.
- HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
- org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
- tbl.setDbName(dbName);
- tbl.setTableName(tblName);
- tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
- tbl.setParameters(new HashMap<>());
- StorageDescriptor sd = new StorageDescriptor();
- String location = HiveInputFormatTest.class.getResource("/test").getPath();
- sd.setLocation(location);
- sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
- sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
- sd.getSerdeInfo().setParameters(new HashMap<>());
- sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
- sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ",");
- sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
- tbl.setSd(sd);
- tbl.setPartitionKeys(new ArrayList<>());
-
- client.createTable(tbl);
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- RowTypeInfo rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
- List<HiveTablePartition> partitions = new ArrayList<>();
- partitions.add(new HiveTablePartition(sd, new HashMap<>()));
- CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(new ObjectPath(dbName, tblName));
- HiveTableInputFormat hiveTableInputFormat = new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
- DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
- List<Row> rows = rowDataSet.collect();
- Assert.assertEquals(4, rows.size());
- Assert.assertEquals("1,1,a,1000,1.11", rows.get(0).toString());
- Assert.assertEquals("2,2,a,2000,2.22", rows.get(1).toString());
- Assert.assertEquals("3,3,a,3000,3.33", rows.get(2).toString());
- Assert.assertEquals("4,4,a,4000,4.44", rows.get(3).toString());
- }
-
- @Test
- public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
- final String dbName = "default";
- final String tblName = "complext_test";
-
- TableSchema.Builder builder = new TableSchema.Builder();
- builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
- DataTypes.ARRAY(DataTypes.INT()),
- DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
- DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
-
- //Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
- //serDe temporarily.
- HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, HiveShimLoader.getHiveVersion());
- org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
- tbl.setDbName(dbName);
- tbl.setTableName(tblName);
- tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
- tbl.setParameters(new HashMap<>());
- StorageDescriptor sd = new StorageDescriptor();
- String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
- sd.setLocation(location);
- sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
- sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
- sd.getSerdeInfo().setParameters(new HashMap<>());
- sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
- sd.getSerdeInfo().getParameters().put(serdeConstants.FIELD_DELIM, ";");
- sd.getSerdeInfo().getParameters().put(serdeConstants.COLLECTION_DELIM, ",");
- sd.getSerdeInfo().getParameters().put(serdeConstants.MAPKEY_DELIM, ":");
- sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
- tbl.setSd(sd);
- tbl.setPartitionKeys(new ArrayList<>());
-
- client.createTable(tbl);
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- List<HiveTablePartition> partitions = new ArrayList<>();
- partitions.add(new HiveTablePartition(sd, new HashMap<>()));
- CatalogTable catalogTable = new CatalogTableImpl(builder.build(), new HashMap<>(), "TEST_TABLE");
- HiveTableInputFormat hiveTableInputFormat =
- new HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
- DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
- List<Row> rows = rowDataSet.collect();
- Assert.assertEquals(1, rows.size());
- Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
- }
-}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 04070e8..3920d0f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableImpl;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.planner.runtime.utils.TableUtil;
@@ -31,7 +29,6 @@ import org.apache.flink.types.Row;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -40,7 +37,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.IOException;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import scala.collection.JavaConverters;
@@ -81,6 +80,7 @@ public class HiveTableSourceTest {
@Test
public void testReadNonPartitionedTable() throws Exception {
+ final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "test";
hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
@@ -93,17 +93,41 @@ public class HiveTableSourceTest {
.commit();
TableEnvironment tEnv = HiveTestUtils.createTableEnv();
- ObjectPath tablePath = new ObjectPath(dbName, tblName);
- CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
- HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
- Table src = tEnv.fromTableSource(hiveTableSource);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ Table src = tEnv.sqlQuery("select * from hive.source_db.test");
List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
Assert.assertEquals(4, rows.size());
- Assert.assertEquals(1, rows.get(0).getField(0));
- Assert.assertEquals(2, rows.get(1).getField(0));
- Assert.assertEquals(3, rows.get(2).getField(0));
- Assert.assertEquals(4, rows.get(3).getField(0));
+ Assert.assertEquals("1,1,a,1000,1.11", rows.get(0).toString());
+ Assert.assertEquals("2,2,b,2000,2.22", rows.get(1).toString());
+ Assert.assertEquals("3,3,c,3000,3.33", rows.get(2).toString());
+ Assert.assertEquals("4,4,d,4000,4.44", rows.get(3).toString());
+ }
+
+ @Test
+ public void testReadComplexDataType() throws Exception {
+ final String catalogName = "hive";
+ final String dbName = "source_db";
+ final String tblName = "complex_test";
+ hiveShell.execute("create table source_db.complex_test(" +
+ "a array<int>, m map<int,string>, s struct<f1:int,f2:bigint>)");
+ Integer[] array = new Integer[]{1, 2, 3};
+ Map<Integer, String> map = new LinkedHashMap<>();
+ map.put(1, "a");
+ map.put(2, "b");
+ Object[] struct = new Object[]{3, 3L};
+ hiveShell.insertInto(dbName, tblName)
+ .withAllColumns()
+ .addRow(array, map, struct)
+ .commit();
+ TableEnvironment tEnv = HiveTestUtils.createTableEnv();
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ Table src = tEnv.sqlQuery("select * from hive.source_db.complex_test");
+ List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
+ Assert.assertEquals(1, rows.size());
+ assertArrayEquals(array, (Integer[]) rows.get(0).getField(0));
+ assertEquals(map, rows.get(0).getField(1));
+ assertEquals(Row.of(struct[0], struct[1]), rows.get(0).getField(2));
}
/**
@@ -112,11 +136,12 @@ public class HiveTableSourceTest {
*/
@Test
public void testReadPartitionTable() throws Exception {
+ final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "test_table_pt";
hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
"(year STRING, value INT) partitioned by (pt int);");
- hiveShell.insertInto("source_db", "test_table_pt")
+ hiveShell.insertInto(dbName, tblName)
.withColumns("year", "value", "pt")
.addRow("2014", 3, 0)
.addRow("2014", 4, 0)
@@ -124,11 +149,10 @@ public class HiveTableSourceTest {
.addRow("2015", 5, 1)
.commit();
TableEnvironment tEnv = HiveTestUtils.createTableEnv();
- ObjectPath tablePath = new ObjectPath(dbName, tblName);
- CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
- HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
- Table src = tEnv.fromTableSource(hiveTableSource);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt");
List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
+
assertEquals(4, rows.size());
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
@@ -136,11 +160,12 @@ public class HiveTableSourceTest {
@Test
public void testPartitionPrunning() throws Exception {
+ final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "test_table_pt_1";
hiveShell.execute("CREATE TABLE source_db.test_table_pt_1 " +
"(year STRING, value INT) partitioned by (pt int);");
- hiveShell.insertInto("source_db", "test_table_pt_1")
+ hiveShell.insertInto(dbName, tblName)
.withColumns("year", "value", "pt")
.addRow("2014", 3, 0)
.addRow("2014", 4, 0)
@@ -148,11 +173,10 @@ public class HiveTableSourceTest {
.addRow("2015", 5, 1)
.commit();
TableEnvironment tEnv = HiveTestUtils.createTableEnv();
- ObjectPath tablePath = new ObjectPath(dbName, tblName);
- CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
- tEnv.registerTableSource("src", new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable));
- Table table = tEnv.sqlQuery("select * from src where pt = 0");
- String[] explain = tEnv.explain(table).split("==.*==\n");
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt_1 where pt = 0");
+ // first check execution plan to ensure partition prunning works
+ String[] explain = tEnv.explain(src).split("==.*==\n");
assertEquals(4, explain.length);
String abstractSyntaxTree = explain[1];
String optimizedLogicalPlan = explain[2];
@@ -160,7 +184,8 @@ public class HiveTableSourceTest {
assertTrue(abstractSyntaxTree.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: false, PartitionNums: 2]"));
assertTrue(optimizedLogicalPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
assertTrue(physicalExecutionPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
- List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) table)).asJava();
+ // second check execute results
+ List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) src)).asJava();
assertEquals(2, rows.size());
Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
assertArrayEquals(new String[]{"2014,3,0", "2014,4,0"}, rowStrings);
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt b/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt
deleted file mode 100644
index bcb199b..0000000
--- a/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt
+++ /dev/null
@@ -1 +0,0 @@
-1,2,3;1:a,2:b;3,c
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/test/test.txt b/flink-connectors/flink-connector-hive/src/test/resources/test/test.txt
deleted file mode 100644
index 672650e..0000000
--- a/flink-connectors/flink-connector-hive/src/test/resources/test/test.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-1,1,a,1000,1.11
-2,2,a,2000,2.22
-3,3,a,3000,3.33
-4,4,a,4000,4.44
\ No newline at end of file