You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2017/05/25 22:59:54 UTC

[1/2] incubator-impala git commit: Bumped Kudu version to 795c435

Repository: incubator-impala
Updated Branches:
  refs/heads/master 4471eb3b9 -> 014c5603f


Bumped Kudu version to 795c435

This is needed for IMPALA-5167.

Change-Id: I925f5d746dcb497e17ab6f85217da9fcee1d4c00
Reviewed-on: http://gerrit.cloudera.org:8080/6975
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bbca0879
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bbca0879
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bbca0879

Branch: refs/heads/master
Commit: bbca087939975647c2b33355ac37c5642927bf4a
Parents: 4471eb3
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Wed May 24 07:18:33 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 25 20:59:38 2017 +0000

----------------------------------------------------------------------
 bin/impala-config.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bbca0879/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 11c34f7..cffac90 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -72,7 +72,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=390-07ebc4fbe0
+export IMPALA_TOOLCHAIN_BUILD_ID=394-e5d901b362
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
@@ -120,7 +120,7 @@ if [[ $OSTYPE == "darwin"* ]]; then
 fi
 
 # Kudu version in the toolchain; provides libkudu_client.so and minicluster binaries.
-export IMPALA_KUDU_VERSION=7533364
+export IMPALA_KUDU_VERSION=795c435
 
 # Kudu version used to identify Java client jar from maven
 export KUDU_JAVA_VERSION=1.4.0-cdh5.12.0-SNAPSHOT


[2/2] incubator-impala git commit: IMPALA-5354: INSERT hints for Kudu tables

Posted by he...@apache.org.
IMPALA-5354: INSERT hints for Kudu tables

A previous change, IMPALA-3742, added an exchange node and
sort node to plans for inserts into Kudu tables to partition
and sort the input to match the target table.

This patch enables INSERT and UPSERT hints for Kudu tables:
'noshuffle', which removes the exchange node from the plan, and
'noclustered', which removes the sort node.

Insert hints have no effect for inserts that are small enough
to result in a single node execution.

Testing:
- Updated FE planner and analysis tests.
- Ran Kudu EE tests.

Change-Id: Idbd1ef977446ffee157ce3ce0b476e1f08a75d05
Reviewed-on: http://gerrit.cloudera.org:8080/6980
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/014c5603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/014c5603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/014c5603

