You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/01/11 00:02:20 UTC

[impala] branch master updated (7b490ee -> 6894085)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 7b490ee  IMPALA-10951 (preparation): Update Kudu to a more recent version
     new 65c3b78  IMPALA-11033: Add support for specifying multiple executor group sets
     new 6894085  IMPALA-11030: Fix incorrect creation of common partition exprs

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/exec-env.cc                         |  10 +-
 be/src/scheduling/CMakeLists.txt                   |   2 +-
 be/src/scheduling/cluster-membership-mgr-test.cc   | 219 +++++++++++++++++++++
 be/src/scheduling/cluster-membership-mgr.cc        | 145 +++++++++++---
 be/src/scheduling/cluster-membership-mgr.h         |  23 +++
 be/src/service/impala-hs2-server.cc                |   3 +-
 be/src/service/impala-server.cc                    |  16 +-
 common/thrift/Frontend.thrift                      |  23 ++-
 .../org/apache/impala/analysis/AnalyticInfo.java   |   6 +-
 .../impala/util/ExecutorMembershipSnapshot.java    |  35 ++--
 .../org/apache/impala/planner/ClusterSizeTest.java |  24 ++-
 .../org/apache/impala/planner/PlannerTestBase.java |   7 +-
 .../queries/PlannerTest/analytic-fns.test          |  39 ++++
 .../queries/QueryTest/analytic-fns.test            |  24 +++
 tests/custom_cluster/test_executor_groups.py       |   6 +-
 15 files changed, 516 insertions(+), 66 deletions(-)

[impala] 01/02: IMPALA-11033: Add support for specifying multiple executor group sets

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 65c3b784a27e6b74cc2fb9a9e47dcd05c55fe752
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Wed Dec 8 17:46:23 2021 -0800

    IMPALA-11033: Add support for specifying multiple executor group sets
    
    This patch introduces the concept of executor group sets. Each group
    set specifies an executor group name prefix and an expected group
    size (the number of executors in each group). Every executor group
    that is part of a set has the set's prefix, which is also the name
    of the resource pool that the set maps to. These sets are specified
    via the startup flag 'expected_executor_group_sets', which is a
    comma-separated list of entries in the format
    <executor_group_name_prefix>:<expected_group_size>.
    
    Testing:
    - Added unit tests
    
    Change-Id: I9e0a3a5fe2b1f0b7507b7c096b7a3c373bc2e684
    Reviewed-on: http://gerrit.cloudera.org:8080/18093
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/exec-env.cc                         |  10 +-
 be/src/scheduling/CMakeLists.txt                   |   2 +-
 be/src/scheduling/cluster-membership-mgr-test.cc   | 219 +++++++++++++++++++++
 be/src/scheduling/cluster-membership-mgr.cc        | 145 +++++++++++---
 be/src/scheduling/cluster-membership-mgr.h         |  23 +++
 be/src/service/impala-hs2-server.cc                |   3 +-
 be/src/service/impala-server.cc                    |  16 +-
 common/thrift/Frontend.thrift                      |  23 ++-
 .../impala/util/ExecutorMembershipSnapshot.java    |  35 ++--
 .../org/apache/impala/planner/ClusterSizeTest.java |  24 ++-
 .../org/apache/impala/planner/PlannerTestBase.java |   7 +-
 tests/custom_cluster/test_executor_groups.py       |   6 +-
 12 files changed, 448 insertions(+), 65 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index e5fd99e..8a78d35 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -174,11 +174,11 @@ using namespace impala;
 /// Helper method to forward cluster membership updates to the frontend.
 /// For additional details see comments for PopulateExecutorMembershipRequest()
 /// in cluster-membership-mgr.cc
