You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ar...@apache.org on 2018/07/09 19:50:10 UTC

[3/6] impala git commit: IMPALA-6031: Fix executor node count in distributed plans

IMPALA-6031: Fix executor node count in distributed plans

Prior to this change, the planner also counted coordinator-only
nodes as executors when estimating the number of nodes on which
a scan would run in the distributed plan. This change ensures
that only executor nodes are counted in that estimate.

Testing:
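Added a new custom cluster test (test_exclusive_coordinator_plan) that
checks that a distributed plan does not assign scan fragments to a
coordinator-only node.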

Change-Id: I44af6b40099a495e13a0a5dc72c491d486d23aa8
Reviewed-on: http://gerrit.cloudera.org:8080/10873
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/880011fa
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/880011fa
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/880011fa

Branch: refs/heads/master
Commit: 880011fa1f2fc48f9972ad3c673a4bff838cc5ce
Parents: 2816211
Author: poojanilangekar <po...@cloudera.com>
Authored: Thu Jun 14 09:22:03 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Jul 7 03:33:08 2018 +0000

----------------------------------------------------------------------
 be/src/service/frontend.cc                      |  4 +-
 be/src/service/frontend.h                       |  6 +-
 be/src/service/impala-server.cc                 |  9 ++-
 common/thrift/Frontend.thrift                   | 14 +++-
 .../apache/impala/planner/HBaseScanNode.java    |  9 +--
 .../org/apache/impala/planner/HdfsScanNode.java | 16 ++--
 .../org/apache/impala/service/Frontend.java     |  8 +-
 .../org/apache/impala/service/JniFrontend.java  | 11 +--
 .../impala/util/ExecutorMembershipSnapshot.java | 82 +++++++++++++++++++
 .../apache/impala/util/MembershipSnapshot.java  | 84 --------------------
 .../apache/impala/planner/PlannerTestBase.java  | 10 +--
 tests/custom_cluster/test_coordinators.py       | 36 +++++++++
 12 files changed, 165 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index 9ae9f90..ca3f79f 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -81,7 +81,7 @@ Frontend::Frontend() {
     {"getHadoopGroups", "([B)[B", &get_hadoop_groups_id_},
     {"checkConfiguration", "()Ljava/lang/String;", &check_config_id_},
     {"updateCatalogCache", "([B)[B", &update_catalog_cache_id_},
-    {"updateMembership", "([B)V", &update_membership_id_},
+    {"updateExecutorMembership", "([B)V", &update_membership_id_},
     {"getCatalogMetrics", "()[B", &get_catalog_metrics_id_},
     {"getTableNames", "([B)[B", &get_table_names_id_},
     {"describeDb", "([B)[B", &describe_db_id_},
@@ -126,7 +126,7 @@ Status Frontend::UpdateCatalogCache(const TUpdateCatalogCacheRequest& req,
   return JniUtil::CallJniMethod(fe_, update_catalog_cache_id_, req, resp);
 }
 
-Status Frontend::UpdateMembership(const TUpdateMembershipRequest& req) {
+Status Frontend::UpdateExecutorMembership(const TUpdateExecutorMembershipRequest& req) {
   return JniUtil::CallJniMethod(fe_, update_membership_id_, req);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/be/src/service/frontend.h
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index 77836ab..2327e8f 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -45,9 +45,9 @@ class Frontend {
   Status UpdateCatalogCache(const TUpdateCatalogCacheRequest& req,
       TUpdateCatalogCacheResponse *resp);
 
-  /// Request to update the Impalad frontend cluster membership snapshot.  The
-  /// TUpdateMembershipRequest contains the latest set of hosts.
-  Status UpdateMembership(const TUpdateMembershipRequest& req);
+  /// Request to update the Impalad frontend cluster membership snapshot of executors.
+  /// The TUpdateExecutorMembershipRequest contains the latest set of executor nodes.
+  Status UpdateExecutorMembership(const TUpdateExecutorMembershipRequest& req);
 
   /// Call FE to get explain plan
   Status GetExplainPlan(const TQueryCtx& query_ctx, std::string* explain_string);

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c1f00e3..97fa70e 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1667,18 +1667,19 @@ void ImpalaServer::MembershipCallback(
     // membership by network address.
     set<TNetworkAddress> current_membership;
     // Also reflect changes to the frontend. Initialized only if any_changes is true.
-    TUpdateMembershipRequest update_req;
+    // Only send the hostname and ip_address of the executors to the frontend.
+    TUpdateExecutorMembershipRequest update_req;
     bool any_changes = !delta.topic_entries.empty() || !delta.is_delta;
     for (const BackendDescriptorMap::value_type& backend: known_backends_) {
       current_membership.insert(backend.second.address);
-      if (any_changes) {
+      if (any_changes && backend.second.is_executor) {
         update_req.hostnames.insert(backend.second.address.hostname);
         update_req.ip_addresses.insert(backend.second.ip_address);
+        update_req.num_executors++;
       }
     }
     if (any_changes) {
-      update_req.num_nodes = known_backends_.size();
-      Status status = exec_env_->frontend()->UpdateMembership(update_req);
+      Status status = exec_env_->frontend()->UpdateExecutorMembership(update_req);
       if (!status.ok()) {
         LOG(WARNING) << "Error updating frontend membership snapshot: "
                      << status.GetDetail();

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 6cbbf79..1245c94 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -711,12 +711,18 @@ struct TUpdateCatalogCacheResponse {
   3: required i64 new_catalog_version
 }
 
-// Sent from the impalad BE to FE with the latest cluster membership snapshot resulting
-// from the Membership heartbeat.
-struct TUpdateMembershipRequest {
+// Sent from the impalad BE to FE with the latest membership snapshot of the
+// executors on the cluster resulting from the Membership heartbeat.
+struct TUpdateExecutorMembershipRequest {
+  // The hostnames of the executor nodes.
   1: required set<string> hostnames
+
+  // The ip addresses of the executor nodes.
   2: required set<string> ip_addresses
-  3: i32 num_nodes
+
+  // The number of executors on a cluster, needed since there can be multiple
+  // impalads running on the same host.
+  3: i32 num_executors
 }
 
 // Contains all interesting statistics from a single 'memory pool' in the JVM.

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index 881c4ae..5184f96 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -53,7 +53,7 @@ import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TScanRangeSpec;
-import org.apache.impala.util.MembershipSnapshot;
+import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -218,11 +218,10 @@ public class HBaseScanNode extends ScanNode {
       LOG.trace("computeStats HbaseScan: cardinality=" + Long.toString(cardinality_));
     }
 
-    // Assume that each node in the cluster gets a scan range, unless there are fewer
+    // Assume that each executor in the cluster gets a scan range, unless there are fewer
     // scan ranges than nodes.
-    numNodes_ = Math.max(1,
-        Math.min(scanRangeSpecs_.getConcrete_rangesSize(),
-            MembershipSnapshot.getCluster().numNodes()));
+    numNodes_ = Math.max(1, Math.min(scanRangeSpecs_.getConcrete_rangesSize(),
+                                ExecutorMembershipSnapshot.getCluster().numExecutors()));
     if (LOG.isTraceEnabled()) {
       LOG.trace("computeStats HbaseScan: #nodes=" + Integer.toString(numNodes_));
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index ac6c85a..55bf301 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -83,7 +83,7 @@ import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.BitUtil;
-import org.apache.impala.util.MembershipSnapshot;
+import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1100,7 +1100,7 @@ public class HdfsScanNode extends ScanNode {
    */
   protected void computeNumNodes(Analyzer analyzer, long cardinality) {
     Preconditions.checkNotNull(scanRangeSpecs_);
-    MembershipSnapshot cluster = MembershipSnapshot.getCluster();
+    ExecutorMembershipSnapshot cluster = ExecutorMembershipSnapshot.getCluster();
     HashSet<TNetworkAddress> localHostSet = Sets.newHashSet();
     int totalNodes = 0;
     int numLocalRanges = 0;
@@ -1135,21 +1135,21 @@ public class HdfsScanNode extends ScanNode {
         // hosts that hold block replica for those ranges.
         int numLocalNodes = Math.min(numLocalRanges, localHostSet.size());
         // The remote ranges are round-robined across all the impalads.
-        int numRemoteNodes = Math.min(numRemoteRanges, cluster.numNodes());
+        int numRemoteNodes = Math.min(numRemoteRanges, cluster.numExecutors());
         // The local and remote assignments may overlap, but we don't know by how much so
         // conservatively assume no overlap.
-        totalNodes = Math.min(numLocalNodes + numRemoteNodes, cluster.numNodes());
+        totalNodes = Math.min(numLocalNodes + numRemoteNodes, cluster.numExecutors());
         // Exit early if all hosts have a scan range assignment, to avoid extraneous work
         // in case the number of scan ranges dominates the number of nodes.
-        if (totalNodes == cluster.numNodes()) break;
+        if (totalNodes == cluster.numExecutors()) break;
       }
     }
     // Handle the generated range specifications.
-    if (totalNodes < cluster.numNodes() && scanRangeSpecs_.isSetSplit_specs()) {
+    if (totalNodes < cluster.numExecutors() && scanRangeSpecs_.isSetSplit_specs()) {
       Preconditions.checkState(
           generatedScanRangeCount_ >= scanRangeSpecs_.getSplit_specsSize());
       numRemoteRanges += generatedScanRangeCount_;
-      totalNodes = Math.min(numRemoteRanges, cluster.numNodes());
+      totalNodes = Math.min(numRemoteRanges, cluster.numExecutors());
     }
     // Tables can reside on 0 nodes (empty table), but a plan node must always be
     // executed on at least one node.
@@ -1159,7 +1159,7 @@ public class HdfsScanNode extends ScanNode {
           + (scanRangeSpecs_.getConcrete_rangesSize() + generatedScanRangeCount_)
           + " localRanges=" + numLocalRanges + " remoteRanges=" + numRemoteRanges
           + " localHostSet.size=" + localHostSet.size()
-          + " clusterNodes=" + cluster.numNodes());
+          + " executorNodes=" + cluster.numExecutors());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 9208184..a363458 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -136,9 +136,9 @@ import org.apache.impala.thrift.TStmtType;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
-import org.apache.impala.thrift.TUpdateMembershipRequest;
+import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
 import org.apache.impala.util.EventSequence;
-import org.apache.impala.util.MembershipSnapshot;
+import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.TResultRowBuilder;
 import org.apache.impala.util.TSessionStateUtil;
@@ -252,8 +252,8 @@ public class Frontend {
   /**
    * Update the cluster membership snapshot with the latest snapshot from the backend.
    */
-  public void updateMembership(TUpdateMembershipRequest req) {
-    MembershipSnapshot.update(req);
+  public void updateExecutorMembership(TUpdateExecutorMembershipRequest req) {
+    ExecutorMembershipSnapshot.update(req);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/fe/src/main/java/org/apache/impala/service/JniFrontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index ad8b165..f02db61 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -95,7 +95,7 @@ import org.apache.impala.thrift.TShowStatsOp;
 import org.apache.impala.thrift.TShowStatsParams;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
-import org.apache.impala.thrift.TUpdateMembershipRequest;
+import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
 import org.apache.impala.util.GlogAppender;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.TSessionStateUtil;
@@ -186,12 +186,13 @@ public class JniFrontend {
 
   /**
    * Jni wrapper for Frontend.updateMembership(). Accepts a serialized
-   * TUpdateMembershipRequest.
+   * TUpdateExecutorMembershipRequest.
    */
-  public void updateMembership(byte[] thriftMembershipUpdate) throws ImpalaException {
-    TUpdateMembershipRequest req = new TUpdateMembershipRequest();
+  public void updateExecutorMembership(byte[] thriftMembershipUpdate)
+      throws ImpalaException {
+    TUpdateExecutorMembershipRequest req = new TUpdateExecutorMembershipRequest();
     JniUtil.deserializeThrift(protocolFactory_, req, thriftMembershipUpdate);
-    frontend_.updateMembership(req);
+    frontend_.updateExecutorMembership(req);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java b/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java
new file mode 100644
index 0000000..3482d6b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/ExecutorMembershipSnapshot.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
+import com.google.common.collect.Sets;
+
+/**
+ * Singleton class that represents a snapshot of the Impalad executor membership. Host
+ * membership is determined by both IP address and hostname (to mimic the backend's
+ * Scheduler). A new snapshot is created whenever the cluster membership changes
+ * so that clients don't need to hold a lock while examining a snapshot.
+ */
+public class ExecutorMembershipSnapshot {
+  // The latest instance of the ExecutorMembershipSnapshot.
+  private static AtomicReference<ExecutorMembershipSnapshot> cluster_ =
+      new AtomicReference<ExecutorMembershipSnapshot>(new ExecutorMembershipSnapshot());
+
+  // The set of executor hosts in the cluster, identified by hostname.
+  private final Set<String> hostnames_;
+
+  // The set of executor hosts in the cluster, identified by IP address.
+  private final Set<String> ipAddresses_;
+
+  // The number of executor nodes of the cluster.  Normally, this will be equal to
+  // hostnames_.size(), except in the test minicluster where there are multiple
+  // impalad's running on a single host.
+  private final int numExecutors_;
+
+  // Used only to construct the initial ExecutorMembershipSnapshot. Before we get the
+  // first snapshot, assume one node (the localhost) to mimic Scheduler.
+  private ExecutorMembershipSnapshot() {
+    hostnames_ = Sets.newHashSet();
+    ipAddresses_ = Sets.newHashSet();
+    numExecutors_ = 1;
+  }
+
+  // Construct a new snapshot based on the TUpdateExecutorMembershipRequest.
+  private ExecutorMembershipSnapshot(TUpdateExecutorMembershipRequest request) {
+    hostnames_ = request.getHostnames();
+    ipAddresses_ = request.getIp_addresses();
+    numExecutors_ = request.getNum_executors();
+  }
+
+  // Determine whether a host, given either by IP address or hostname, is a member of
+  // this snapshot.  Returns true if it is, false otherwise.
+  public boolean contains(TNetworkAddress address) {
+    String host = address.getHostname();
+    return ipAddresses_.contains(host) || hostnames_.contains(host);
+  }
+
+  // The number of executor nodes in this snapshot.
+  public int numExecutors() { return numExecutors_; }
+
+  // Atomically update the singleton snapshot instance.  After the update completes,
+  // all calls to getCluster() will return the new snapshot.
+  public static void update(TUpdateExecutorMembershipRequest request) {
+    cluster_.set(new ExecutorMembershipSnapshot(request));
+  }
+
+  // Return the current singleton snapshot instance.
+  public static ExecutorMembershipSnapshot getCluster() { return cluster_.get(); }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/fe/src/main/java/org/apache/impala/util/MembershipSnapshot.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/MembershipSnapshot.java b/fe/src/main/java/org/apache/impala/util/MembershipSnapshot.java
deleted file mode 100644
index be44785..0000000
--- a/fe/src/main/java/org/apache/impala/util/MembershipSnapshot.java
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.util;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.impala.thrift.TNetworkAddress;
-import org.apache.impala.thrift.TUpdateMembershipRequest;
-import com.google.common.collect.Sets;
-
-/**
- * Singleton class that represents a snapshot of the Impalad cluster membership.  Host
- * membership is determined by both IP address and hostname (to mimic the backend's
- * Scheduler).  A new snapshot is created whenever the cluster membership changes
- * so that clients don't need to hold a lock while examining a snapshot.
- */
-public class MembershipSnapshot {
-
-  // The latest instance of the MembershipSnapshot.
-  private static AtomicReference<MembershipSnapshot> cluster_ =
-      new AtomicReference<MembershipSnapshot>(new MembershipSnapshot());
-
-  // The set of hosts that are members of the cluster given by hostname.
-  private final Set<String> hostnames_;
-
-  // The set of hosts that are members of the cluster given by IP address.
-  private final Set<String> ipAddresses_;
-
-  // The number of nodes of the cluster.  Normally, this will be equal to
-  // hostnames_.size(), except in the test minicluster where there are multiple
-  // impalad's running on a single host.
-  private final int numNodes_;
-
-  // Used only to construct the initial MembershipSnapshot.  Before we get the first
-  // snapshot, assume one node (the localhost) to mimic Scheduler.
-  private MembershipSnapshot() {
-    hostnames_ = Sets.newHashSet();
-    ipAddresses_ = Sets.newHashSet();
-    numNodes_ = 1;
-  }
-
-  // Construct a new snapshot based on the TUpdateMembershipRequest.
-  private MembershipSnapshot(TUpdateMembershipRequest request) {
-    hostnames_ = request.getHostnames();
-    ipAddresses_ = request.getIp_addresses();
-    numNodes_ = request.getNum_nodes();
-  }
-
-  // Determine whether a host, given either by IP address or hostname, is a member of
-  // this snapshot.  Returns true if it is, false otherwise.
-  public boolean contains(TNetworkAddress address) {
-    String host = address.getHostname();
-    return ipAddresses_.contains(host) || hostnames_.contains(host);
-  }
-
-  // The number of nodes in this snapshot.
-  public int numNodes() { return numNodes_; }
-
-  // Atomically update the singleton snapshot instance.  After the update completes,
-  // all calls to getCluster() will return the new snapshot.
-  public static void update(TUpdateMembershipRequest request) {
-    cluster_.set(new MembershipSnapshot(request));
-  }
-
-  // Return the current singleton snapshot instance.
-  public static MembershipSnapshot getCluster() { return cluster_.get(); }
-
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index b671a1e..0c51036 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -70,8 +70,8 @@ import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTupleDescriptor;
-import org.apache.impala.thrift.TUpdateMembershipRequest;
-import org.apache.impala.util.MembershipSnapshot;
+import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
+import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduScanToken;
 import org.junit.AfterClass;
@@ -104,11 +104,11 @@ public class PlannerTestBase extends FrontendTestBase {
   @BeforeClass
   public static void setUp() throws Exception {
     // Mimic the 3 node test mini-cluster.
-    TUpdateMembershipRequest updateReq = new TUpdateMembershipRequest();
+    TUpdateExecutorMembershipRequest updateReq = new TUpdateExecutorMembershipRequest();
     updateReq.setIp_addresses(Sets.newHashSet("127.0.0.1"));
     updateReq.setHostnames(Sets.newHashSet("localhost"));
-    updateReq.setNum_nodes(3);
-    MembershipSnapshot.update(updateReq);
+    updateReq.setNum_executors(3);
+    ExecutorMembershipSnapshot.update(updateReq);
 
     if (RuntimeEnv.INSTANCE.isKuduSupported()) {
       kuduClient_ = new KuduClient.KuduClientBuilder("127.0.0.1:7051").build();

http://git-wip-us.apache.org/repos/asf/impala/blob/880011fa/tests/custom_cluster/test_coordinators.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 4ec3317..b49e328 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -245,3 +245,39 @@ class TestCoordinators(CustomClusterTestSuite):
       if client is not None:
         client.close()
       self._stop_impala_cluster()
+
+  @pytest.mark.execute_serially
+  def test_exclusive_coordinator_plan(self):
+    """Checks that a distributed plan does not assign scan fragments to a coordinator
+    only node. """
+
+    self._start_impala_cluster([], num_coordinators=1, cluster_size=3,
+        use_exclusive_coordinators=True)
+
+    assert len(self.cluster.impalads) == 3
+
+    coordinator = self.cluster.impalads[0]
+    worker1 = self.cluster.impalads[1]
+    worker2 = self.cluster.impalads[2]
+
+    client = None
+    try:
+      client = coordinator.service.create_beeswax_client()
+      assert client is not None
+      self.client = client
+
+      client.execute("SET EXPLAIN_LEVEL=2")
+
+      # Ensure that the plan generated always uses only the executor nodes for scanning
+      # Multi-partition table
+      result = client.execute("explain select count(*) from functional.alltypes "
+              "where id NOT IN (0,1,2) and string_col IN ('aaaa', 'bbbb', 'cccc', NULL) "
+              "and mod(int_col,50) IN (0,1) and id IN (int_col);").data
+      assert 'F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2' in result
+      # Single partition table
+      result = client.execute("explain select * from tpch.lineitem "
+              "union all select * from tpch.lineitem").data
+      assert 'F02:PLAN FRAGMENT [RANDOM] hosts=2 instances=2' in result
+    finally:
+      assert client is not None
+      self._stop_impala_cluster()