Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/04 09:01:29 UTC

[flink] branch release-1.9 updated: [FLINK-13190][hive] Add tests to verify partition pruning for HiveTableSource

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 046e19e  [FLINK-13190][hive] Add tests to verify partition pruning for HiveTableSource
046e19e is described below

commit 046e19e176f35c511436236a3483c03ead2af6c6
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Thu Aug 1 11:29:08 2019 +0800

    [FLINK-13190][hive] Add tests to verify partition pruning for HiveTableSource
    
    This closes #9310
---
 .../flink/connectors/hive/HiveTableSource.java     |  9 ++++++
 .../flink/connectors/hive/HiveTableSourceTest.java | 33 ++++++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index a5a670e..0563029 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -66,6 +66,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 	private List<Map<String, String>> partitionList = new ArrayList<>();
 	private Map<Map<String, String>, HiveTablePartition> partitionSpec2HiveTablePartition = new HashMap<>();
 	private boolean initAllPartitions;
+	private boolean partitionPruned;
 
 	public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
 		this.jobConf = Preconditions.checkNotNull(jobConf);
@@ -74,6 +75,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 		this.hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
 				"Hive version is not defined");
 		initAllPartitions = false;
+		partitionPruned = false;
 	}
 
 	private HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable,
@@ -87,6 +89,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 		this.hiveVersion = hiveVersion;
 		this.partitionList = partitionList;
 		this.initAllPartitions = true;
+		partitionPruned = true;
 	}
 
 	@Override
@@ -223,4 +226,10 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
 		throw new FlinkHiveException(
 				new IllegalArgumentException(String.format("Can not convert %s to type %s for partition value", valStr, type)));
 	}
+
+	@Override
+	public String explainSource() {
+		return super.explainSource() + String.format(" TablePath: %s, PartitionPruned: %s, PartitionNums: %d",
+													tablePath.getFullName(), partitionPruned, null == allHivePartitions ? 0 : allHivePartitions.size());
+	}
 }
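
For context, a minimal sketch (separate from the commit itself) of the string the
new explainSource() override produces. The literal prefix stands in for what
super.explainSource() returns for the test table registered in the test below, and
the values assume the unpruned source (two partitions, no pruning applied yet):

    public class ExplainSourceSketch {
        public static void main(String[] args) {
            // Hypothetical stand-in for super.explainSource() on the test table.
            String base = "HiveTableSource(year, value, pt)";
            // Same format string as the override above.
            String explained = base + String.format(
                    " TablePath: %s, PartitionPruned: %s, PartitionNums: %d",
                    "source_db.test_table_pt_1", false, 2);
            System.out.println(explained);
            // HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: false, PartitionNums: 2
        }
    }
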
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index e5b1a45..04070e8 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -46,6 +46,7 @@ import scala.collection.JavaConverters;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests {@link HiveTableSource}.
@@ -133,4 +134,36 @@ public class HiveTableSourceTest {
 		assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
 	}
 
+	@Test
+	public void testPartitionPruning() throws Exception {
+		final String dbName = "source_db";
+		final String tblName = "test_table_pt_1";
+		hiveShell.execute("CREATE TABLE source_db.test_table_pt_1 " +
+						"(year STRING, value INT) partitioned by (pt int);");
+		hiveShell.insertInto("source_db", "test_table_pt_1")
+				.withColumns("year", "value", "pt")
+				.addRow("2014", 3, 0)
+				.addRow("2014", 4, 0)
+				.addRow("2015", 2, 1)
+				.addRow("2015", 5, 1)
+				.commit();
+		TableEnvironment tEnv = HiveTestUtils.createTableEnv();
+		ObjectPath tablePath = new ObjectPath(dbName, tblName);
+		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+		tEnv.registerTableSource("src", new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable));
+		Table table = tEnv.sqlQuery("select * from src where pt = 0");
+		String[] explain = tEnv.explain(table).split("==.*==\n");
+		assertEquals(4, explain.length);
+		String abstractSyntaxTree = explain[1];
+		String optimizedLogicalPlan = explain[2];
+		String physicalExecutionPlan = explain[3];
+		assertTrue(abstractSyntaxTree.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: false, PartitionNums: 2]"));
+		assertTrue(optimizedLogicalPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
+		assertTrue(physicalExecutionPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
+		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) table)).asJava();
+		assertEquals(2, rows.size());
+		Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
+		assertArrayEquals(new String[]{"2014,3,0", "2014,4,0"}, rowStrings);
+	}
+
 }
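
The test's assertEquals(4, explain.length) relies on the shape of
TableEnvironment.explain() output: it contains three section headers
(== Abstract Syntax Tree ==, == Optimized Logical Plan ==,
== Physical Execution Plan ==), so splitting on the header pattern yields an
empty element 0 followed by the three plan sections. A minimal sketch with a
hand-written stand-in for the real explain string (the plan bodies here are
illustrative placeholders, not actual planner output):

    public class ExplainSplitSketch {
        public static void main(String[] args) {
            // Hand-written stand-in for tEnv.explain(table); only the three
            // "== ... ==" section headers matter for the split below.
            String explainOutput =
                    "== Abstract Syntax Tree ==\n"
                    + "LogicalProject(...)\n"
                    + "== Optimized Logical Plan ==\n"
                    + "HiveTableSource(...) PartitionPruned: true, PartitionNums: 1\n"
                    + "== Physical Execution Plan ==\n"
                    + "Stage 1 : Data Source\n";
            String[] parts = explainOutput.split("==.*==\n");
            // Java's split() keeps the leading empty string but drops trailing
            // empty strings, so parts is: ["", AST, optimized plan, physical plan].
            System.out.println(parts.length); // 4
        }
    }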