-void SendClusterMembershipToFrontend(
-    ClusterMembershipMgr::SnapshotPtr& snapshot, Frontend* frontend) {
+void SendClusterMembershipToFrontend(ClusterMembershipMgr::SnapshotPtr& snapshot,
+    const vector<TExecutorGroupSet>& expected_exec_group_sets, Frontend* frontend) {
   TUpdateExecutorMembershipRequest update_req;
 
-  PopulateExecutorMembershipRequest(snapshot, update_req);
+  PopulateExecutorMembershipRequest(snapshot, expected_exec_group_sets, update_req);
 
   Status status = frontend->UpdateExecutorMembership(update_req);
   if (!status.ok()) {
@@ -435,7 +435,9 @@ Status ExecEnv::Init() {
   if (FLAGS_is_coordinator && frontend_ != nullptr) {
     cluster_membership_mgr_->RegisterUpdateCallbackFn(
         [this](ClusterMembershipMgr::SnapshotPtr snapshot) {
-          SendClusterMembershipToFrontend(snapshot, this->frontend());
+          SendClusterMembershipToFrontend(snapshot,
+              this->cluster_membership_mgr()->GetExpectedExecGroupSets(),
+              this->frontend());
         });
   }
 
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 61bd37c..f5ce02d 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -57,7 +57,7 @@ add_library(SchedulingTests STATIC
 add_dependencies(SchedulingTests gen-deps)
 
 ADD_UNIFIED_BE_LSAN_TEST(admission-controller-test AdmissionControllerTest.*)
-ADD_UNIFIED_BE_LSAN_TEST(cluster-membership-mgr-test ClusterMembershipMgrTest.*)
+ADD_UNIFIED_BE_LSAN_TEST(cluster-membership-mgr-test "ClusterMembershipMgrTest.*:ClusterMembershipMgrUnitTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(executor-group-test ExecutorGroupTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(hash-ring-test HashRingTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(scheduler-test SchedulerTest.*)
diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc
index a3c62fb..1e54b42 100644
--- a/be/src/scheduling/cluster-membership-mgr-test.cc
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -36,6 +36,8 @@ using namespace impala::test;
 
 DECLARE_int32(statestore_max_missed_heartbeats);
 DECLARE_int32(statestore_heartbeat_frequency_ms);
+DECLARE_int32(num_expected_executors);
+DECLARE_string(expected_executor_group_sets);
 
 namespace impala {
 
@@ -549,6 +551,223 @@ TEST_F(ClusterMembershipMgrTest, RandomizedMembershipUpdates) {
       << num_deleted << ", killed: " << num_killed << endl;
 }
 
+/// This tests various valid and invalid cases that the parsing logic in
+/// PopulateExpectedExecGroupSets can encounter.
+TEST(ClusterMembershipMgrUnitTest, TestPopulateExpectedExecGroupSets) {
+  gflags::FlagSaver saver;
+  vector<TExecutorGroupSet> expected_exec_group_sets;
+  // Case 1: Empty string
+  FLAGS_expected_executor_group_sets = "";
+  expected_exec_group_sets.clear();
+  Status status =
+      ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
+  EXPECT_TRUE(status.ok());
+  EXPECT_TRUE(expected_exec_group_sets.empty());
+
+  // Case 2: Single valid group set
+  FLAGS_expected_executor_group_sets = "group-prefix1:2";
+  expected_exec_group_sets.clear();
+  status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
+  EXPECT_TRUE(status.ok());
+  EXPECT_EQ(expected_exec_group_sets.size(), 1);
+  EXPECT_EQ(expected_exec_group_sets[0].exec_group_name_prefix, "group-prefix1");
+  EXPECT_EQ(expected_exec_group_sets[0].expected_num_executors, 2);
+
+  // Case 3: Multiple valid group sets
+  FLAGS_expected_executor_group_sets = "group-prefix1:2,group-prefix2:10";
+  expected_exec_group_sets.clear();
+  status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
+  EXPECT_TRUE(status.ok());
+  EXPECT_EQ(expected_exec_group_sets.size(), 2);
+  EXPECT_EQ(expected_exec_group_sets[0].exec_group_name_prefix, "group-prefix1");
+  EXPECT_EQ(expected_exec_group_sets[0].expected_num_executors, 2);
+  EXPECT_EQ(expected_exec_group_sets[1].exec_group_name_prefix, "group-prefix2");
+  EXPECT_EQ(expected_exec_group_sets[1].expected_num_executors, 10);
+
+  // Case 4: Multiple valid group sets, out of order. The output is expected to be
+  // sorted in increasing order of expected group size.
+  FLAGS_expected_executor_group_sets = "group-prefix1:10,group-prefix2:2";
+  expected_exec_group_sets.clear();
+  status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
+  EXPECT_TRUE(status.ok());
+  EXPECT_EQ(expected_exec_group_sets.size(), 2);
+  EXPECT_EQ(expected_exec_group_sets[0].exec_group_name_prefix, "group-prefix2");
+  EXPECT_EQ(expected_exec_group_sets[0].expected_num_executors, 2);
+  EXPECT_EQ(expected_exec_group_sets[1].exec_group_name_prefix, "group-prefix1");
+  EXPECT_EQ(expected_exec_group_sets[1].expected_num_executors, 10);
+
+  // Case 5: Invalid input for expected group size
+  FLAGS_expected_executor_group_sets = "group-prefix1:2abc";
+  expected_exec_group_sets.clear();
+  status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.msg().GetFullMessageDetails(),
+      "Failed to parse expected executor group set size for input: group-prefix1:2abc\n");
+
+  // Case 6: Invalid input with no expected group size
+  FLAGS_expected_executor_group_sets = "group-prefix1:";
+  expected_exec_group_sets.clear();
+  status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.msg().GetFullMessageDetails(),
+      "Failed to parse expected executor group set size for input: group-prefix1:\n");
+
+  // Case 7: Invalid input with no group prefix
+  FLAGS_expected_executor_group_sets = ":1";
+  expected_exec_group_sets.clear();
+  status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.msg().GetFullMessageDetails(),
+      "Executor group set prefix cannot be empty for input: :1\n");
+
+  // Case 8: Invalid input with no colon separator
+  FLAGS_expected_executor_group_sets = "group-prefix1";
+  expected_exec_group_sets.clear();
+  status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.msg().GetFullMessageDetails(),
+      "Invalid executor group set format: group-prefix1\n");
+
+  // Case 9: Invalid input with duplicated group prefix
+  FLAGS_expected_executor_group_sets = "group-prefix1:2,group-prefix1:10";
+  expected_exec_group_sets.clear();
+  status = ClusterMembershipMgr::PopulateExpectedExecGroupSets(expected_exec_group_sets);
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.msg().GetFullMessageDetails(),
+      "Executor group set prefix specified multiple times: group-prefix1:10\n");
+}
+
+/// This ensures that all executor group configuration scenarios possible using available
+/// startup flags are handled correctly when populating membership updates that are
+/// sent to the frontend.
+TEST(ClusterMembershipMgrUnitTest, PopulateExecutorMembershipRequest) {
+  gflags::FlagSaver saver;
+  FLAGS_num_expected_executors = 20;
+  auto snapshot_ptr = std::make_shared<ClusterMembershipMgr::Snapshot>();
+  TUpdateExecutorMembershipRequest update_req;
+  vector<TExecutorGroupSet> empty_exec_group_sets;
+  vector<TExecutorGroupSet> populated_exec_group_sets;
+  populated_exec_group_sets.emplace_back();
+  populated_exec_group_sets.back().__set_exec_group_name_prefix("foo");
+  populated_exec_group_sets.back().__set_expected_num_executors(2);
+  populated_exec_group_sets.emplace_back();
+  populated_exec_group_sets.back().__set_exec_group_name_prefix("bar");
+  populated_exec_group_sets.back().__set_expected_num_executors(10);
+
+  // Case 1a: Not using executor groups
+  {
+    ExecutorGroup exec_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1);
+    exec_group.AddExecutor(MakeBackendDescriptor(1, 0));
+    snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group});
+    ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
+    PopulateExecutorMembershipRequest(ptr, empty_exec_group_sets, update_req);
+    EXPECT_EQ(update_req.exec_group_sets.size(), 1);
+    EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1);
+    EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
+    EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
+    snapshot_ptr->executor_groups.clear();
+  }
+
+  // Case 1b: Not using executor groups but expected_exec_group_sets is non-empty
+  {
+    ExecutorGroup exec_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, 1);
+    exec_group.AddExecutor(MakeBackendDescriptor(1, 0));
+    snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group});
+    ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
+    PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req);
+    EXPECT_EQ(update_req.exec_group_sets.size(), 1);
+    EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1);
+    EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
+    EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
+    snapshot_ptr->executor_groups.clear();
+  }
+
+  // Case 2a: Using executor groups, expected_exec_group_sets is empty
+  {
+    ExecutorGroup exec_group("foo-group1", 1);
+    exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
+    snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group});
+    // Adding another exec group with more executors.
+    ExecutorGroup exec_group2("foo-group2", 1);
+    exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
+    exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
+    snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2});
+    ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
+    PopulateExecutorMembershipRequest(ptr, empty_exec_group_sets, update_req);
+    EXPECT_EQ(update_req.exec_group_sets.size(), 1);
+    EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2);
+    EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
+    EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
+    snapshot_ptr->executor_groups.clear();
+  }
+
+  // Case 2b: Using executor groups, expected_exec_group_sets is empty, executor groups
+  // with different group prefixes
+  {
+    ExecutorGroup exec_group("foo-group1", 1);
+    exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
+    snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group});
+    // Adding another exec group with a different group prefix.
+    ExecutorGroup exec_group2("bar-group1", 1);
+    exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
+    exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
+    snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2});
+    ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
+    PopulateExecutorMembershipRequest(ptr, empty_exec_group_sets, update_req);
+    EXPECT_EQ(update_req.exec_group_sets.size(), 1);
+    EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 2);
+    EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 20);
+    EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "");
+    snapshot_ptr->executor_groups.clear();
+  }
+
+  // Case 2c: Using executor groups, expected_exec_group_sets is non-empty
+  {
+    ExecutorGroup exec_group("foo-group1", 1);
+    exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
+    snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group});
+    // Adding another exec group with a different group prefix.
+    ExecutorGroup exec_group2("bar-group1", 1);
+    exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
+    exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
+    snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2});
+    ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
+    PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req);
+    EXPECT_EQ(update_req.exec_group_sets.size(), 2);
+    EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1);
+    EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 2);
+    EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "foo");
+    EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 2);
+    EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 10);
+    EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "bar");
+    snapshot_ptr->executor_groups.clear();
+  }
+
+  // Case 2d: Using executor groups, expected_exec_group_sets is non-empty, and the
+  // executor group with more executors does not match any of the executor group
+  // sets, so it is not counted towards any set.
+  {
+    ExecutorGroup exec_group("foo-group1", 1);
+    exec_group.AddExecutor(MakeBackendDescriptor(1, exec_group, 0));
+    snapshot_ptr->executor_groups.insert({exec_group.name(), exec_group});
+    // Adding another exec group with a different group prefix.
+    ExecutorGroup exec_group2("unmatch-group1", 1);
+    exec_group2.AddExecutor(MakeBackendDescriptor(1, exec_group2, 1));
+    exec_group2.AddExecutor(MakeBackendDescriptor(2, exec_group2, 2));
+    snapshot_ptr->executor_groups.insert({exec_group2.name(), exec_group2});
+    ClusterMembershipMgr::SnapshotPtr ptr = snapshot_ptr;
+    PopulateExecutorMembershipRequest(ptr, populated_exec_group_sets, update_req);
+    EXPECT_EQ(update_req.exec_group_sets.size(), 2);
+    EXPECT_EQ(update_req.exec_group_sets[0].curr_num_executors, 1);
+    EXPECT_EQ(update_req.exec_group_sets[0].expected_num_executors, 2);
+    EXPECT_EQ(update_req.exec_group_sets[0].exec_group_name_prefix, "foo");
+    EXPECT_EQ(update_req.exec_group_sets[1].curr_num_executors, 0);
+    EXPECT_EQ(update_req.exec_group_sets[1].expected_num_executors, 10);
+    EXPECT_EQ(update_req.exec_group_sets[1].exec_group_name_prefix, "bar");
+    snapshot_ptr->executor_groups.clear();
+  }
+}
+
 /// TODO: Write a test that makes a number of random changes to cluster membership while
 /// not maintaining the proper lifecycle steps that a backend goes through (create, start,
 /// quiesce, delete).
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index 91bae23..afb9e41 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -17,13 +17,21 @@
 
 #include "scheduling/cluster-membership-mgr.h"
 
