Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/19 18:58:46 UTC
[flink] branch master updated: [FLINK-13274] Refactor HiveTableSourceTest using HiveRunner
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7b2e6e7 [FLINK-13274] Refactor HiveTableSourceTest using HiveRunner
7b2e6e7 is described below
commit 7b2e6e70bfccf2ffd051d61782d50d3ab9ef0443
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Tue Jul 16 21:01:13 2019 +0800
[FLINK-13274] Refactor HiveTableSourceTest using HiveRunner
Refactor HiveTableSourceTest to use HiveRunner.
This closes #9130.
---
flink-connectors/flink-connector-hive/pom.xml | 8 --
.../batch/connectors/hive/HiveTableSourceTest.java | 100 +++++++++++++--------
2 files changed, 65 insertions(+), 43 deletions(-)
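
[Editor's note] For readers new to HiveRunner: the pattern this patch adopts is roughly the one sketched below. This is a minimal illustration, not code from the patch; it uses the stock com.klarna.hiverunner StandaloneHiveRunner, whereas the patch references a Flink-local FlinkStandaloneHiveRunner that is resolved from the test's own package (no import is added for it in the diff). Class, database, and table names here are made up for illustration; HiveShell, @HiveSQL, execute, insertInto/withAllColumns/addRow/commit, and executeQuery are the stock HiveRunner API.

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.StandaloneHiveRunner;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.List;
import static org.junit.Assert.assertEquals;

// Minimal HiveRunner test: the runner boots an embedded HiveServer and
// injects a HiveShell, so no external Hive cluster or metastore is needed.
@RunWith(StandaloneHiveRunner.class)
public class ExampleHiveRunnerTest {

    @HiveSQL(files = {})
    private static HiveShell hiveShell;

    @Test
    public void testSelect() {
        hiveShell.execute("CREATE DATABASE IF NOT EXISTS demo_db");
        hiveShell.execute("CREATE TABLE demo_db.demo (x INT, s STRING)");
        hiveShell.insertInto("demo_db", "demo")
                .withAllColumns()
                .addRow(1, "a")
                .commit();
        // executeQuery returns each result row as a tab-separated string.
        List<String> rows = hiveShell.executeQuery("SELECT s FROM demo_db.demo WHERE x = 1");
        assertEquals(1, rows.size());
        assertEquals("a", rows.get(0));
    }
}

This is what lets the patch delete the hand-built metastore setup seen in the second hunk below.
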
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index f9a037b..02e15c9 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -389,10 +389,6 @@ under the License.
<artifactId>hive-jdbc</artifactId>
</exclusion>
<exclusion>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-webhcat-java-client</artifactId>
- </exclusion>
- <exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
</exclusion>
@@ -402,10 +398,6 @@ under the License.
</exclusion>
<exclusion>
<groupId>org.apache.tez</groupId>
- <artifactId>tez-dag</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.tez</groupId>
<artifactId>tez-common</artifactId>
</exclusion>
<exclusion>
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index d6cf124..e127639 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -30,39 +30,40 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.types.Row;
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.mapred.JobConf;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
/**
* Tests {@link HiveTableSource}.
*/
+@RunWith(FlinkStandaloneHiveRunner.class)
public class HiveTableSourceTest {
- public static final String DEFAULT_SERDE_CLASS = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
- public static final String DEFAULT_INPUT_FORMAT_CLASS = org.apache.hadoop.mapred.TextInputFormat.class.getName();
- public static final String DEFAULT_OUTPUT_FORMAT_CLASS = org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
+
+ @HiveSQL(files = {})
+ private static HiveShell hiveShell;
private static HiveCatalog hiveCatalog;
private static HiveConf hiveConf;
@BeforeClass
public static void createCatalog() throws IOException {
- hiveConf = HiveTestUtils.createHiveConf();
+ hiveConf = hiveShell.getHiveConf();
hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
hiveCatalog.open();
}
@@ -74,10 +75,24 @@ public class HiveTableSourceTest {
}
}
+ @Before
+ public void setupSourceDatabaseAndData() {
+ hiveShell.execute("CREATE DATABASE IF NOT EXISTS source_db");
+ }
+
@Test
public void testReadNonPartitionedTable() throws Exception {
- final String dbName = "default";
+ final String dbName = "source_db";
final String tblName = "test";
+ hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
+ hiveShell.insertInto(dbName, tblName)
+ .withAllColumns()
+ .addRow(1, 1, "a", 1000L, 1.11)
+ .addRow(2, 2, "b", 2000L, 2.22)
+ .addRow(3, 3, "c", 3000L, 3.33)
+ .addRow(4, 4, "d", 4000L, 4.44)
+ .commit();
+
TableSchema tableSchema = new TableSchema(
new String[]{"a", "b", "c", "d", "e"},
new TypeInformation[]{
@@ -87,29 +102,6 @@ public class HiveTableSourceTest {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO}
);
- //Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
- //serDe temporarily.
- HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
- org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
- tbl.setDbName(dbName);
- tbl.setTableName(tblName);
- tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
- tbl.setParameters(new HashMap<>());
- StorageDescriptor sd = new StorageDescriptor();
- String location = HiveTableSourceTest.class.getResource("/test").getPath();
- sd.setLocation(location);
- sd.setInputFormat(DEFAULT_INPUT_FORMAT_CLASS);
- sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setSerializationLib(DEFAULT_SERDE_CLASS);
- sd.getSerdeInfo().setParameters(new HashMap<>());
- sd.getSerdeInfo().getParameters().put("serialization.format", "1");
- sd.getSerdeInfo().getParameters().put("field.delim", ",");
- sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
- tbl.setSd(sd);
- tbl.setPartitionKeys(new ArrayList<>());
-
- client.createTable(tbl);
ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
ObjectPath tablePath = new ObjectPath(dbName, tblName);
@@ -125,4 +117,42 @@ public class HiveTableSourceTest {
Assert.assertEquals(3, rows.get(2).getField(0));
Assert.assertEquals(4, rows.get(3).getField(0));
}
+
+ /**
+ * Tests reading from a partitioned table.
+ * @throws Exception if the test fails
+ */
+ @Test
+ public void testReadPartitionTable() throws Exception {
+ final String dbName = "source_db";
+ final String tblName = "test_table_pt";
+ hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
+ "(year STRING, value INT) partitioned by (pt int);");
+ hiveShell.insertInto("source_db", "test_table_pt")
+ .withColumns("year", "value", "pt")
+ .addRow("2014", 3, 0)
+ .addRow("2014", 4, 0)
+ .addRow("2015", 2, 1)
+ .addRow("2015", 5, 1)
+ .commit();
+ TableSchema tableSchema = new TableSchema(
+ new String[]{"year", "value", "int"},
+ new TypeInformation[]{
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO}
+ );
+ ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+ BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
+ CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+ HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
+ Table src = tEnv.fromTableSource(hiveTableSource);
+ DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
+ List<Row> rows = rowDataSet.collect();
+ assertEquals(4, rows.size());
+ Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
+ assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
+ }
+
}
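
[Editor's note] Taken together, the refactored tests share one shape: seed data through HiveShell instead of hand-building metastore Table/StorageDescriptor objects, then read through HiveTableSource and assert on the collected rows. Below is a condensed sketch of that flow, assuming the imports and the static hiveShell/hiveCatalog/hiveConf fields from HiveTableSourceTest above; the table and column names are illustrative, not from the patch.

@Test
public void testReadThroughHiveTableSource() throws Exception {
    // 1. Seed a table with HiveRunner; this replaces the old metastore-client setup.
    hiveShell.execute("CREATE TABLE source_db.example (a INT, c STRING)");
    hiveShell.insertInto("source_db", "example")
            .withAllColumns()
            .addRow(1, "a")
            .addRow(2, "b")
            .commit();

    // 2. Fetch the catalog entry and wrap it in a HiveTableSource.
    ObjectPath tablePath = new ObjectPath("source_db", "example");
    CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
    HiveTableSource hiveTableSource =
            new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);

    // 3. Read via the batch Table API and assert on the collected rows.
    ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
    Table src = tEnv.fromTableSource(hiveTableSource);
    DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(
            new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
            new String[]{"a", "c"}));
    List<Row> rows = rowDataSet.collect();
    assertEquals(2, rows.size());
}

Note that collect() gives no ordering guarantee across partitions, which is why testReadPartitionTable above normalizes rows to strings and sorts them before comparing.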