You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/19 19:04:32 UTC

[flink] branch release-1.9 updated: [FLINK-13274]Refactor HiveTableSourceTest using HiveRunner

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 9dec1aa  [FLINK-13274]Refactor HiveTableSourceTest using HiveRunner
9dec1aa is described below

commit 9dec1aae6600d55aeff38f07a29e4b35b853cae1
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Tue Jul 16 21:01:13 2019 +0800

    [FLINK-13274]Refactor HiveTableSourceTest using HiveRunner
    
    Refactor HiveTableSourceTest to use HiveRunner.
    
    This closes #9130.
---
 flink-connectors/flink-connector-hive/pom.xml      |   8 --
 .../batch/connectors/hive/HiveTableSourceTest.java | 100 +++++++++++++--------
 2 files changed, 65 insertions(+), 43 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index e88ada2..de2fa39 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -389,10 +389,6 @@ under the License.
                     <artifactId>hive-jdbc</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.hive.hcatalog</groupId>
-                    <artifactId>hive-webhcat-java-client</artifactId>
-                </exclusion>
-                <exclusion>
                     <groupId>org.apache.hive</groupId>
                     <artifactId>hive-service</artifactId>
                 </exclusion>
@@ -402,10 +398,6 @@ under the License.
                 </exclusion>
 				<exclusion>
 					<groupId>org.apache.tez</groupId>
-					<artifactId>tez-dag</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.tez</groupId>
 					<artifactId>tez-common</artifactId>
 				</exclusion>
 				<exclusion>
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index d6cf124..e127639 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -30,39 +30,40 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.types.Row;
 
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests {@link HiveTableSource}.
  */
+@RunWith(FlinkStandaloneHiveRunner.class)
 public class HiveTableSourceTest {
-	public static final String DEFAULT_SERDE_CLASS = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
-	public static final String DEFAULT_INPUT_FORMAT_CLASS = org.apache.hadoop.mapred.TextInputFormat.class.getName();
-	public static final String DEFAULT_OUTPUT_FORMAT_CLASS = org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
+
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
 
 	private static HiveCatalog hiveCatalog;
 	private static HiveConf hiveConf;
 
 	@BeforeClass
 	public static void createCatalog() throws IOException {
-		hiveConf = HiveTestUtils.createHiveConf();
+		hiveConf = hiveShell.getHiveConf();
 		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
 		hiveCatalog.open();
 	}
@@ -74,10 +75,24 @@ public class HiveTableSourceTest {
 		}
 	}
 
+	@Before
+	public void setupSourceDatabaseAndData() {
+		hiveShell.execute("CREATE DATABASE IF NOT EXISTS source_db");
+	}
+
 	@Test
 	public void testReadNonPartitionedTable() throws Exception {
-		final String dbName = "default";
+		final String dbName = "source_db";
 		final String tblName = "test";
+		hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
+		hiveShell.insertInto(dbName, tblName)
+				.withAllColumns()
+				.addRow(1, 1, "a", 1000L, 1.11)
+				.addRow(2, 2, "b", 2000L, 2.22)
+				.addRow(3, 3, "c", 3000L, 3.33)
+				.addRow(4, 4, "d", 4000L, 4.44)
+				.commit();
+
 		TableSchema tableSchema = new TableSchema(
 				new String[]{"a", "b", "c", "d", "e"},
 				new TypeInformation[]{
@@ -87,29 +102,6 @@ public class HiveTableSourceTest {
 						BasicTypeInfo.LONG_TYPE_INFO,
 						BasicTypeInfo.DOUBLE_TYPE_INFO}
 		);
-		//Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-		//serDe temporarily.
-		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
-		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-		tbl.setDbName(dbName);
-		tbl.setTableName(tblName);
-		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-		tbl.setParameters(new HashMap<>());
-		StorageDescriptor sd = new StorageDescriptor();
-		String location = HiveTableSourceTest.class.getResource("/test").getPath();
-		sd.setLocation(location);
-		sd.setInputFormat(DEFAULT_INPUT_FORMAT_CLASS);
-		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-		sd.setSerdeInfo(new SerDeInfo());
-		sd.getSerdeInfo().setSerializationLib(DEFAULT_SERDE_CLASS);
-		sd.getSerdeInfo().setParameters(new HashMap<>());
-		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-		sd.getSerdeInfo().getParameters().put("field.delim", ",");
-		sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
-		tbl.setSd(sd);
-		tbl.setPartitionKeys(new ArrayList<>());
-
-		client.createTable(tbl);
 		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
 		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
@@ -125,4 +117,42 @@ public class HiveTableSourceTest {
 		Assert.assertEquals(3, rows.get(2).getField(0));
 		Assert.assertEquals(4, rows.get(3).getField(0));
 	}
+
+	/**
+	 * Test to read from partition table.
+	 * @throws Exception
+	 */
+	@Test
+	public void testReadPartitionTable() throws Exception {
+		final String dbName = "source_db";
+		final String tblName = "test_table_pt";
+		hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
+						"(year STRING, value INT) partitioned by (pt int);");
+		hiveShell.insertInto("source_db", "test_table_pt")
+				.withColumns("year", "value", "pt")
+				.addRow("2014", 3, 0)
+				.addRow("2014", 4, 0)
+				.addRow("2015", 2, 1)
+				.addRow("2015", 5, 1)
+				.commit();
+		TableSchema tableSchema = new TableSchema(
+				new String[]{"year", "value", "int"},
+				new TypeInformation[]{
+						BasicTypeInfo.STRING_TYPE_INFO,
+						BasicTypeInfo.INT_TYPE_INFO,
+						BasicTypeInfo.INT_TYPE_INFO}
+		);
+		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+		ObjectPath tablePath = new ObjectPath(dbName, tblName);
+		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+		HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
+		Table src = tEnv.fromTableSource(hiveTableSource);
+		DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
+		List<Row> rows = rowDataSet.collect();
+		assertEquals(4, rows.size());
+		Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
+		assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
+	}
+
 }