+#include <boost/algorithm/string/join.hpp>
+#include <gutil/strings/split.h>
+#include <gutil/strings/stringpiece.h>
+
 #include "common/logging.h"
 #include "common/names.h"
-#include "gen-cpp/Frontend_types.h"
+#include "runtime/exec-env.h"
 #include "service/impala-server.h"
 #include "util/metrics.h"
+#include "util/string-parser.h"
 #include "util/test-info.h"
 
+DECLARE_int32(num_expected_executors);
+DECLARE_string(expected_executor_group_sets);
+
 namespace {
 using namespace impala;
 
@@ -59,6 +67,10 @@ ClusterMembershipMgr::ClusterMembershipMgr(
     current_membership_(std::make_shared<const Snapshot>()),
     statestore_subscriber_(subscriber),
     local_backend_id_(move(local_backend_id)) {
+  Status status = PopulateExpectedExecGroupSets(expected_exec_group_sets_);
+  if(!status.ok()) {
+    LOG(FATAL) << "Error populating expected executor group sets: " << status;
+  }
   DCHECK(metrics != nullptr);
   MetricGroup* metric_grp = metrics->GetOrCreateChildGroup("cluster-membership");
   total_live_executor_groups_ = metric_grp->AddCounter(LIVE_EXEC_GROUP_KEY, 0);
@@ -568,48 +580,119 @@ bool ClusterMembershipMgr::IsBackendInExecutorGroups(
   return false;
 }
 
-/// Helper method to populate a thrift request object for cluster membership
-/// using a supplied snapshot from the cluster membership manager.
-///
-/// The frontend uses cluster membership information to determine whether it expects the
-/// scheduler to assign local or remote reads. It also uses the number of executors to
-/// determine the join type (partitioned vs broadcast). For the default executor group, we
-/// assume that local reads are preferred and will include the hostnames and IP addresses
-/// in the update to the frontend. For non-default executor groups, we assume that we will
-/// read data remotely and will only send the number of executors in the largest healthy
-/// group.
+/// For the default executor group, we assume that local reads are preferred and will
+/// include the hostnames and IP addresses in the update to the frontend. For non-default
+/// executor groups, we assume that we will read data remotely and will only send the
+/// number of executors in the largest healthy group. When expected exec group sets are
+/// specified (and no default group exists), this is done separately for each group set.
 void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapshot,
+    const vector<TExecutorGroupSet>& expected_exec_group_sets,
     TUpdateExecutorMembershipRequest& update_req) {
-  const ExecutorGroup* group = nullptr;
-  bool is_default_group = false;
+  vector<TExecutorGroupSet> exec_group_sets;
   auto default_it =
       snapshot->executor_groups.find(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
   if (default_it != snapshot->executor_groups.end()) {
-    is_default_group = true;
-    group = &(default_it->second);
+    exec_group_sets.emplace_back();
+    exec_group_sets.back().__set_expected_num_executors(FLAGS_num_expected_executors);
+    const ExecutorGroup* group = &(default_it->second);
+    for (const auto& backend : group->GetAllExecutorDescriptors()) {
+      if (backend.is_executor()) {
+        update_req.hostnames.insert(backend.address().hostname());
+        update_req.ip_addresses.insert(backend.ip_address());
+        exec_group_sets.back().curr_num_executors++;
+      }
+    }
   } else {
-    int max_num_executors = 0;
-    // Find largest healthy group.
-    for (const auto& it : snapshot->executor_groups) {
-      if (!it.second.IsHealthy()) continue;
-      int num_executors = it.second.NumExecutors();
-      if (num_executors > max_num_executors) {
-        max_num_executors = num_executors;
-        group = &(it.second);
+    if (expected_exec_group_sets.empty()) {
+      // Add a default exec group set if no expected group sets were specified.
+      exec_group_sets.emplace_back();
+      exec_group_sets.back().__set_expected_num_executors(FLAGS_num_expected_executors);
+    } else {
+      exec_group_sets.insert(exec_group_sets.begin(), expected_exec_group_sets.begin(),
+          expected_exec_group_sets.end());
+    }
+    int matching_exec_groups_found = 0;
+    for (auto& set : exec_group_sets) {
+      int max_num_executors = -1;
+      StringPiece prefix(set.exec_group_name_prefix);
+      DCHECK(!prefix.empty() || exec_group_sets.size() == 1)
+          << "An empty group set prefix can only exist if no executor group sets are "
+             "specified";
+      for (const auto& it : snapshot->executor_groups) {
+        StringPiece name(it.first);
+        if (!prefix.empty() && !name.starts_with(prefix)) continue;
+        matching_exec_groups_found++;
+        if (!it.second.IsHealthy()) continue;
+        int num_executors = it.second.NumExecutors();
+        if (num_executors > max_num_executors) {
+          max_num_executors = num_executors;
+          set.curr_num_executors = num_executors;
+        }
+      }
+    }
+    if (matching_exec_groups_found != snapshot->executor_groups.size()) {
+      vector<string> group_sets;
+      for (const auto& set : exec_group_sets) {
+        group_sets.push_back(set.exec_group_name_prefix);
+      }
+      vector<string> group_names;
+      for (const auto& it : snapshot->executor_groups) {
+        group_names.push_back(it.first);
       }
+      LOG(WARNING) << "Some executor groups either do not match expected group sets or "
+                   "match to more than one set. Expected group sets: "
+                << boost::algorithm::join(group_sets, ",") << " Current executor groups: "
+                << boost::algorithm::join(group_names, ",");
     }
   }
-  if (group) {
-    for (const auto& backend : group->GetAllExecutorDescriptors()) {
-      if (backend.is_executor()) {
-        if (is_default_group) {
-          update_req.hostnames.insert(backend.address().hostname());
-          update_req.ip_addresses.insert(backend.ip_address());
-        }
-        update_req.num_executors++;
+  update_req.__set_exec_group_sets(exec_group_sets);
+}
+
+Status ClusterMembershipMgr::PopulateExpectedExecGroupSets(
+    std::vector<TExecutorGroupSet>& expected_exec_group_sets) {
+  expected_exec_group_sets.clear();
+  std::unordered_set<string> parsed_group_prefixes;
+  vector<StringPiece> groups;
+  groups = strings::Split(FLAGS_expected_executor_group_sets, ",", strings::SkipEmpty());
+  if (groups.empty()) return Status::OK();
+
+  // Name and expected group size are separated by ':'.
+  for (const StringPiece& group : groups) {
+    int colon_idx = group.find_first_of(':');
+    string group_prefix = group.substr(0, colon_idx).as_string();
+    if (group_prefix.empty()) {
+      return Status(Substitute(
+          "Executor group set prefix cannot be empty for input: $0", group.ToString()));
+    }
+    if (parsed_group_prefixes.find(group_prefix) != parsed_group_prefixes.end()) {
+      return Status(Substitute(
+          "Executor group set prefix specified multiple times: $0", group.ToString()));
+    }
+    if (colon_idx != StringPiece::npos) {
+      StringParser::ParseResult result;
+      int64_t expected_num_executors = StringParser::StringToInt<int64_t>(
+          group.data() + colon_idx + 1, group.length() - colon_idx - 1, &result);
+      if (result != StringParser::PARSE_SUCCESS) {
+        return Status(
+            Substitute("Failed to parse expected executor group set size for input: $0",
+                group.ToString()));
       }
+      expected_exec_group_sets.emplace_back();
+      expected_exec_group_sets.back().__set_exec_group_name_prefix(group_prefix);
+      expected_exec_group_sets.back().__set_expected_num_executors(
+          expected_num_executors);
+      parsed_group_prefixes.insert(group_prefix);
+    } else {
+      return Status(
+          Substitute("Invalid executor group set format: $0", group.ToString()));
     }
   }
+  // Sort in increasing order of expected group size.
+  sort(expected_exec_group_sets.begin(), expected_exec_group_sets.end(),
+      [](const TExecutorGroupSet& first, const TExecutorGroupSet& second) {
+        return first.expected_num_executors < second.expected_num_executors;
+      });
+  return Status::OK();
 }
 
 } // end namespace impala
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
index e7e218b..12bfae4 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "gen-cpp/Frontend_types.h"
 #include "scheduling/executor-blacklist.h"
 #include "scheduling/executor-group.h"
 #include "statestore/statestore-subscriber.h"
@@ -170,6 +171,10 @@ class ClusterMembershipMgr {
   /// queries.
   const ExecutorGroup* GetEmptyExecutorGroup() { return &empty_exec_group_; }
 
+  const std::vector<TExecutorGroupSet>& GetExpectedExecGroupSets() {
+    return expected_exec_group_sets_;
+  }
+
  private:
   /// Serializes and adds the local backend descriptor to 'subscriber_topic_updates'.
   void AddLocalBackendToStatestore(const BackendDescriptorPB& local_be_desc,
@@ -210,9 +215,19 @@ class ClusterMembershipMgr {
   bool IsBackendInExecutorGroups(
       const BackendDescriptorPB& be_desc, const ExecutorGroups& executor_groups);
 
+  /// Parses the --expected_executor_group_sets startup flag and populates
+  /// 'expected_exec_group_sets' with the group name prefix and expected group size. Also
+  /// sorts it in increasing order by the expected group size.
+  static Status PopulateExpectedExecGroupSets(
+      std::vector<TExecutorGroupSet>& expected_exec_group_sets);
+
   /// An empty group used for scheduling coordinator only queries.
   const ExecutorGroup empty_exec_group_;
 
+  /// Expected executor group sets parsed from the --expected_executor_group_sets flag
+  /// in the constructor, sorted by expected group size in increasing order.
+  std::vector<TExecutorGroupSet> expected_exec_group_sets_;
+
   /// Ensures that only one thread is processing a membership update at a time, either
   /// from a statestore update or a blacklisting decision. Must be taken before any other
   /// locks in this class.
@@ -257,9 +272,17 @@ class ClusterMembershipMgr {
   mutable std::mutex callback_fn_lock_;
 
   friend class impala::test::SchedulerWrapper;
+  friend class ClusterMembershipMgrUnitTest_TestPopulateExpectedExecGroupSets_Test;
 };
 
+/// Helper method to populate a thrift request object 'update_req' for cluster membership
+/// using a supplied 'snapshot' from the cluster membership manager and the
+/// 'expected_exec_group_sets' sorted in increasing order by their expected group size.
+/// The frontend uses cluster membership information to determine whether it expects the
+/// scheduler to assign local or remote reads. It also uses the number of executors to
+/// determine the join type (partitioned vs broadcast).
 void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapshot,
+    const std::vector<TExecutorGroupSet>& expected_exec_group_sets,
     TUpdateExecutorMembershipRequest& update_req);
 
 } // end namespace impala
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 987dd70..7e87e1c 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -1248,7 +1248,8 @@ void ImpalaServer::GetExecutorMembership(
 
   // Populate an instance of TUpdateExecutorMembershipRequest
   // with the field values retrieved from membership_snapshot
-  PopulateExecutorMembershipRequest(membership_snapshot, return_val.executor_membership);
+  PopulateExecutorMembershipRequest(membership_snapshot,
+      cluster_membership_mgr->GetExpectedExecGroupSets(), return_val.executor_membership);
 
   return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
   VLOG_RPC << "GetExecutorMembership(): return_val=" << ThriftDebugString(return_val);
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 95152f1..e4e66ea 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -286,10 +286,22 @@ DEFINE_string(executor_groups, "",
     "contains at least that number of executors for the group will it be considered "
     "healthy for admission. Currently only a single group may be specified.");
 
-DEFINE_int32(num_expected_executors, 20, "The number of executors that are expected to "
+DEFINE_int32(num_expected_executors, 20,
+    "The number of executors that are expected to "
     "be available for the execution of a single query. This value is used during "
     "planning if no executors have started yet. Once a healthy executor group has "
-    "started, its size is used instead.");
+    "started, its size is used instead. NOTE: This flag is overridden by "
+    "'expected_executor_group_sets' which is a more expressive way of specifying "
+    "multiple executor group sets");
+
+DEFINE_string(expected_executor_group_sets, "",
+    "Only used by the coordinator. List of expected executor group sets, separated by "
+    "comma in the following format: <executor_group_name_prefix>:<expected_group_size> . "
+    "For eg. “prefix1:10”, this set will include executor groups named like "
+    "prefix1-group1, prefix1-group2, etc. The expected group size (number of executors "
+    "in each group) is used during planning when no healthy executor group is available. "
+    "If this flag is used then any executor groups that do not map to the specified group"
+    " sets will never be used to schedule queries.");
 
 // TODO: can we automatically choose a startup grace period based on the max admission
 // control queue timeout + some margin for error?
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index ff7d936..b2d9a4c 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -711,18 +711,35 @@ struct TUpdateCatalogCacheResponse {
   3: required i64 new_catalog_version
 }
 
+// Types of executor groups
+struct TExecutorGroupSet {
+  // The current max number of executors among all healthy groups of this group set.
+  1: i32 curr_num_executors = 0
+
+  // The expected size of the executor groups. Can be used to plan queries when
+  // no healthy executor groups are present (curr_num_executors is 0).
+  2: i32 expected_num_executors = 0
+
+  // The name of the request pool associated with this executor group type. All
+  // executor groups that match this prefix will be included as a part of this set.
+  // Note: this will be empty when 'default' executor group is used or
+  // 'expected_executor_group_sets' startup flag is not specified.
+  3: string exec_group_name_prefix
+}
+
 // Sent from the impalad BE to FE with the latest membership snapshot of the
 // executors on the cluster resulting from the Membership heartbeat.
 struct TUpdateExecutorMembershipRequest {
   // The hostnames of the executor nodes.
+  // Note: There can be multiple executors running on the same host.
   1: required set<string> hostnames
 
   // The ip addresses of the executor nodes.
+  // Note: There can be multiple executors running on the same ip addresses.
   2: required set<string> ip_addresses
 
-  // The number of executors on a cluster, needed since there can be multiple
-  // impalads running on the same host.
-  3: i32 num_executors
+  // Info about existing executor group sets.
+  3: list<TExecutorGroupSet> exec_group_sets
 }
 
 // Contains all interesting statistics from a single 'memory pool' in the JVM.
diff --git a/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java b/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java
index e852a14..dcccff9 100644
--- a/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java
+++ b/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java
@@ -17,12 +17,16 @@
 
 package org.apache.impala.util;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TExecutorGroupSet;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
+import org.apache.kudu.shaded.com.google.common.base.Preconditions;
+
 import com.google.common.collect.Sets;
 
 /**
@@ -53,16 +57,24 @@ public class ExecutorMembershipSnapshot {
   // multiple impalads are running on a single host.
 
   // When using executor groups, this value reflects the number of executors in the
-  // largest healthy group. If all group become unhealthy, the backend will not send a
-  // membership update and this value will retain the last non-zero value. This allows the
-  // planner to work on the assumption that a healthy executor group of the same size will
+  // largest healthy group. If all groups become unhealthy, it will be set to the
+  // expected group size for that executor group set. This allows the planner to
+  // work on the assumption that a healthy executor group of the same size will
   // eventually come online to execute queries.
   private final int numExecutors_;
 
+  // Info about the expected executor group sets sorted by the expected executor
+  // group size. When not using executor groups (using 'default' executor group) or
+  // when 'expected_executor_group_sets' startup flag is not specified, this will
+  // contain a single entry with an empty group name prefix.
+  // TODO: IMPALA-10992: use this info to plan queries for multiple executor group sets.
+  private final List<TExecutorGroupSet> exec_group_sets_;
+
   // Used only to construct the initial ExecutorMembershipSnapshot.
   private ExecutorMembershipSnapshot() {
     hostnames_ = Sets.newHashSet();
     ipAddresses_ = Sets.newHashSet();
+    exec_group_sets_ = new ArrayList<TExecutorGroupSet>();
     // We use 0 for the number of executors to indicate that no update from the
     // ClusterMembershipManager has arrived yet and we will return the value
     // '-num_expected_executors' in numExecutors().
@@ -73,13 +85,10 @@ public class ExecutorMembershipSnapshot {
   private ExecutorMembershipSnapshot(TUpdateExecutorMembershipRequest request) {
     hostnames_ = request.getHostnames();
     ipAddresses_ = request.getIp_addresses();
-    // If the updates snapshot does not contain any executors we fall back to the previous
-    // value. This can happen if no healthy executor groups are currently online.
-    if (request.getNum_executors() > 0) {
-      numExecutors_ = request.getNum_executors();
-    } else {
-      numExecutors_ = cluster_.get().numExecutors_;
-    }
+    exec_group_sets_ = request.getExec_group_sets();
+    Preconditions.checkState(!exec_group_sets_.isEmpty(), "Atleast one executor group "
+        + "set should have been specified in the membership update.");
+    numExecutors_ = exec_group_sets_.get(0).curr_num_executors;
   }
 
   // Determine whether a host, given either by IP address or hostname, is a member of
@@ -93,8 +102,8 @@ public class ExecutorMembershipSnapshot {
   // been registered so far, this method will return a configurable default to allow the
   // planner to operated based on the expected number of executors.
   public int numExecutors() {
-    if (numExecutors_ == 0) {
-      return BackendConfig.INSTANCE.getBackendCfg().num_expected_executors;
+    if (numExecutors_ == 0 && !exec_group_sets_.isEmpty()) {
+      return exec_group_sets_.get(0).expected_num_executors;
     }
     return numExecutors_;
   }
diff --git a/fe/src/test/java/org/apache/impala/planner/ClusterSizeTest.java b/fe/src/test/java/org/apache/impala/planner/ClusterSizeTest.java
index 5f6d3ba..fc53eff 100644
--- a/fe/src/test/java/org/apache/impala/planner/ClusterSizeTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/ClusterSizeTest.java
@@ -20,12 +20,16 @@ package org.apache.impala.planner;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.TExecutorGroupSet;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+
 import org.junit.Test;
 import com.google.common.collect.Sets;
 
@@ -52,13 +56,18 @@ public class ClusterSizeTest extends FrontendTestBase {
 
   /**
    * Sends an update to the ExecutorMembershipSnapshot containing the specified number of
-   * executors. The host list will only contain localhost.
+   * executors. The host list will only contain localhost. 'numExpectedExecutors' is the
+   * value that corresponds to the -num_expected_executors startup flag.
    */
-  private void setNumExecutors(int num) {
+  private void setNumExecutors(int numExecutor, int numExpectedExecutors) {
     TUpdateExecutorMembershipRequest updateReq = new TUpdateExecutorMembershipRequest();
     updateReq.setIp_addresses(Sets.newHashSet("127.0.0.1"));
     updateReq.setHostnames(Sets.newHashSet("localhost"));
-    updateReq.setNum_executors(num);
+    TExecutorGroupSet group_set = new TExecutorGroupSet();
+    group_set.curr_num_executors = numExecutor;
+    group_set.expected_num_executors = numExpectedExecutors;
+    updateReq.setExec_group_sets(new ArrayList<TExecutorGroupSet>());
+    updateReq.getExec_group_sets().add(group_set);
     ExecutorMembershipSnapshot.update(updateReq);
   }
 
@@ -71,25 +80,28 @@ public class ClusterSizeTest extends FrontendTestBase {
     final String query = "select * from alltypes a inner join alltypes b on a.id = b.id";
     final String broadcast_exchange = ":EXCHANGE [BROADCAST]";
     final String hash_exchange = ":EXCHANGE [HASH(b.id)]";
+    // default value for the -num_expected_executors startup flag.
+    final int default_num_expected_executors = 20;
 
     // By default no executors are registered and the planner falls back to the value of
     // -num_expected_executors, which is 20 by default.
+    setNumExecutors(0, default_num_expected_executors);
     assertTrue(getExplainString(query).contains(hash_exchange));
 
     // Adding a single executor will make the planner switch to a broadcast join.
-    setNumExecutors(1);
+    setNumExecutors(1, default_num_expected_executors);
     assertTrue(getExplainString(query).contains(broadcast_exchange));
 
     // Adding two or more executors will make the planner switch to a partitioned hash
     // join.
     for (int n = 2; n < 5; ++n) {
-      setNumExecutors(n);
+      setNumExecutors(n, default_num_expected_executors);
       assertTrue(getExplainString(query).contains(hash_exchange));
     }
 
     // If the backend reports a single executor, the planner should fall back to a
     // broadcast join.
-    setNumExecutors(1);
+    setNumExecutors(1, default_num_expected_executors);
     assertTrue(getExplainString(query).contains(broadcast_exchange));
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 8472d27..f955480 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -50,6 +50,7 @@ import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.QueryConstants;
 import org.apache.impala.thrift.TDescriptorTable;
 import org.apache.impala.thrift.TExecRequest;
+import org.apache.impala.thrift.TExecutorGroupSet;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.THBaseKeyRange;
 import org.apache.impala.thrift.THdfsFileSplit;
@@ -106,7 +107,11 @@ public class PlannerTestBase extends FrontendTestBase {
     TUpdateExecutorMembershipRequest updateReq = new TUpdateExecutorMembershipRequest();
     updateReq.setIp_addresses(Sets.newHashSet("127.0.0.1"));
     updateReq.setHostnames(Sets.newHashSet("localhost"));
-    updateReq.setNum_executors(3);
+    TExecutorGroupSet group_set = new TExecutorGroupSet();
+    group_set.curr_num_executors = 3;
+    group_set.expected_num_executors = 20; // default num_expected_executors startup flag
+    updateReq.setExec_group_sets(new ArrayList<TExecutorGroupSet>());
+    updateReq.getExec_group_sets().add(group_set);
     ExecutorMembershipSnapshot.update(updateReq);
 
     kuduClient_ = new KuduClient.KuduClientBuilder("127.0.0.1:7051").build();
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index d89cd76..6baed97 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -497,12 +497,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
                                                    timeout=20)
     assert_broadcast_join()
 
-    # Kill a second executor. The group becomes unhealthy but we cache its last healthy
-    # size and will continue to pick a broadcast join.
+    # Kill a second executor. The group becomes unhealthy and we go back to using the
+    # expected size for planning, which results in a hash join.
     self.cluster.impalads[-2].kill()
     self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1,
                                                    timeout=20)
-    assert_broadcast_join()
+    assert_hash_join()
 
   @pytest.mark.execute_serially
   def test_join_strategy_multiple_executors(self):

[impala] 02/02: IMPALA-11030: Fix incorrect creation of common partition exprs

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6894085e4e280d728957e8b9fe175be23bfc6ec0
Author: Aman Sinha <am...@cloudera.com>
AuthorDate: Sun Dec 5 23:51:55 2021 -0800

    IMPALA-11030: Fix incorrect creation of common partition exprs
    
    When there are 2 or more analytic functions in an inline view
    and at least one of them does not have a partition-by expr,
    we were previously still populating the commonPartitionExprs
    list in AnalyticInfo. This common partition expr was then
    used during the auxiliary predicate creation when the outer
    query has a predicate on a partition-by column. This leads to
    wrong results because the auxiliary predicate is pushed down
    to the table scan. While pushing down a predicate on a
    partitioning column is okay if all the analytic functions
    contain that partitioning column, it is not correct to do
    this when at least one analytic function does not have that
    partitioning column.
    
    This patch fixes the wrong result by ensuring that the
    AnalyticInfo's commonPartitionExprs is empty if at least
    one analytic function does not have partitioning exprs.
    
    Testing:
     - Added a new planner test and an e2e test for the row_number()
       analytic function
    
    Change-Id: Iebb51f691e8e5459ffbaf5a49907140f2de212cc
    Reviewed-on: http://gerrit.cloudera.org:8080/18072
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Aman Sinha <am...@cloudera.com>
---
 .../org/apache/impala/analysis/AnalyticInfo.java   |  6 +++-
 .../queries/PlannerTest/analytic-fns.test          | 39 ++++++++++++++++++++++
 .../queries/QueryTest/analytic-fns.test            | 24 +++++++++++++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticInfo.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticInfo.java
index 0d8e3bc..585cddd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticInfo.java
@@ -113,7 +113,11 @@ public class AnalyticInfo extends AggregateInfoBase {
     for (Expr analyticExpr: analyticExprs_) {
       Preconditions.checkState(analyticExpr.isAnalyzed());
       List<Expr> partitionExprs = ((AnalyticExpr) analyticExpr).getPartitionExprs();
-      if (partitionExprs == null) continue;
+      if (partitionExprs == null || partitionExprs.size() == 0) {
+        // If any partition-by list is empty, the intersection is empty.
+        result.clear();
+        break;
+      }
       if (result.isEmpty()) {
         result.addAll(partitionExprs);
       } else {
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
index a01b401..f15e32e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
@@ -3496,3 +3496,42 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> l_partkey
    row-size=20B cardinality=235.34K
 ====
+# IMPALA-11030: 2 Analytic functions within inline view,
+# only one of which has partitioning expr. Predicate in
+# outer query on partitioning expr
+select * from
+ (select id, string_col, row_number() over (order by id) num_rank,
+  row_number() over (partition by string_col order by id) prime_rank
+ from functional.alltypessmall) v where string_col = '0';
+---- PLAN
+PLAN-ROOT SINK
+|
+05:SELECT
+|  predicates: string_col = '0'
+|  row-size=33B cardinality=10
+|
+04:ANALYTIC
+|  functions: row_number()
+|  order by: id ASC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=33B cardinality=100
+|
+03:SORT
+|  order by: id ASC
+|  row-size=25B cardinality=100
+|
+02:ANALYTIC
+|  functions: row_number()
+|  partition by: string_col
+|  order by: id ASC
+|  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=25B cardinality=100
+|
+01:SORT
+|  order by: string_col ASC NULLS LAST, id ASC
+|  row-size=17B cardinality=100
+|
+00:SCAN HDFS [functional.alltypessmall]
+   HDFS partitions=4/4 files=4 size=6.32KB
+   row-size=17B cardinality=100
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
index fc1dce4..1d2fa53 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
@@ -2206,3 +2206,27 @@ BIGINT,BIGINT
 7,8
 8,8
 ====
+---- QUERY
+# IMPALA-11030: 2 Analytic functions within inline view,
+# only one of which has partitioning expr. Predicate in
+# outer query on partitioning expr
+select * from
+ (select id, string_col, row_number() over (order by id) num_rank,
+  row_number() over (partition by string_col order by id) prime_rank
+ from alltypessmall) v where string_col = '0'
+---- TYPES
+INT, STRING, BIGINT, BIGINT
+---- RESULTS
+0,'0',1,1
+10,'0',11,2
+20,'0',21,3
+25,'0',26,4
+35,'0',36,5
+45,'0',46,6
+50,'0',51,7
+60,'0',61,8
+70,'0',71,9
+75,'0',76,10
+85,'0',86,11
+95,'0',96,12
+====