You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/24 02:48:09 UTC

[4/6] incubator-impala git commit: IMPALA-5602: Fix query optimization for kudu and datasource tables

IMPALA-5602: Fix query optimization for kudu and datasource tables

Fix a bug where the following queries on Kudu and data source tables
were incorrectly optimized as 'small queries' and therefore ran on a
single node with a single scanner thread:

(A) queries that have all of their predicates pushed down to the
underlying storage layer and that also have a limit
(B) queries on tables with missing table stats that also meet the
conditions in (A)

Testing:
Added frontend planner tests.

Change-Id: I93822d67ebda41d5d0456095c429e3915a3f40c4
Reviewed-on: http://gerrit.cloudera.org:8080/7560
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6f20df81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6f20df81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6f20df81

Branch: refs/heads/master
Commit: 6f20df81f727f89dfb1efef9a86f39cf5a4ef88a
Parents: 81c3d88
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Tue Aug 1 16:34:15 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Aug 24 02:32:13 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/KuduTable.java    | 44 ++++++++++-------
 .../impala/planner/DataSourceScanNode.java      |  3 ++
 .../apache/impala/planner/HBaseScanNode.java    |  3 ++
 .../org/apache/impala/planner/KuduScanNode.java |  3 ++
 .../org/apache/impala/planner/ScanNode.java     | 16 +++++-
 .../impala/util/MaxRowsProcessedVisitor.java    |  5 +-
 .../apache/impala/common/FrontendTestBase.java  | 52 +++++++++++++-------
 .../org/apache/impala/planner/PlannerTest.java  |  3 ++
 .../queries/PlannerTest/data-source-tables.test | 15 ++++++
 .../queries/PlannerTest/kudu.test               | 27 ++++++++++
 10 files changed, 131 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index cb94503..7e13ac5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -182,6 +182,30 @@ public class KuduTable extends Table {
   }
 
   /**
+   * Load schema and partitioning schemes directly from Kudu.
+   */
+  public void loadSchemaFromKudu() throws ImpalaRuntimeException {
+    // This is set to 0 for Kudu tables.
+    // TODO: Change this to reflect the number of pk columns and modify all the
+    // places (e.g. insert stmt) that currently make use of this parameter.
+    numClusteringCols_ = 0;
+    org.apache.kudu.client.KuduTable kuduTable = null;
+    // Connect to Kudu to retrieve table metadata
+    KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts());
+    try {
+      kuduTable = kuduClient.openTable(kuduTableName_);
+    } catch (KuduException e) {
+      throw new ImpalaRuntimeException(
+          String.format("Error opening Kudu table '%s', Kudu error: %s", kuduTableName_,
+              e.getMessage()));
+    }
+    Preconditions.checkNotNull(kuduTable);
+
+    loadSchema(kuduTable);
+    loadPartitionByParams(kuduTable);
+  }
+
+  /**
    * Loads the metadata of a Kudu table.
    *
    * Schema and partitioning schemes are loaded directly from Kudu whereas column stats
@@ -192,32 +216,14 @@ public class KuduTable extends Table {
   public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     msTable_ = msTbl;
-    // This is set to 0 for Kudu tables.
-    // TODO: Change this to reflect the number of pk columns and modify all the
-    // places (e.g. insert stmt) that currently make use of this parameter.
-    numClusteringCols_ = 0;
     kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
     Preconditions.checkNotNull(kuduTableName_);
     kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
     Preconditions.checkNotNull(kuduMasters_);
-    org.apache.kudu.client.KuduTable kuduTable = null;
     setTableStats(msTable_);
-
-    // Connect to Kudu to retrieve table metadata
-    KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts());
-    try {
-      kuduTable = kuduClient.openTable(kuduTableName_);
-    } catch (KuduException e) {
-      throw new TableLoadingException(String.format(
-          "Error opening Kudu table '%s', Kudu error: %s",
-          kuduTableName_, e.getMessage()));
-    }
-    Preconditions.checkNotNull(kuduTable);
-
     // Load metadata from Kudu and HMS
     try {
-      loadSchema(kuduTable);
-      loadPartitionByParams(kuduTable);
+      loadSchemaFromKudu();
       loadAllColumnStats(msClient);
     } catch (ImpalaRuntimeException e) {
       throw new TableLoadingException("Error loading metadata for Kudu table " +

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index cea9b53..e6679da 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -362,4 +362,7 @@ public class DataSourceScanNode extends ScanNode {
     }
     return output.toString();
   }
+
+  @Override
+  public boolean hasStorageLayerConjuncts() { return !acceptedConjuncts_.isEmpty(); }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index d56aa98..d2e47ad 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -508,4 +508,7 @@ public class HBaseScanNode extends ScanNode {
     // TODO: What's a good estimate of memory consumption?
     return 1024L * 1024L * 1024L;
   }
+
+  @Override
+  public boolean hasStorageLayerConjuncts() { return !filters_.isEmpty(); }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 37a4e5c..cbc132b 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -521,4 +521,7 @@ public class KuduScanNode extends ScanNode {
       default: return null;
     }
   }
+
+  @Override
+  public boolean hasStorageLayerConjuncts() { return !kuduConjuncts_.isEmpty(); }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/planner/ScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index 1373e89..d6b7813 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -193,7 +193,9 @@ abstract public class ScanNode extends PlanNode {
 
   @Override
   public long getInputCardinality() {
-    if (getConjuncts().isEmpty() && hasLimit()) return getLimit();
+    if (!hasScanConjuncts() && !hasStorageLayerConjuncts() && hasLimit()) {
+      return getLimit();
+    }
     return inputCardinality_;
   }
 
@@ -210,4 +212,16 @@ abstract public class ScanNode extends PlanNode {
       return desc_.getPath().toString();
     }
   }
+
+  /**
+   * Returns true if this node has conjuncts to be evaluated by Impala against the scan
+   * tuple.
+   */
+  public boolean hasScanConjuncts() { return !getConjuncts().isEmpty(); }
+
+  /**
+   * Returns true if this node has conjuncts to be evaluated by the underlying storage
+   * engine.
+   */
+  public boolean hasStorageLayerConjuncts() { return false; }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
index 56cf047..e338ecb 100644
--- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
+++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
@@ -52,8 +52,9 @@ public class MaxRowsProcessedVisitor implements Visitor<PlanNode> {
       boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats();
       // In the absence of collection stats, treat scans on collections as if they
       // have no limit.
-      if (scan.isAccessingCollectionType()
-          || (missingStats && !(scan.hasLimit() && scan.getConjuncts().isEmpty()))) {
+      if (scan.isAccessingCollectionType() ||
+          (missingStats && !(scan.hasLimit() && !scan.hasScanConjuncts() &&
+              !scan.hasStorageLayerConjuncts()))) {
         valid_ = false;
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 033b0e2..aa96490 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -44,6 +44,7 @@ import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.ImpaladCatalog;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Table;
@@ -165,9 +166,9 @@ public class FrontendTestBase {
   }
 
   /**
-   * Add a new dummy table to the catalog based on the given CREATE TABLE sql.
-   * The dummy table only has the column definitions and the metastore table set, but no
-   * other metadata.
+   * Add a new dummy table to the catalog based on the given CREATE TABLE sql. The
+   * returned table only has its metadata partially set, but is capable of being planned.
+   * Only HDFS tables and external Kudu tables are supported.
    * Returns the new dummy table.
    * The test tables are registered in testTables_ and removed in the @After method.
    */
@@ -177,21 +178,36 @@ public class FrontendTestBase {
     Preconditions.checkNotNull(db, "Test tables must be created in an existing db.");
     org.apache.hadoop.hive.metastore.api.Table msTbl =
         CatalogOpExecutor.createMetaStoreTable(createTableStmt.toThrift());
-    HdfsTable dummyTable = new HdfsTable(msTbl, db,
-        createTableStmt.getTbl(), createTableStmt.getOwner());
-    List<ColumnDef> columnDefs = Lists.newArrayList(
-        createTableStmt.getPartitionColumnDefs());
-    dummyTable.setNumClusteringCols(columnDefs.size());
-    columnDefs.addAll(createTableStmt.getColumnDefs());
-    for (int i = 0; i < columnDefs.size(); ++i) {
-      ColumnDef colDef = columnDefs.get(i);
-      dummyTable.addColumn(new Column(colDef.getColName(), colDef.getType(), i));
-    }
-    try {
-    dummyTable.addDefaultPartition(msTbl.getSd());
-    } catch (CatalogException e) {
-      e.printStackTrace();
-      fail("Failed to add test table:\n" + createTableSql);
+    Table dummyTable = Table.fromMetastoreTable(db, msTbl);
+    if (dummyTable instanceof HdfsTable) {
+      List<ColumnDef> columnDefs = Lists.newArrayList(
+          createTableStmt.getPartitionColumnDefs());
+      dummyTable.setNumClusteringCols(columnDefs.size());
+      columnDefs.addAll(createTableStmt.getColumnDefs());
+      for (int i = 0; i < columnDefs.size(); ++i) {
+        ColumnDef colDef = columnDefs.get(i);
+        dummyTable.addColumn(new Column(colDef.getColName(), colDef.getType(), i));
+      }
+      try {
+        HdfsTable hdfsTable = (HdfsTable) dummyTable;
+        hdfsTable.addDefaultPartition(msTbl.getSd());
+      } catch (CatalogException e) {
+        e.printStackTrace();
+        fail("Failed to add test table:\n" + createTableSql);
+      }
+    } else if (dummyTable instanceof KuduTable) {
+      if (!Table.isExternalTable(msTbl)) {
+        fail("Failed to add table, external kudu table expected:\n" + createTableSql);
+      }
+      try {
+        KuduTable kuduTable = (KuduTable) dummyTable;
+        kuduTable.loadSchemaFromKudu();
+      } catch (ImpalaRuntimeException e) {
+        e.printStackTrace();
+        fail("Failed to add test table:\n" + createTableSql);
+      }
+    } else {
+      fail("Test table type not supported:\n" + createTableSql);
     }
     db.addTable(dummyTable);
     testTables_.add(dummyTable);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 3bb8083..fc8ceab 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -314,6 +314,9 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testKudu() {
     Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
+    addTestDb("kudu_planner_test", "Test DB for Kudu Planner.");
+    addTestTable("CREATE EXTERNAL TABLE kudu_planner_test.no_stats STORED AS KUDU " +
+        "TBLPROPERTIES ('kudu.table_name' = 'impala::functional_kudu.alltypes');");
     runPlannerTestFile("kudu");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
index 7cc5c04..ce4dbd7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
@@ -96,3 +96,18 @@ PLAN-ROOT SINK
 |
 00:EMPTYSET
 ====
+---- QUERY
+# IMPALA-5602: If a query contains predicates that are all pushed to the datasource and
+# there is a limit, then the query should not incorrectly run with 'small query'
+# optimization.
+select * from functional.alltypes_datasource where id = 1 limit 15
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|  limit: 15
+|
+00:SCAN DATA SOURCE [functional.alltypes_datasource]
+data source predicates: id = 1
+   limit: 15
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6f20df81/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 079d291..e620ad6 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -424,3 +424,30 @@ INSERT INTO KUDU [functional_kudu.alltypes]
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
+# IMPALA-5602: If a query contains predicates that are all pushed to kudu and there is a
+# limit, then the query should not incorrectly run with 'small query' optimization.
+select * from functional_kudu.alltypesagg where tinyint_col = 9 limit 10;
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|  limit: 10
+|
+00:SCAN KUDU [functional_kudu.alltypesagg_idx]
+   kudu predicates: functional_kudu.alltypesagg_idx.tinyint_col = 9
+   limit: 10
+====
+# IMPALA-5602: If a query contains predicates that are all pushed to kudu, there is a
+# limit, and no table stats, then the query should not incorrectly run with 'small query'
+# optimization.
+select * from kudu_planner_test.no_stats where tinyint_col = 9 limit 10;
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|  limit: 10
+|
+00:SCAN KUDU [kudu_planner_test.no_stats]
+   kudu predicates: tinyint_col = 9
+   limit: 10
+====