You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bi...@apache.org on 2018/11/06 18:26:21 UTC

[1/2] impala git commit: IMPALA-7565: Add startup flag to set thrift connection setup thread pool size

Repository: impala
Updated Branches:
  refs/heads/master 941038229 -> cf89c7348


IMPALA-7565: Add startup flag to set thrift connection setup thread
pool size

This patch adds a hidden experimental startup flag called
'accepted_cnxn_setup_thread_pool_size' which enables setting the size
of the thread pool used to process the internal post-accept, pre-setup
connection queue in each thrift server set up to service
Impala internal and external connections. The default is set to 1,
which ensures that this change does not affect current behavior.

Testing:
Tested manually by adding a sleep and making sure other threads are
used.
Ran exhaustive tests with a pool size set to 10 successfully.

Change-Id: I31344321a5f9e840a399ccb0f963c0759e2ab234
Reviewed-on: http://gerrit.cloudera.org:8080/11873
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ca516ecb
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ca516ecb
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ca516ecb

Branch: refs/heads/master
Commit: ca516ecb6423c3433403227799ae38badc387b81
Parents: 9410382
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Fri Nov 2 15:36:17 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Nov 6 03:22:56 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/TAcceptQueueServer.cpp | 24 +++++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ca516ecb/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 0f89a7e..730901b 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -26,8 +26,14 @@
 #include "util/thread-pool.h"
 
 DEFINE_int32(accepted_cnxn_queue_depth, 10000,
-    "(Advanced) The size of the post-accept, pre-setup connection queue for Impala "
-    "internal connections");
+    "(Advanced) The size of the post-accept, pre-setup connection queue in each thrift "
+    "server set up to service Impala internal and external connections.");
+
+DEFINE_int32_hidden(accepted_cnxn_setup_thread_pool_size, 1,
+    "(Advanced) The size of the thread pool that is used to process the "
+    "post-accept, pre-setup connection queue in each thrift server set up to service "
+    "Impala internal and external connections. Warning: This is untested for values "
+    "greater than 1 which might exhibit unpredictable behavior and/or cause crashes.");
 
 namespace apache {
 namespace thrift {
@@ -209,13 +215,17 @@ void TAcceptQueueServer::serve() {
     eventHandler_->preServe();
   }
 
-  // Only using one thread here is sufficient for performance, and it avoids potential
-  // thread safety issues with the thrift code called in SetupConnection.
-  constexpr int CONNECTION_SETUP_POOL_SIZE = 1;
-
+  if (FLAGS_accepted_cnxn_setup_thread_pool_size > 1) {
+    LOG(WARNING) << "connection_setup_thread_pool_size is set to "
+                 << FLAGS_accepted_cnxn_setup_thread_pool_size
+                 << ". Values greater than 1 are untested and might exhibit "
+                    "unpredictable behavior and/or cause crashes.";
+  }
   // New - this is the thread pool used to process the internal accept queue.
+  // TODO: IMPALA-7565: Make sure the related thrift code is thread safe and subsequently
+  // enable multi-threading by default.
   ThreadPool<shared_ptr<TTransport>> connection_setup_pool("setup-server", "setup-worker",
-      CONNECTION_SETUP_POOL_SIZE, FLAGS_accepted_cnxn_queue_depth,
+      FLAGS_accepted_cnxn_setup_thread_pool_size, FLAGS_accepted_cnxn_queue_depth,
       [this](int tid, const shared_ptr<TTransport>& item) {
         this->SetupConnection(item);
       });


[2/2] impala git commit: IMPALA-6323 Allow constant analytic window expressions.

Posted by bi...@apache.org.
IMPALA-6323 Allow constant analytic window expressions.

The constraint imposed by IMPALA-1354 was artificial.
If there are constant "partition by" expressions, simply drop them;
they are no-ops.

Constant "order by" expressions can be ignored as well, though they
should be accounted for as null expressions in the backend, with the
effect that all rows are combined in the same window (i.e. no window breaks).

Change-Id: Idf129026c45120e9470df601268863634037908c
Reviewed-on: http://gerrit.cloudera.org:8080/11556
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Michael Ho <kw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cf89c734
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cf89c734
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cf89c734

Branch: refs/heads/master
Commit: cf89c73485c119d5068be1cdddf1acbd3c8acfd5
Parents: ca516ec
Author: Michal Ostrowski <mo...@cloudera.com>
Authored: Fri Sep 28 11:39:11 2018 -0700
Committer: Michael Ho <kw...@cloudera.com>
Committed: Tue Nov 6 07:54:47 2018 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc               |  3 +-
 .../apache/impala/analysis/AnalyticExpr.java    | 12 ++------
 .../apache/impala/planner/AnalyticPlanner.java  | 23 ++++++++++++----
 .../impala/analysis/AnalyzeExprsTest.java       | 15 ++++------
 .../queries/PlannerTest/analytic-fns.test       | 29 ++++++++++++++++++++
 .../queries/QueryTest/analytic-fns.test         | 21 ++++++++++++++
 6 files changed, 77 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/cf89c734/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 036cf93..c7bcbda 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -394,7 +394,8 @@ inline Status AnalyticEvalNode::TryAddResultTupleForPrevRow(
            << " idx=" << stream_idx;
   if (fn_scope_ != ROWS && (next_partition
         || (fn_scope_ == RANGE && window_.__isset.window_end
-            && !PrevRowCompare(order_by_eq_expr_eval_, child_tuple_cmp_row)))) {
+            && !(order_by_eq_expr_eval_ == nullptr ||
+                 PrevRowCompare(order_by_eq_expr_eval_, child_tuple_cmp_row))))) {
     RETURN_IF_ERROR(AddResultTuple(stream_idx - 1));
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/cf89c734/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
index d033ee1..76fc81f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
@@ -424,22 +424,14 @@ public class AnalyticExpr extends Expr {
     type_ = getFnCall().getType();
 
     for (Expr e: partitionExprs_) {
-      if (e.isConstant()) {
-        throw new AnalysisException(
-            "Expressions in the PARTITION BY clause must not be constant: "
-              + e.toSql() + " (in " + toSql() + ")");
-      } else if (e.getType().isComplexType()) {
+      if (e.getType().isComplexType()) {
         throw new AnalysisException(String.format("PARTITION BY expression '%s' with " +
             "complex type '%s' is not supported.", e.toSql(),
             e.getType().toSql()));
       }
     }
     for (OrderByElement e: orderByElements_) {
-      if (e.getExpr().isConstant()) {
-        throw new AnalysisException(
-            "Expressions in the ORDER BY clause must not be constant: "
-              + e.getExpr().toSql() + " (in " + toSql() + ")");
-      } else if (e.getExpr().getType().isComplexType()) {
+      if (e.getExpr().getType().isComplexType()) {
         throw new AnalysisException(String.format("ORDER BY expression '%s' with " +
             "complex type '%s' is not supported.", e.getExpr().toSql(),
             e.getExpr().getType().toSql()));

http://git-wip-us.apache.org/repos/asf/impala/blob/cf89c734/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 90d98fd..3685bf4 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -82,6 +82,18 @@ public class AnalyticPlanner {
   }
 
   /**
+   * Return true if and only if exprs is non-empty and contains non-constant
+   * expressions.
+   */
+  private boolean activeExprs(List<Expr> exprs) {
+    if (exprs.isEmpty())  return false;
+    for (Expr p: exprs) {
+      if (!p.isConstant()) { return true; }
+    }
+    return false;
+  }
+
+  /**
    * Return plan tree that augments 'root' with plan nodes that implement single-node
    * evaluation of the AnalyticExprs in analyticInfo.
    * This plan takes into account a possible hash partition of its input on
@@ -224,7 +236,7 @@ public class AnalyticPlanner {
     // remove the non-partitioning group from partitionGroups
     PartitionGroup nonPartitioning = null;
     for (PartitionGroup pg: partitionGroups) {
-      if (pg.partitionByExprs.isEmpty()) {
+      if (!activeExprs(pg.partitionByExprs)) {
         nonPartitioning = pg;
         break;
       }
@@ -308,9 +320,10 @@ public class AnalyticPlanner {
     TupleDescriptor bufferedTupleDesc = null;
     // map from input to buffered tuple
     ExprSubstitutionMap bufferedSmap = new ExprSubstitutionMap();
+    boolean activePartition = activeExprs(partitionByExprs);
 
     // sort on partition by (pb) + order by (ob) exprs and create pb/ob predicates
-    if (!partitionByExprs.isEmpty() || !orderByElements.isEmpty()) {
+    if (activePartition || !orderByElements.isEmpty()) {
       // first sort on partitionExprs (direction doesn't matter)
       List<Expr> sortExprs = Lists.newArrayList(partitionByExprs);
       List<Boolean> isAsc =
@@ -337,12 +350,12 @@ public class AnalyticPlanner {
 
       // if this sort group does not have partitioning exprs, we want the sort
       // to be executed like a regular distributed sort
-      if (!partitionByExprs.isEmpty()) sortNode.setIsAnalyticSort(true);
+      if (activePartition) sortNode.setIsAnalyticSort(true);
 
       if (partitionExprs != null) {
         // create required input partition
         DataPartition inputPartition = DataPartition.UNPARTITIONED;
-        if (!partitionExprs.isEmpty()) {
+        if (activePartition) {
           inputPartition = DataPartition.hashPartitioned(partitionExprs);
         }
         sortNode.setInputPartition(inputPartition);
@@ -380,7 +393,7 @@ public class AnalyticPlanner {
       // we need to remap the pb/ob exprs to a) the sort output, b) our buffer of the
       // sort input
       Expr partitionByEq = null;
-      if (!windowGroup.partitionByExprs.isEmpty()) {
+      if (activeExprs(windowGroup.partitionByExprs)) {
         partitionByEq = createNullMatchingEquals(
             Expr.substituteList(windowGroup.partitionByExprs, sortSmap, analyzer_, false),
             sortTupleId, bufferedSmap);

http://git-wip-us.apache.org/repos/asf/impala/blob/cf89c734/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
index 1908c21..b547526 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
@@ -1005,16 +1005,11 @@ public class AnalyzeExprsTest extends AnalyzerTest {
         "(first_value(cast(int_col AS DECIMAL)) " +
         " over (order by int_col rows between 2 preceding and 1 preceding)) " +
         "from functional.alltypestiny");
-    // IMPALA-1354: Constant expressions in order by and partition by exprs
-    AnalysisError(
-        "select rank() over (order by 1) from functional.alltypestiny",
-        "Expressions in the ORDER BY clause must not be constant: 1");
-    AnalysisError(
-        "select rank() over (partition by 2 order by id) from functional.alltypestiny",
-        "Expressions in the PARTITION BY clause must not be constant: 2");
-    AnalysisError(
-        "select rank() over (partition by 2 order by 1) from functional.alltypestiny",
-        "Expressions in the PARTITION BY clause must not be constant: 2");
+    // IMPALA-6323: Allow constant expressions in analytic window exprs.
+    AnalyzesOk("select rank() over (order by 1) from functional.alltypestiny");
+    AnalyzesOk("select count() over (partition by 2) from functional.alltypestiny");
+    AnalyzesOk(
+        "select rank() over (partition by 2 order by 1) from functional.alltypestiny");
 
     // nested analytic exprs
     AnalysisError(

http://git-wip-us.apache.org/repos/asf/impala/blob/cf89c734/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
index f78bfb2..5084d2a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
@@ -2457,3 +2457,32 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
 ====
+# IMPALA-6323 Partition by a constant is equivalent to no partitioning.
+select x, count() over(partition by 1) from (VALUES((1 x), (2), (3))) T;
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:ANALYTIC
+|  functions: count()
+|  partition by: 1
+|
+00:UNION
+   constant-operands=3
+====
+# IMPALA-6323 Order by a constant is equivalent to no ordering.
+select x, count() over(order by 1) from (VALUES((1 x), (2), (3))) T;
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+02:ANALYTIC
+|  functions: count()
+|  order by: 1 ASC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|
+01:SORT
+|  order by: 1 ASC
+|
+00:UNION
+   constant-operands=3
+====
+

http://git-wip-us.apache.org/repos/asf/impala/blob/cf89c734/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
index 0c6207c..fbbd939 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
@@ -2119,3 +2119,24 @@ true,6,2
 true,4,2
 true,2,2
 true,NULL,2
+====
+---- QUERY
+# IMPALA-6323 Partition by a constant is equivalent to no partitioning.
+select x, count() over(partition by 1) from (VALUES((1 x), (2), (3))) T;
+---- TYPES
+TINYINT, BIGINT
+---- RESULTS
+1,3
+2,3
+3,3
+====
+---- QUERY
+# IMPALA-6323 Order by a constant is equivalent to no ordering.
+select x, count() over(order by 1) from (VALUES((1 x), (2), (3))) T;
+---- TYPES
+TINYINT, BIGINT
+---- RESULTS
+1,3
+2,3
+3,3
+====