Branch: refs/heads/master
Commit: 014c5603f867907963f3821948f90d526e9a4789
Parents: bbca087
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Wed May 24 13:38:33 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 25 21:08:59 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/InsertStmt.java  |  7 +---
 .../impala/planner/DistributedPlanner.java      | 14 ++-----
 .../java/org/apache/impala/planner/Planner.java | 12 ++----
 .../java/org/apache/impala/util/KuduUtil.java   | 19 ++++++++++
 .../impala/analysis/AnalyzeStmtsTest.java       | 12 +++---
 .../impala/analysis/AnalyzeUpsertStmtTest.java  |  6 +--
 .../queries/PlannerTest/kudu-upsert.test        | 40 ++++++++++++++++++++
 .../queries/PlannerTest/kudu.test               | 29 ++++++++++++++
 8 files changed, 105 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index c25f484..c23145d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -797,12 +797,9 @@ public class InsertStmt extends StatementBase {
 
   private void analyzePlanHints(Analyzer analyzer) throws AnalysisException {
     if (planHints_.isEmpty()) return;
-    if (isUpsert_) {
-      throw new AnalysisException("Hints not supported in UPSERT statements.");
-    }
-    if (table_ instanceof HBaseTable || table_ instanceof KuduTable) {
+    if (table_ instanceof HBaseTable) {
       throw new AnalysisException(String.format("INSERT hints are only supported for " +
-          "inserting into Hdfs tables: %s", getTargetTableName()));
+          "inserting into Hdfs and Kudu tables: %s", getTargetTableName()));
     }
     for (PlanHint hint: planHints_) {
       if (hint.is("SHUFFLE")) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index ac0355e..21423c5 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -23,17 +23,16 @@ import java.util.List;
 import org.apache.impala.analysis.AggregateInfo;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
-import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
-import org.apache.impala.analysis.KuduPartitionExpr;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
+import org.apache.impala.util.KuduUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -246,15 +245,8 @@ public class DistributedPlanner {
     if (partitionExprs.isEmpty()) {
       partition = DataPartition.UNPARTITIONED;
     } else if (insertStmt.getTargetTable() instanceof KuduTable) {
-      // IMPALA-5294: Don't use 'partitionExpr' here because the constants were removed.
-      // We need all of the partition exprs to fill in the partition column values for
-      // each row to send to Kudu to determine the partition number.
-      Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
-          (KuduTable) insertStmt.getTargetTable(),
-          Lists.newArrayList(insertStmt.getPartitionKeyExprs()),
-          insertStmt.getPartitionColPos());
-      kuduPartitionExpr.analyze(ctx_.getRootAnalyzer());
-      partition = DataPartition.kuduPartitioned(kuduPartitionExpr);
+      partition = DataPartition.kuduPartitioned(
+          KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer()));
     } else {
       partition = DataPartition.hashPartitioned(partitionExprs);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index b6fa54a..31a061b 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -24,12 +24,10 @@ import java.util.List;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.ColumnLineageGraph;
-import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
-import org.apache.impala.analysis.KuduPartitionExpr;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.catalog.HBaseTable;
@@ -46,6 +44,7 @@ import org.apache.impala.thrift.TQueryExecRequest;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MaxRowsProcessedVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -499,12 +498,9 @@ public class Planner {
     List<Expr> orderingExprs = Lists.newArrayList();
 
     if (insertStmt.getTargetTable() instanceof KuduTable) {
-      if (inputFragment.getDataPartition().getType() == TPartitionType.KUDU) {
-        Preconditions.checkState(
-            inputFragment.getDataPartition().getPartitionExprs().size() == 1);
-        // Only sort for Kudu if we've already partitioned so that we can sort the
-        // partitions separately. This will be true if this is a distributed exec.
-        orderingExprs.add(inputFragment.getDataPartition().getPartitionExprs().get(0));
+      if (!insertStmt.hasNoClusteredHint() && !ctx_.isSingleNodeExec()) {
+        orderingExprs.add(
+            KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer()));
         orderingExprs.addAll(insertStmt.getPrimaryKeyExprs());
       }
     } else if (insertStmt.hasClusteredHint() || !insertStmt.getSortExprs().isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 8b61c88..645866b 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -23,10 +23,14 @@ import java.util.HashSet;
 import java.util.List;
 
 import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.InsertStmt;
+import org.apache.impala.analysis.KuduPartitionExpr;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NumericLiteral;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -399,4 +403,19 @@ public class KuduUtil {
             "Kudu type '%s' is not supported in Impala", t.getName()));
     }
   }
+
+  /**
+   * Builds and analyzes a KuduPartitionExpr that maps each row inserted by 'insertStmt'
+   * to its partition number. The target table of 'insertStmt' must be a KuduTable.
+   */
+  public static Expr createPartitionExpr(InsertStmt insertStmt, Analyzer analyzer)
+      throws AnalysisException {
+    Preconditions.checkState(insertStmt.getTargetTable() instanceof KuduTable);
+    Expr kuduPartitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
+        (KuduTable) insertStmt.getTargetTable(),
+        Lists.newArrayList(insertStmt.getPartitionKeyExprs()),
+        insertStmt.getPartitionColPos());
+    kuduPartitionExpr.analyze(analyzer);
+    return kuduPartitionExpr;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 06ad842..cff7563 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1782,18 +1782,16 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
       AnalyzesOk(String.format(
           "insert into table functional.alltypesnopart %sshuffle%s " +
           "select * from functional.alltypesnopart", prefix, suffix));
+      // Insert hints are ok for Kudu tables.
+      AnalyzesOk(String.format(
+          "insert into table functional_kudu.alltypes %sshuffle%s " +
+          "select * from functional_kudu.alltypes", prefix, suffix));
       // Plan hints do not make sense for inserting into HBase tables.
       AnalysisError(String.format(
           "insert into table functional_hbase.alltypes %sshuffle%s " +
           "select * from functional_hbase.alltypes", prefix, suffix),
-          "INSERT hints are only supported for inserting into Hdfs tables: " +
+          "INSERT hints are only supported for inserting into Hdfs and Kudu tables: " +
           "functional_hbase.alltypes");
-      // Plan hints do not make sense for inserting into Kudu tables.
-      AnalysisError(String.format(
-          "insert into table functional_kudu.alltypes %sshuffle%s " +
-          "select * from functional_kudu.alltypes", prefix, suffix),
-          "INSERT hints are only supported for inserting into Hdfs tables: " +
-          "functional_kudu.alltypes");
       // Conflicting plan hints.
       AnalysisError("insert into table functional.alltypessmall " +
           "partition (year, month) /* +shuffle,noshuffle */ " +

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
index 2a20c2a..d6cd804 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeUpsertStmtTest.java
@@ -60,9 +60,9 @@ public class AnalyzeUpsertStmtTest extends AnalyzerTest {
         "from functional.alltypes a, functional.allcomplextypes b, " +
         "(select item from b.int_array_col) v1 " +
         "where a.id = b.id");
-    // Hint, not supported for Kudu tables.
-    AnalysisError("upsert into table functional_kudu.testtbl [clustered] select * from " +
-        "functional_kudu.testtbl", "Hints not supported in UPSERT statements.");
+    // Hints are accepted in UPSERT statements.
+    AnalyzesOk("upsert into table functional_kudu.testtbl [clustered] select * from " +
+        "functional_kudu.testtbl");
 
     // Key columns missing from permutation
     AnalysisError("upsert into functional_kudu.testtbl(zip) values(1)",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
index e04278d..2bc5df7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
@@ -190,3 +190,43 @@ UPSERT INTO KUDU [functional_kudu.alltypes]
 |
 00:SCAN KUDU [functional_kudu.alltypes]
 ====
+# Hint - noshuffle should remove the exchange node.
+upsert into functional_kudu.alltypes /* +noshuffle */ select * from functional.alltypes;
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.alltypes]
+|
+01:SORT
+|  order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Hint - noclustered should remove the sort node.
+upsert into functional_kudu.alltypes /* +noclustered */ select * from functional.alltypes;
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.alltypes]
+|
+01:EXCHANGE [KUDU(KuduPartition(functional.alltypes.id))]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Hint - noshuffle should remove the exchange node.
+upsert into functional_kudu.alltypes /* +noshuffle */ select * from functional.alltypes;
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.alltypes]
+|
+01:SORT
+|  order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+upsert into functional_kudu.alltypes /* +noclustered,noshuffle */
+select * from functional.alltypes;
+---- DISTRIBUTEDPLAN
+UPSERT INTO KUDU [functional_kudu.alltypes]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/014c5603/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 12ab9fc..16cb3a9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -389,3 +389,32 @@ PLAN-ROOT SINK
 |
 01:SCAN KUDU [functional_kudu.alltypes]
 ====
+# Hint - noshuffle should remove the exchange node.
+insert into functional_kudu.alltypes /* +noshuffle */ select * from functional.alltypes;
+---- DISTRIBUTEDPLAN
+INSERT INTO KUDU [functional_kudu.alltypes]
+|
+01:SORT
+|  order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Hint - noclustered should remove the sort node.
+insert into functional_kudu.alltypes /* +noclustered */ select * from functional.alltypes;
+---- DISTRIBUTEDPLAN
+INSERT INTO KUDU [functional_kudu.alltypes]
+|
+01:EXCHANGE [KUDU(KuduPartition(functional.alltypes.id))]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+insert into functional_kudu.alltypes /* +noclustered,noshuffle */
+select * from functional.alltypes;
+---- DISTRIBUTEDPLAN
+INSERT INTO KUDU [functional_kudu.alltypes]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====