You are viewing a plain text version of this content. The canonical link is available in the original HTML version of this page.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/04 09:01:29 UTC
[flink] branch release-1.9 updated: [FLINK-13190][hive] Add tests to verify partition pruning for HiveTableSource
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 046e19e [FLINK-13190][hive] Add tests to verify partition pruning for HiveTableSource
046e19e is described below
commit 046e19e176f35c511436236a3483c03ead2af6c6
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Thu Aug 1 11:29:08 2019 +0800
[FLINK-13190][hive] Add tests to verify partition pruning for HiveTableSource
This closes #9310
---
.../flink/connectors/hive/HiveTableSource.java | 9 ++++++
.../flink/connectors/hive/HiveTableSourceTest.java | 33 ++++++++++++++++++++++
2 files changed, 42 insertions(+)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index a5a670e..0563029 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -66,6 +66,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
private List<Map<String, String>> partitionList = new ArrayList<>();
private Map<Map<String, String>, HiveTablePartition> partitionSpec2HiveTablePartition = new HashMap<>();
private boolean initAllPartitions;
+ private boolean partitionPruned;
public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable) {
this.jobConf = Preconditions.checkNotNull(jobConf);
@@ -74,6 +75,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
this.hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
"Hive version is not defined");
initAllPartitions = false;
+ partitionPruned = false;
}
private HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catalogTable,
@@ -87,6 +89,7 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
this.hiveVersion = hiveVersion;
this.partitionList = partitionList;
this.initAllPartitions = true;
+ partitionPruned = true;
}
@Override
@@ -223,4 +226,10 @@ public class HiveTableSource extends InputFormatTableSource<Row> implements Part
throw new FlinkHiveException(
new IllegalArgumentException(String.format("Can not convert %s to type %s for partition value", valStr, type)));
}
+
+ @Override
+ public String explainSource() {
+ return super.explainSource() + String.format(" TablePath: %s, PartitionPruned: %s, PartitionNums: %d",
+ tablePath.getFullName(), partitionPruned, null == allHivePartitions ? 0 : allHivePartitions.size());
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index e5b1a45..04070e8 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -46,6 +46,7 @@ import scala.collection.JavaConverters;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Tests {@link HiveTableSource}.
@@ -133,4 +134,36 @@ public class HiveTableSourceTest {
assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
}
+ @Test
+ public void testPartitionPrunning() throws Exception {
+ final String dbName = "source_db";
+ final String tblName = "test_table_pt_1";
+ hiveShell.execute("CREATE TABLE source_db.test_table_pt_1 " +
+ "(year STRING, value INT) partitioned by (pt int);");
+ hiveShell.insertInto("source_db", "test_table_pt_1")
+ .withColumns("year", "value", "pt")
+ .addRow("2014", 3, 0)
+ .addRow("2014", 4, 0)
+ .addRow("2015", 2, 1)
+ .addRow("2015", 5, 1)
+ .commit();
+ TableEnvironment tEnv = HiveTestUtils.createTableEnv();
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
+ CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+ tEnv.registerTableSource("src", new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable));
+ Table table = tEnv.sqlQuery("select * from src where pt = 0");
+ String[] explain = tEnv.explain(table).split("==.*==\n");
+ assertEquals(4, explain.length);
+ String abstractSyntaxTree = explain[1];
+ String optimizedLogicalPlan = explain[2];
+ String physicalExecutionPlan = explain[3];
+ assertTrue(abstractSyntaxTree.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: false, PartitionNums: 2]"));
+ assertTrue(optimizedLogicalPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
+ assertTrue(physicalExecutionPlan.contains("HiveTableSource(year, value, pt) TablePath: source_db.test_table_pt_1, PartitionPruned: true, PartitionNums: 1]"));
+ List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect((TableImpl) table)).asJava();
+ assertEquals(2, rows.size());
+ Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
+ assertArrayEquals(new String[]{"2014,3,0", "2014,4,0"}, rowStrings);
+ }
+
}