You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/24 02:48:09 UTC
[4/6] incubator-impala git commit: IMPALA-5602: Fix query optimization for kudu and datasource tables
IMPALA-5602: Fix query optimization for kudu and datasource tables
Fix a bug where the following queries on Kudu and data source tables
were incorrectly optimized as a 'small query' and therefore ran on a
single node with a single scanner thread:
(A) queries that have all their predicates pushed to the underlying
storage layer and have a limit
(B) queries on tables with missing table stats that also meet the
conditions in (A)
Testing:
Added frontend planner tests.
Change-Id: I93822d67ebda41d5d0456095c429e3915a3f40c4
Reviewed-on: http://gerrit.cloudera.org:8080/7560
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6f20df81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6f20df81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6f20df81
Branch: refs/heads/master
Commit: 6f20df81f727f89dfb1efef9a86f39cf5a4ef88a
Parents: 81c3d88
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Tue Aug 1 16:34:15 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Aug 24 02:32:13 2017 +0000
----------------------------------------------------------------------
.../org/apache/impala/catalog/KuduTable.java | 44 ++++++++++-------
.../impala/planner/DataSourceScanNode.java | 3 ++
.../apache/impala/planner/HBaseScanNode.java | 3 ++
.../org/apache/impala/planner/KuduScanNode.java | 3 ++
.../org/apache/impala/planner/ScanNode.java | 16 +++++-
.../impala/util/MaxRowsProcessedVisitor.java | 5 +-
.../apache/impala/common/FrontendTestBase.java | 52 +++++++++++++-------
.../org/apache/impala/planner/PlannerTest.java | 3 ++
.../queries/PlannerTest/data-source-tables.test | 15 ++++++
.../queries/PlannerTest/kudu.test | 27 ++++++++++
10 files changed, 131 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index cb94503..7e13ac5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -182,6 +182,30 @@ public class KuduTable extends Table {
}
/**
+ * Load schema and partitioning schemes directly from Kudu.
+ */
+ public void loadSchemaFromKudu() throws ImpalaRuntimeException {
+ // This is set to 0 for Kudu tables.
+ // TODO: Change this to reflect the number of pk columns and modify all the
+ // places (e.g. insert stmt) that currently make use of this parameter.
+ numClusteringCols_ = 0;
+ org.apache.kudu.client.KuduTable kuduTable = null;
+ // Connect to Kudu to retrieve table metadata
+ KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts());
+ try {
+ kuduTable = kuduClient.openTable(kuduTableName_);
+ } catch (KuduException e) {
+ throw new ImpalaRuntimeException(
+ String.format("Error opening Kudu table '%s', Kudu error: %s", kuduTableName_,
+ e.getMessage()));
+ }
+ Preconditions.checkNotNull(kuduTable);
+
+ loadSchema(kuduTable);
+ loadPartitionByParams(kuduTable);
+ }
+
+ /**
* Loads the metadata of a Kudu table.
*
* Schema and partitioning schemes are loaded directly from Kudu whereas column stats
@@ -192,32 +216,14 @@ public class KuduTable extends Table {
public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
msTable_ = msTbl;
- // This is set to 0 for Kudu tables.
- // TODO: Change this to reflect the number of pk columns and modify all the
- // places (e.g. insert stmt) that currently make use of this parameter.
- numClusteringCols_ = 0;
kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
Preconditions.checkNotNull(kuduTableName_);
kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
Preconditions.checkNotNull(kuduMasters_);
- org.apache.kudu.client.KuduTable kuduTable = null;
setTableStats(msTable_);
-
- // Connect to Kudu to retrieve table metadata
- KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts());
- try {
- kuduTable = kuduClient.openTable(kuduTableName_);
- } catch (KuduException e) {
- throw new TableLoadingException(String.format(
- "Error opening Kudu table '%s', Kudu error: %s",
- kuduTableName_, e.getMessage()));
- }
- Preconditions.checkNotNull(kuduTable);
-
// Load metadata from Kudu and HMS
try {
- loadSchema(kuduTable);
- loadPartitionByParams(kuduTable);
+ loadSchemaFromKudu();
loadAllColumnStats(msClient);
} catch (ImpalaRuntimeException e) {
throw new TableLoadingException("Error loading metadata for Kudu table " +
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index cea9b53..e6679da 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -362,4 +362,7 @@ public class DataSourceScanNode extends ScanNode {
}
return output.toString();
}
+
+ @Override
+ public boolean hasStorageLayerConjuncts() { return !acceptedConjuncts_.isEmpty(); }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index d56aa98..d2e47ad 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -508,4 +508,7 @@ public class HBaseScanNode extends ScanNode {
// TODO: What's a good estimate of memory consumption?
return 1024L * 1024L * 1024L;
}
+
+ @Override
+ public boolean hasStorageLayerConjuncts() { return !filters_.isEmpty(); }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 37a4e5c..cbc132b 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -521,4 +521,7 @@ public class KuduScanNode extends ScanNode {
default: return null;
}
}
+
+ @Override
+ public boolean hasStorageLayerConjuncts() { return !kuduConjuncts_.isEmpty(); }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/ScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index 1373e89..d6b7813 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -193,7 +193,9 @@ abstract public class ScanNode extends PlanNode {
@Override
public long getInputCardinality() {
- if (getConjuncts().isEmpty() && hasLimit()) return getLimit();
+ if (!hasScanConjuncts() && !hasStorageLayerConjuncts() && hasLimit()) {
+ return getLimit();
+ }
return inputCardinality_;
}
@@ -210,4 +212,16 @@ abstract public class ScanNode extends PlanNode {
return desc_.getPath().toString();
}
}
+
+ /**
+ * Returns true if this node has conjuncts to be evaluated by Impala against the scan
+ * tuple.
+ */
+ public boolean hasScanConjuncts() { return !getConjuncts().isEmpty(); }
+
+ /**
+ * Returns true if this node has conjuncts to be evaluated by the underlying storage
+ * engine.
+ */
+ public boolean hasStorageLayerConjuncts() { return false; }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
index 56cf047..e338ecb 100644
--- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
+++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
@@ -52,8 +52,9 @@ public class MaxRowsProcessedVisitor implements Visitor<PlanNode> {
boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats();
// In the absence of collection stats, treat scans on collections as if they
// have no limit.
- if (scan.isAccessingCollectionType()
- || (missingStats && !(scan.hasLimit() && scan.getConjuncts().isEmpty()))) {
+ if (scan.isAccessingCollectionType() ||
+ (missingStats && !(scan.hasLimit() && !scan.hasScanConjuncts() &&
+ !scan.hasStorageLayerConjuncts()))) {
valid_ = false;
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 033b0e2..aa96490 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -44,6 +44,7 @@ import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.ImpaladCatalog;
+import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.ScalarFunction;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Table;
@@ -165,9 +166,9 @@ public class FrontendTestBase {
}
/**
- * Add a new dummy table to the catalog based on the given CREATE TABLE sql.
- * The dummy table only has the column definitions and the metastore table set, but no
- * other metadata.
+ * Add a new dummy table to the catalog based on the given CREATE TABLE sql. The
+ * returned table only has its metadata partially set, but is capable of being planned.
+ * Only HDFS tables and external Kudu tables are supported.
* Returns the new dummy table.
* The test tables are registered in testTables_ and removed in the @After method.
*/
@@ -177,21 +178,36 @@ public class FrontendTestBase {
Preconditions.checkNotNull(db, "Test tables must be created in an existing db.");
org.apache.hadoop.hive.metastore.api.Table msTbl =
CatalogOpExecutor.createMetaStoreTable(createTableStmt.toThrift());
- HdfsTable dummyTable = new HdfsTable(msTbl, db,
- createTableStmt.getTbl(), createTableStmt.getOwner());
- List<ColumnDef> columnDefs = Lists.newArrayList(
- createTableStmt.getPartitionColumnDefs());
- dummyTable.setNumClusteringCols(columnDefs.size());
- columnDefs.addAll(createTableStmt.getColumnDefs());
- for (int i = 0; i < columnDefs.size(); ++i) {
- ColumnDef colDef = columnDefs.get(i);
- dummyTable.addColumn(new Column(colDef.getColName(), colDef.getType(), i));
- }
- try {
- dummyTable.addDefaultPartition(msTbl.getSd());
- } catch (CatalogException e) {
- e.printStackTrace();
- fail("Failed to add test table:\n" + createTableSql);
+ Table dummyTable = Table.fromMetastoreTable(db, msTbl);
+ if (dummyTable instanceof HdfsTable) {
+ List<ColumnDef> columnDefs = Lists.newArrayList(
+ createTableStmt.getPartitionColumnDefs());
+ dummyTable.setNumClusteringCols(columnDefs.size());
+ columnDefs.addAll(createTableStmt.getColumnDefs());
+ for (int i = 0; i < columnDefs.size(); ++i) {
+ ColumnDef colDef = columnDefs.get(i);
+ dummyTable.addColumn(new Column(colDef.getColName(), colDef.getType(), i));
+ }
+ try {
+ HdfsTable hdfsTable = (HdfsTable) dummyTable;
+ hdfsTable.addDefaultPartition(msTbl.getSd());
+ } catch (CatalogException e) {
+ e.printStackTrace();
+ fail("Failed to add test table:\n" + createTableSql);
+ }
+ } else if (dummyTable instanceof KuduTable) {
+ if (!Table.isExternalTable(msTbl)) {
+ fail("Failed to add table, external kudu table expected:\n" + createTableSql);
+ }
+ try {
+ KuduTable kuduTable = (KuduTable) dummyTable;
+ kuduTable.loadSchemaFromKudu();
+ } catch (ImpalaRuntimeException e) {
+ e.printStackTrace();
+ fail("Failed to add test table:\n" + createTableSql);
+ }
+ } else {
+ fail("Test table type not supported:\n" + createTableSql);
}
db.addTable(dummyTable);
testTables_.add(dummyTable);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 3bb8083..fc8ceab 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -314,6 +314,9 @@ public class PlannerTest extends PlannerTestBase {
@Test
public void testKudu() {
Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
+ addTestDb("kudu_planner_test", "Test DB for Kudu Planner.");
+ addTestTable("CREATE EXTERNAL TABLE kudu_planner_test.no_stats STORED AS KUDU " +
+ "TBLPROPERTIES ('kudu.table_name' = 'impala::functional_kudu.alltypes');");
runPlannerTestFile("kudu");
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
index 7cc5c04..ce4dbd7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
@@ -96,3 +96,18 @@ PLAN-ROOT SINK
|
00:EMPTYSET
====
+---- QUERY
+# IMPALA-5602: If a query contains predicates that are all pushed to the datasource and
+# there is a limit, then the query should not incorrectly run with 'small query'
+# optimization.
+select * from functional.alltypes_datasource where id = 1 limit 15
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+| limit: 15
+|
+00:SCAN DATA SOURCE [functional.alltypes_datasource]
+data source predicates: id = 1
+ limit: 15
+====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 079d291..e620ad6 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -424,3 +424,30 @@ INSERT INTO KUDU [functional_kudu.alltypes]
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
====
+# IMPALA-5602: If a query contains predicates that are all pushed to kudu and there is a
+# limit, then the query should not incorrectly run with 'small query' optimization.
+select * from functional_kudu.alltypesagg where tinyint_col = 9 limit 10;
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+| limit: 10
+|
+00:SCAN KUDU [functional_kudu.alltypesagg_idx]
+ kudu predicates: functional_kudu.alltypesagg_idx.tinyint_col = 9
+ limit: 10
+====
+# IMPALA-5602: If a query contains predicates that are all pushed to kudu, there is a
+# limit, and no table stats, then the query should not incorrectly run with 'small query'
+# optimization.
+select * from kudu_planner_test.no_stats where tinyint_col = 9 limit 10;
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+| limit: 10
+|
+00:SCAN KUDU [kudu_planner_test.no_stats]
+ kudu predicates: tinyint_col = 9
+ limit: 10
+====