Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/03 16:35:52 UTC

[impala] branch master updated (cd30949 -> e32a496)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from cd30949  IMPALA-8502: Bump CDH_BUILD_NUMBER and Kudu version
     new e10d567  IMPALA-5031: signed overflow is undefined behavior
     new d8a0c60  IMPALA-5031: memcpy requires two non-null arguments
     new 24b7a3b  IMPALA-5031: NULL is undefined in memcpy and memcmp
     new d5673bf  IMPALA-8595: Support TLSv1.2 with Python < 2.7.9 in shell
     new 6b3e5fe  IMPALA-8460: Simplify cluster membership management
     new e32a496  IMPALA-8369: Add HIVE_MAJOR_VERSION section to EE tests + some fixes

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/benchmarks/scheduler-benchmark.cc           |   1 -
 be/src/common/logging.h                            |   2 +-
 be/src/common/thread-debug-info-test.cc            |   1 +
 be/src/experiments/hash-ring-util.cc               |   3 +-
 be/src/gutil/atomicops-internals-x86.h             |  14 +-
 be/src/gutil/strings/split.cc                      |   3 +
 be/src/gutil/strings/split.h                       |   8 +-
 be/src/rpc/thrift-util-test.cc                     |   5 +-
 be/src/rpc/thrift-util.cc                          |  28 --
 be/src/runtime/collection-value-builder.h          |   3 +-
 be/src/runtime/exec-env.cc                         |  26 +-
 be/src/runtime/exec-env.h                          |   3 +
 be/src/runtime/krpc-data-stream-mgr.cc             |   1 +
 be/src/runtime/query-state.h                       |   1 +
 be/src/runtime/sorter.cc                           |   3 +-
 be/src/runtime/tuple.cc                            |   5 +-
 be/src/runtime/tuple.h                             |   4 +-
 be/src/scheduling/CMakeLists.txt                   |  11 +-
 be/src/scheduling/backend-config-test.cc           |  93 ----
 be/src/scheduling/backend-config.cc                | 116 -----
 be/src/scheduling/backend-config.h                 |  87 ----
 be/src/scheduling/cluster-membership-mgr-test.cc   | 475 +++++++++++++++++++++
 be/src/scheduling/cluster-membership-mgr.cc        | 401 +++++++++++++++++
 be/src/scheduling/cluster-membership-mgr.h         | 236 ++++++++++
 be/src/scheduling/cluster-membership-test-util.cc  |  60 +++
 .../cluster-membership-test-util.h}                |  26 +-
 be/src/scheduling/executor-group-test.cc           |  82 ++++
 be/src/scheduling/executor-group.cc                | 131 ++++++
 be/src/scheduling/executor-group.h                 | 108 +++++
 be/src/scheduling/query-schedule.cc                |   1 -
 be/src/scheduling/query-schedule.h                 |   4 +-
 be/src/scheduling/scheduler-test-util.cc           |  75 ++--
 be/src/scheduling/scheduler-test-util.h            |  11 +-
 be/src/scheduling/scheduler-test.cc                |  83 ++--
 be/src/scheduling/scheduler.cc                     | 330 +++++---------
 be/src/scheduling/scheduler.h                      | 151 ++-----
 be/src/service/impala-http-handler.cc              |   9 +-
 be/src/service/impala-server.cc                    | 208 +++------
 be/src/service/impala-server.h                     |  67 +--
 be/src/statestore/statestore-subscriber.cc         |  14 +
 be/src/statestore/statestore-subscriber.h          |  23 +-
 be/src/statestore/statestore.cc                    |   1 +
 be/src/testutil/in-process-servers.cc              |   1 -
 be/src/util/container-util.h                       |  67 ++-
 be/src/util/dict-encoding.h                        |   3 +-
 be/src/util/network-util.cc                        |  11 -
 be/src/util/network-util.h                         |   4 -
 be/src/util/runtime-profile-test.cc                |   1 +
 be/src/util/uid-util-test.cc                       |   1 +
 common/thrift/CMakeLists.txt                       |   4 +-
 common/thrift/Frontend.thrift                      |   2 +-
 common/thrift/StatestoreService.thrift             |   2 +-
 common/thrift/metrics.json                         |  11 +
 .../java/org/apache/impala/service/Frontend.java   |   3 +-
 shell/TSSLSocketWithWildcardSAN.py                 |   8 +-
 .../queries/QueryTest/alter-table.test             |  16 +
 tests/common/environ.py                            |   8 +
 tests/common/impala_test_suite.py                  |   8 +
 tests/custom_cluster/test_client_ssl.py            |   8 +-
 tests/custom_cluster/test_coordinators.py          |  10 +-
 tests/custom_cluster/test_restart_services.py      |   4 +-
 tests/query_test/test_scanners.py                  |   4 +
 tests/util/test_file_parser.py                     |   2 +-
 63 files changed, 2038 insertions(+), 1054 deletions(-)
 delete mode 100644 be/src/scheduling/backend-config-test.cc
 delete mode 100644 be/src/scheduling/backend-config.cc
 delete mode 100644 be/src/scheduling/backend-config.h
 create mode 100644 be/src/scheduling/cluster-membership-mgr-test.cc
 create mode 100644 be/src/scheduling/cluster-membership-mgr.cc
 create mode 100644 be/src/scheduling/cluster-membership-mgr.h
 create mode 100644 be/src/scheduling/cluster-membership-test-util.cc
 copy be/src/{exprs/is-null-predicate.h => scheduling/cluster-membership-test-util.h} (53%)
 create mode 100644 be/src/scheduling/executor-group-test.cc
 create mode 100644 be/src/scheduling/executor-group.cc
 create mode 100644 be/src/scheduling/executor-group.h


[impala] 01/06: IMPALA-5031: signed overflow is undefined behavior

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e10d56762d26a6364e915d8693f936944f43d3ab
Author: Jim Apple <jb...@apache.org>
AuthorDate: Fri May 24 17:50:56 2019 -0700

    IMPALA-5031: signed overflow is undefined behavior
    
    This undefined behavior was caught with UBSAN in the end-to-end
    tests. The interesting part of the backtrace is:
    
        gutil/atomicops-internals-x86.h:283:15: runtime error: signed
           integer overflow: -9223370395229620599 + -9223371946660462582
           cannot be represented in type 'long'
        #0 base::subtle::Barrier_AtomicIncrement(long volatile*, long)
           gutil/atomicops-internals-x86.h:283:15
        #1 internal::AtomicInt<long>::Add(long) common/atomic.h:93:12
        #2 RuntimeProfile::Counter::Add(long) util/runtime-profile.h:93
        #3 HdfsOrcScanner::AssembleRows(RowBatch*)
           exec/hdfs-orc-scanner.cc:636:50
        #4 HdfsOrcScanner::GetNextInternal(RowBatch*)
           exec/hdfs-orc-scanner.cc:507:19
        #5 HdfsOrcScanner::ProcessSplit() exec/hdfs-orc-scanner.cc:426:21
        #6 HdfsScanNode::ProcessSplit(vector<FilterContext> const&,
           MemPool*, io::ScanRange*, long*) exec/hdfs-scan-node.cc:514:21
        #7 HdfsScanNode::ScannerThread(bool, long)
           exec/hdfs-scan-node.cc:415:7
        #8 HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool*)::
           $_0::operator()() const exec/hdfs-scan-node.cc:337:13
    
    Change-Id: Ic638ff4959eaaffc79caa3453dbccaaabcbe95c9
    Reviewed-on: http://gerrit.cloudera.org:8080/13433
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/gutil/atomicops-internals-x86.h | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/be/src/gutil/atomicops-internals-x86.h b/be/src/gutil/atomicops-internals-x86.h
index 90ae25a..4d26bff 100644
--- a/be/src/gutil/atomicops-internals-x86.h
+++ b/be/src/gutil/atomicops-internals-x86.h
@@ -31,6 +31,8 @@
 
 #include <common/logging.h>
 
+#include "util/arithmetic-util.h"
+
 #define BASE_HAS_ATOMIC64 1  // Use only in tests and base/atomic*
 
 
@@ -116,7 +118,7 @@ inline Atomic32 NoBarrier_AtomicIncrement(volatile Atomic32* ptr,
                        : "+r" (temp), "+m" (*ptr)
                        : : "memory");
   // temp now holds the old value of *ptr
-  return temp + increment;
+  return impala::ArithmeticUtil::AsUnsigned<std::plus>(temp, increment);
 }
 
 inline Atomic32 Barrier_AtomicIncrement(volatile Atomic32* ptr,
@@ -127,7 +129,7 @@ inline Atomic32 Barrier_AtomicIncrement(volatile Atomic32* ptr,
                        : "+r" (temp), "+m" (*ptr)
                        : : "memory");
   // temp now holds the old value of *ptr
-  return temp + increment;
+  return impala::ArithmeticUtil::AsUnsigned<std::plus>(temp, increment);
 }
 
 // On x86, the NoBarrier_CompareAndSwap() uses a locked instruction and so also
@@ -269,7 +271,7 @@ inline Atomic64 NoBarrier_AtomicIncrement(volatile Atomic64* ptr,
                        : "+r" (temp), "+m" (*ptr)
                        : : "memory");
   // temp now contains the previous value of *ptr
-  return temp + increment;
+  return impala::ArithmeticUtil::AsUnsigned<std::plus>(temp, increment);
 }
 
 inline Atomic64 Barrier_AtomicIncrement(volatile Atomic64* ptr,
@@ -280,7 +282,7 @@ inline Atomic64 Barrier_AtomicIncrement(volatile Atomic64* ptr,
                        : "+r" (temp), "+m" (*ptr)
                        : : "memory");
   // temp now contains the previous value of *ptr
-  return temp + increment;
+  return impala::ArithmeticUtil::AsUnsigned<std::plus>(temp, increment);
 }
 
 inline void NoBarrier_Store(volatile Atomic64* ptr, Atomic64 value) {
@@ -408,10 +410,10 @@ inline Atomic64 NoBarrier_AtomicIncrement(volatile Atomic64* ptr,
 
   do {
     old_val = *ptr;
-    new_val = old_val + increment;
+    new_val = impala::ArithmeticUtil::AsUnsigned<std::plus>(old_val, increment);
   } while (__sync_val_compare_and_swap(ptr, old_val, new_val) != old_val);
 
-  return old_val + increment;
+  return impala::ArithmeticUtil::AsUnsigned<std::plus>(old_val, increment);
 }
 
 inline Atomic64 Barrier_AtomicIncrement(volatile Atomic64* ptr,
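
For reference, the pattern this fix applies: unsigned integer arithmetic wraps
around by definition, so the operands are converted to their unsigned
counterparts, combined, and converted back. A minimal sketch of the idea,
assuming the obvious shape for ArithmeticUtil::AsUnsigned (the real helper
lives in be/src/util/arithmetic-util.h and may differ in detail):

    #include <cstdint>
    #include <functional>
    #include <type_traits>

    // Evaluate Op on the unsigned counterparts of the operands, where
    // overflow wraps instead of being undefined behavior, then convert back
    // to the signed type (implementation-defined before C++20, two's
    // complement in practice).
    template <template <typename> class Op, typename T>
    T AsUnsigned(T a, T b) {
      using U = typename std::make_unsigned<T>::type;
      return static_cast<T>(Op<U>()(static_cast<U>(a), static_cast<U>(b)));
    }

    int64_t WrappingAdd(int64_t old_val, int64_t increment) {
      return AsUnsigned<std::plus>(old_val, increment);  // wraps, never UB
    }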


[impala] 03/06: IMPALA-5031: NULL is undefined in memcpy and memcmp

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 24b7a3bf56400ac4007de1bb16761bafac98b5a6
Author: Jim Apple <jb...@apache.org>
AuthorDate: Sat May 25 11:45:21 2019 -0700

    IMPALA-5031: NULL is undefined in memcpy and memcmp
    
    This patch fixes UBSAN "null pointer passed as argument" errors in the
    end-to-end tests. These are undefined behavior according to "7.1.4 Use
    of library functions" in the C99 standard (which is included in C++14
    in section [intro.refs]):
    
        If an argument to a function has an invalid value (such as a value
        outside the domain of the function, or a pointer outside the
        address space of the program, or a null pointer, or a pointer to
        non-modifiable storage when the corresponding parameter is not
        const-qualified) or a type (after promotion) not expected by a
        function with variable number of arguments, the behavior is
        undefined.
    
    The interesting parts of the backtraces are:
    
        runtime/sorter.cc:575:18: runtime error: null pointer passed as
           argument 2, which is declared to never be null
        /usr/include/string.h:43:45: note: nonnull attribute specified
           here
        #0 Sorter::Run::CopyVarLenData(vector<StringValue*> const&,
           unsigned char*) runtime/sorter.cc:575:5
        #1 Status Sorter::Run::AddBatchInternal<true, true>(RowBatch*,
           int, int*) runtime/sorter.cc:232:11
        #2 Sorter::Run::AddInputBatch(RowBatch*, int, int*)
           runtime/sorter.cc:660:12
        #3 Sorter::AddBatchNoSpill(RowBatch*, int, int*)
           runtime/sorter.cc:882:58
        #4 Sorter::AddBatch(RowBatch*) runtime/sorter.cc:862:45
        #5 SortNode::SortInput(RuntimeState*) exec/sort-node.cc:177:54
        #6 SortNode::Open(RuntimeState*) exec/sort-node.cc:90:43
    
        runtime/tuple.cc:105:25: runtime error: null pointer passed as
           argument 2, which is declared to never be null
        /usr/include/string.h:43:45: note: nonnull attribute specified
           here
        #0 Tuple::DeepCopyVarlenData(TupleDescriptor const&, MemPool*)
           runtime/tuple.cc:105:5
        #1 Tuple::DeepCopy(Tuple*, TupleDescriptor const&, MemPool*)
           runtime/tuple.cc:94:35
        #2 Tuple::DeepCopy(TupleDescriptor const&, MemPool*)
           runtime/tuple.cc:85:3
        #3 KrpcDataStreamSender::Channel::AddRow(TupleRow*)
           runtime/krpc-data-stream-sender.cc:509:43
        #4 KrpcDataStreamSender::AddRowToChannel(int, TupleRow*)
           runtime/krpc-data-stream-sender.cc:846
        #5 (<unknown module>)
    
        runtime/tuple.cc:146:19: runtime error: null pointer passed as
           argument 2, which is declared to never be null
        /usr/include/string.h:43:45: note: nonnull attribute specified
           here
        #0 Tuple::DeepCopyVarlenData(TupleDescriptor const&, char**, int*,
           bool) runtime/tuple.cc:146:5
        #1 Tuple::DeepCopy(TupleDescriptor const&, char**, int*, bool)
           runtime/tuple.cc:135:35
        #2 RowBatch::SerializeInternal(long, FixedSizeHashTable<Tuple*,
           int>*, vector<int>*, string*) runtime/row-batch.cc:392:14
        #3 RowBatch::Serialize(bool, vector<int>*, string*, long*, bool*)
           runtime/row-batch.cc:290:45
        #4 RowBatch::Serialize(OutboundRowBatch*)
           runtime/row-batch.cc:259:43
        #5 KrpcDataStreamSender::SerializeBatch(RowBatch*,
           OutboundRowBatch*, int) runtime/krpc-data-stream-sender.cc:955:50
        #6 KrpcDataStreamSender::Send(RuntimeState*, RowBatch*)
           runtime/krpc-data-stream-sender.cc:870:45
    
        runtime/tuple.h:106:12: runtime error: null pointer passed as
           argument 1, which is declared to never be null
        /usr/include/string.h:62:79: note: nonnull attribute specified
           here
        #0 Tuple::ClearNullBits(int, int) runtime/tuple.h:106:5
        #1 HdfsScanner::InitTuple(TupleDescriptor const*, Tuple*, Tuple*)
           exec/hdfs-scanner.h:512:14
        #2 HdfsOrcScanner::AssembleCollection(OrcComplexColumnReader
           const&, int, CollectionValueBuilder*)
           exec/hdfs-orc-scanner.cc:742:7
        #3 OrcCollectionReader::ReadValue(int, Tuple*, MemPool*)
           exec/orc-column-readers.cc:375:20
        #4 OrcStructReader::ReadValue(int, Tuple*, MemPool*)
           exec/orc-column-readers.cc:322:52
        #5 OrcListReader::ReadChildrenValue(int, int, Tuple*, MemPool*)
           const exec/orc-column-readers.cc:473:52
        #6 HdfsOrcScanner::AssembleCollection(OrcComplexColumnReader
           const&, int, CollectionValueBuilder*)
           exec/hdfs-orc-scanner.cc:743:60
        #7 OrcCollectionReader::ReadValue(int, Tuple*, MemPool*)
           exec/orc-column-readers.cc:375:20
        #8 OrcStructReader::TransferTuple(Tuple*, MemPool*)
           exec/orc-column-readers.cc:346:52
        #9 HdfsOrcScanner::TransferTuples(OrcComplexColumnReader*,
           RowBatch*) exec/hdfs-orc-scanner.cc:669:58
        #10 HdfsOrcScanner::AssembleRows(RowBatch*)
            exec/hdfs-orc-scanner.cc:629:45
        #11 HdfsOrcScanner::GetNextInternal(RowBatch*)
            exec/hdfs-orc-scanner.cc:507:19
        #12 HdfsOrcScanner::ProcessSplit() exec/hdfs-orc-scanner.cc:426:21
        #13 HdfsScanNode::ProcessSplit(vector<FilterContext> const&,
            MemPool*, io::ScanRange*, long*) exec/hdfs-scan-node.cc:514:21
        #14 HdfsScanNode::ScannerThread(bool, long)
            exec/hdfs-scan-node.cc:415:7
        #15 HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool*)
            ::$_0::operator()() const exec/hdfs-scan-node.cc:337:13
    
        runtime/collection-value-builder.h:75:25: runtime error: null
           pointer passed as argument 2, which is declared to never be null
        /usr/include/string.h:43:28: note: nonnull attribute specified
           here
        #0 CollectionValueBuilder::GetFreeMemory(Tuple**, int*)
           runtime/collection-value-builder.h:75:9
        #1 HdfsScanner::GetCollectionMemory(CollectionValueBuilder*,
           MemPool**, Tuple**, TupleRow**, long*)
           exec/hdfs-scanner.cc:194:3
        #2 HdfsOrcScanner::AssembleCollection(OrcComplexColumnReader
           const&, int, CollectionValueBuilder*)
           exec/hdfs-orc-scanner.cc:733:9
        #3 HdfsOrcScanner::AssembleCollection(OrcComplexColumnReader
           const&, int, CollectionValueBuilder*)
           exec/hdfs-orc-scanner.cc:710:7
        #4 HdfsOrcScanner::AssembleCollection(OrcComplexColumnReader
           const&, int, CollectionValueBuilder*)
           exec/hdfs-orc-scanner.cc:710:7
        #5 OrcCollectionReader::ReadValue(int, Tuple*, MemPool*)
           exec/orc-column-readers.cc:375:20
        #6 OrcStructReader::TransferTuple(Tuple*, MemPool*)
           exec/orc-column-readers.cc:346:5
        #7 HdfsOrcScanner::TransferTuples(OrcComplexColumnReader*,
           RowBatch*) exec/hdfs-orc-scanner.cc:669:5
        #8 HdfsOrcScanner::AssembleRows(RowBatch*)
           exec/hdfs-orc-scanner.cc:629:5
        #9 HdfsOrcScanner::GetNextInternal(RowBatch*)
           exec/hdfs-orc-scanner.cc:507:19
        #10 HdfsScanner::GetNext(RowBatch*) exec/hdfs-scanner.h:133:12
        #11 HdfsScanNodeMt::GetNext(RuntimeState*, RowBatch*, bool*)
            exec/hdfs-scan-node-mt.cc:106:29
        #12 SubplanNode::GetNext(RuntimeState*, RowBatch*, bool*)
            exec/subplan-node.cc:129:7
        #13 AggregationNode::Open(RuntimeState*)
            exec/aggregation-node.cc:67:5
    
    Change-Id: I9362ce6b9ba470ed90e5bd2dc313b66ebd8c6af5
    Reviewed-on: http://gerrit.cloudera.org:8080/13436
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/collection-value-builder.h | 3 ++-
 be/src/runtime/sorter.cc                  | 3 ++-
 be/src/runtime/tuple.cc                   | 5 +++--
 be/src/runtime/tuple.h                    | 4 +++-
 4 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h
index d75e94b..c98816b 100644
--- a/be/src/runtime/collection-value-builder.h
+++ b/be/src/runtime/collection-value-builder.h
@@ -22,6 +22,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/tuple.h"
 #include "util/debug-util.h"
+#include "util/ubsan.h"
 
 namespace impala {
 
@@ -72,7 +73,7 @@ class CollectionValueBuilder {
               ErrorMsg(TErrorCode::COLLECTION_ALLOC_FAILED, new_buffer_size,
               path, buffer_size_, coll_value_->num_tuples).msg(), new_buffer_size);
         }
-        memcpy(new_buf, coll_value_->ptr, bytes_written);
+        Ubsan::MemCpy(new_buf, coll_value_->ptr, bytes_written);
         coll_value_->ptr = new_buf;
         buffer_size_ = new_buffer_size;
       }
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index a7a5a64..e3d828c 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -27,6 +27,7 @@
 #include "runtime/query-state.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
+#include "util/ubsan.h"
 
 #include "common/names.h"
 
@@ -572,7 +573,7 @@ Status Sorter::Run::AddPage(vector<Page>* page_sequence) {
 void Sorter::Run::CopyVarLenData(const vector<StringValue*>& string_values,
     uint8_t* dest) {
   for (StringValue* string_val: string_values) {
-    memcpy(dest, string_val->ptr, string_val->len);
+    Ubsan::MemCpy(dest, string_val->ptr, string_val->len);
     string_val->ptr = reinterpret_cast<char*>(dest);
     dest += string_val->len;
   }
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index 916ae6f..e5688d3 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -35,6 +35,7 @@
 #include "runtime/tuple-row.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
+#include "util/ubsan.h"
 
 #include "common/names.h"
 
@@ -102,7 +103,7 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, MemPool* pool) {
     if (IsNull((*slot)->null_indicator_offset())) continue;
     StringValue* string_v = GetStringSlot((*slot)->tuple_offset());
     char* string_copy = reinterpret_cast<char*>(pool->Allocate(string_v->len));
-    memcpy(string_copy, string_v->ptr, string_v->len);
+    Ubsan::MemCpy(string_copy, string_v->ptr, string_v->len);
     string_v->ptr = string_copy;
   }
 
@@ -143,7 +144,7 @@ void Tuple::DeepCopyVarlenData(const TupleDescriptor& desc, char** data, int* of
     if (IsNull((*slot)->null_indicator_offset())) continue;
 
     StringValue* string_v = GetStringSlot((*slot)->tuple_offset());
-    memcpy(*data, string_v->ptr, string_v->len);
+    Ubsan::MemCpy(*data, string_v->ptr, string_v->len);
     string_v->ptr = convert_ptrs ? reinterpret_cast<char*>(*offset) : *data;
     *data += string_v->len;
     *offset += string_v->len;
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 6a40af8..5f2c4af 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -24,6 +24,7 @@
 #include "gutil/macros.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem-pool.h"
+#include "util/ubsan.h"
 
 namespace llvm {
 class Function;
@@ -103,7 +104,8 @@ class Tuple {
   }
 
   void ClearNullBits(int null_bytes_offset, int num_null_bytes) {
-    memset(reinterpret_cast<uint8_t*>(this) + null_bytes_offset, 0, num_null_bytes);
+    Ubsan::MemSet(
+        reinterpret_cast<uint8_t*>(this) + null_bytes_offset, 0, num_null_bytes);
   }
 
   /// The total size of all data represented in this tuple (tuple data and referenced
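
The Ubsan::MemCpy and Ubsan::MemSet helpers used in this fix are thin
null-tolerant wrappers around the libc routines. A minimal sketch, assuming
the obvious shape (the actual definitions live in be/src/util/ubsan.h and may
differ):

    #include <cstddef>
    #include <cstring>

    // Forward to the libc routine only when the pointer arguments are
    // non-null; a null pointer is only ever passed here together with a
    // zero length, in which case there is nothing to copy or set.
    class Ubsan {
     public:
      static void* MemCpy(void* dest, const void* src, std::size_t n) {
        if (dest == nullptr || src == nullptr) return dest;  // implies n == 0
        return std::memcpy(dest, src, n);
      }
      static void* MemSet(void* s, int c, std::size_t n) {
        if (s == nullptr) return s;  // implies n == 0
        return std::memset(s, c, n);
      }
    };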


[impala] 06/06: IMPALA-8369: Add HIVE_MAJOR_VERSION section to EE tests + some fixes

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e32a4967c8094c334f4f6415019b52ef96a76e6d
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Thu May 30 16:24:15 2019 +0200

    IMPALA-8369: Add HIVE_MAJOR_VERSION section to EE tests + some fixes
    
    Fixed tests with Hive3:
    test_scanners.py - test_scan_truncated_file_empty (exhaustive):
     Added REFRESH after Hive INSERT OVERWRITE. The test worked in Hive2
     only because there was an empty file with the same name as before
     the overwrite.
    test_ddl.py - test_alter_table:
     A Hive3 regression broke some tests and caused the dropping of
     the test database to hang. These tests are skipped for now; the
     Hive-side fix is tracked in HIVE-21806.
    
    Change-Id: I4c3cff05ed7080b655b6af64ea09c0691e7dd931
    Reviewed-on: http://gerrit.cloudera.org:8080/13472
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../functional-query/queries/QueryTest/alter-table.test  | 16 ++++++++++++++++
 tests/common/impala_test_suite.py                        |  8 ++++++++
 tests/query_test/test_scanners.py                        |  4 ++++
 tests/util/test_file_parser.py                           |  2 +-
 4 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/testdata/workloads/functional-query/queries/QueryTest/alter-table.test b/testdata/workloads/functional-query/queries/QueryTest/alter-table.test
index 5d9e6cf..ac65d29 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/alter-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/alter-table.test
@@ -900,10 +900,14 @@ show tables in $DATABASE2 like '*mv*'
 ---- TYPES
 STRING
 ====
+---- HIVE_MAJOR_VERSION
+2
 ---- QUERY
 # Tests that renaming a partitioned table with column stats across databases
 # succeeds and preserves table and column stats, and allows the renamed table
 # to be dropped (IMPALA-2810).
+#
+# Skip in Hive 3, see HIVE-21806 for details.
 create table $DATABASE.mv (x int) partitioned by (y string);
 insert into $DATABASE.mv partition(y='a') values(1);
 insert into $DATABASE.mv partition(y='b') values(2);
@@ -913,12 +917,16 @@ alter table $DATABASE.mv rename to $DATABASE2.mv2;
 invalidate metadata $DATABASE2.mv2
 ---- RESULTS
 ====
+---- HIVE_MAJOR_VERSION
+2
 ---- QUERY
 show tables in $DATABASE like '*mv*'
 ---- RESULTS
 ---- TYPES
 STRING
 ====
+---- HIVE_MAJOR_VERSION
+2
 ---- QUERY
 show tables in $DATABASE2 like '*mv*'
 ---- RESULTS
@@ -926,6 +934,8 @@ show tables in $DATABASE2 like '*mv*'
 ---- TYPES
 STRING
 ====
+---- HIVE_MAJOR_VERSION
+2
 ---- QUERY
 show table stats $DATABASE2.mv2
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
@@ -936,6 +946,8 @@ show table stats $DATABASE2.mv2
 ---- TYPES
 STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
 ====
+---- HIVE_MAJOR_VERSION
+2
 ---- QUERY
 show column stats $DATABASE2.mv2
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
@@ -944,11 +956,15 @@ show column stats $DATABASE2.mv2
 ---- TYPES
 STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE
 ====
+---- HIVE_MAJOR_VERSION
+2
 ---- QUERY
 drop table $DATABASE2.mv2
 ---- RESULTS
 'Table has been dropped.'
 ====
+---- HIVE_MAJOR_VERSION
+2
 ---- QUERY
 show tables in $DATABASE2 like '*mv*'
 ---- RESULTS
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 1be55f7..44e9124 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -36,6 +36,7 @@ from getpass import getuser
 from random import choice
 from subprocess import check_call
 from tests.common.base_test_suite import BaseTestSuite
+from tests.common.environ import HIVE_MAJOR_VERSION
 from tests.common.errors import Timeout
 from tests.common.impala_connection import create_connection
 from tests.common.impala_service import ImpaladService
@@ -515,6 +516,13 @@ class ImpalaTestSuite(BaseTestSuite):
     sections = self.load_query_test_file(self.get_workload(), test_file_name,
         encoding=encoding)
     for test_section in sections:
+      if 'HIVE_MAJOR_VERSION' in test_section:
+        needed_hive_major_version = int(test_section['HIVE_MAJOR_VERSION'])
+        assert needed_hive_major_version in [2, 3]
+        assert HIVE_MAJOR_VERSION in [2, 3]
+        if needed_hive_major_version != HIVE_MAJOR_VERSION:
+          continue
+
       if 'SHELL' in test_section:
         assert len(test_section) == 1, \
             "SHELL test sections can't contain other sections"
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 5d6e8c1..15686de 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1145,6 +1145,10 @@ class TestScanTruncatedFiles(ImpalaTestSuite):
     self.run_stmt_in_hive("insert overwrite table %s select string_col from "
         "functional.alltypes limit %s" % (fq_tbl_name, num_rows))
 
+    # The file will not exist if the table is empty and the insert is done by Hive 3, so
+    # another refresh is needed.
+    self.execute_query("refresh %s" % fq_tbl_name)
+
     result = self.execute_query("select count(*) from %s" % fq_tbl_name)
     assert(len(result.data) == 1)
     assert(result.data[0] == str(num_rows))
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index 789fb9a..9d90a99 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -96,7 +96,7 @@ def parse_query_test_file(file_name, valid_section_names=None, encoding=None):
   if section_names is None:
     section_names = ['QUERY', 'HIVE_QUERY', 'RESULTS', 'TYPES', 'LABELS', 'SETUP',
         'CATCH', 'ERRORS', 'USER', 'RUNTIME_PROFILE', 'SHELL', 'DML_RESULTS',
-        'DBAPI_RESULTS', 'HS2_TYPES']
+        'DBAPI_RESULTS', 'HS2_TYPES', 'HIVE_MAJOR_VERSION']
   return parse_test_file(file_name, section_names, encoding=encoding,
       skip_unknown_sections=False)
 


[impala] 02/06: IMPALA-5031: memcpy requires two non-null arguments

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d8a0c60b88329bd89c9d80265d1292168bf7603e
Author: Jim Apple <jb...@apache.org>
AuthorDate: Fri May 24 18:17:46 2019 -0700

    IMPALA-5031: memcpy requires two non-null arguments
    
    Counterintuitively, even passing 0 as the third argument of memcpy
    does not avoid undefined behavior. This occurred during an end-to-end
    test. The interesting part of the backtrace is:
    
        util/dict-encoding.h:451:20: runtime error: null pointer passed
           as argument 2, which is declared to never be null
        /usr/include/string.h:43:45: note: nonnull attribute specified
           here
        #0 DictEncoder<StringValue>::AddToTable(StringValue const&,
           unsigned short*) util/dict-encoding.h:451:3
        #1 DictEncoder<StringValue>::Put(StringValue const&)
           util/dict-encoding.h:422:10
        #2 HdfsParquetTableWriter::ColumnWriter<StringValue>::
           ProcessValue(void*, long*)
           exec/parquet/hdfs-parquet-table-writer.cc:436:38
        #3 HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow*)
           exec/parquet/hdfs-parquet-table-writer.cc:662:9
        #4 HdfsParquetTableWriter::AppendRows(RowBatch*,
           vector<int> const&, bool*)
           exec/parquet/hdfs-parquet-table-writer.cc:1192:60
        #5 HdfsTableSink::WriteRowsToPartition(RuntimeState*, RowBatch*,
           pair<unique_ptr<OutputPartition>, vector<int>>*)
           exec/hdfs-table-sink.cc:253:71
        #6 HdfsTableSink::Send(RuntimeState*, RowBatch*)
           exec/hdfs-table-sink.cc:588:45
    
    Change-Id: I2e8e57c34c2848f0dc7dbf32892cc6e86df63506
    Reviewed-on: http://gerrit.cloudera.org:8080/13434
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/dict-encoding.h | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index fa52275..c7174f6 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -30,6 +30,7 @@
 #include "util/bit-util.h"
 #include "util/mem-util.h"
 #include "util/rle-encoding.h"
+#include "util/ubsan.h"
 
 namespace impala {
 
@@ -448,7 +449,7 @@ template<>
 inline int DictEncoder<StringValue>::AddToTable(const StringValue& value,
     NodeIndex* bucket) {
   char* ptr_copy = reinterpret_cast<char*>(pool_->Allocate(value.len));
-  memcpy(ptr_copy, value.ptr, value.len);
+  Ubsan::MemCpy(ptr_copy, value.ptr, value.len);
   StringValue sv(ptr_copy, value.len);
   Node node(sv, *bucket);
   ConsumeBytes(sizeof(node));
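
The "counterintuitively" above is worth spelling out: glibc declares memcpy's
pointer arguments nonnull (the attribute the UBSAN note points at in
/usr/include/string.h), so even a zero-length call is undefined behavior with
a null pointer, and the optimizer may treat the pointers as proven non-null
afterwards. A hypothetical illustration of the hazard:

    #include <cstddef>
    #include <cstring>

    char* CopyValue(char* dst, const char* src, std::size_t len) {
      // UB if src == nullptr, even when len == 0: both pointer arguments
      // are declared nonnull, so after this call the compiler may assume
      // src is valid...
      std::memcpy(dst, src, len);
      // ...and is entitled to fold this check to false and delete the
      // branch entirely.
      if (src == nullptr) return nullptr;
      return dst;
    }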


[impala] 04/06: IMPALA-8595: Support TLSv1.2 with Python < 2.7.9 in shell

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d5673bf241ed60ffe7aa0bcdf952bca1e9cc7631
Author: Robbie Zhang <rz...@cloudera.com>
AuthorDate: Wed May 29 06:22:26 2019 -0700

    IMPALA-8595: Support TLSv1.2 with Python < 2.7.9 in shell
    
    IMPALA-5690 replaced thrift 0.9.0 with 0.9.3, in which THRIFT-3505
    changed transport/TSSLSocket.py.
    In thrift 0.9.3, if the Python version is lower than 2.7.9, TSSLSocket
    uses PROTOCOL_TLSv1 by default, and the SSL version is passed to
    TSSLSocket as a parameter when calling TSSLSocket.__init__.
    Although Python officially supports TLSv1.2 from 2.7.9, Red Hat/CentOS
    support TLSv1.2 from Python 2.7.5 with upgraded python-libs. We need
    impala-shell to support TLSv1.2 with Python 2.7.5 on Red Hat/CentOS.
    
    TESTING:
    impala-py.test tests/custom_cluster/test_client_ssl.py
    
    Change-Id: I3fb6510f4b556bd8c6b1e86380379aba8be4b805
    Reviewed-on: http://gerrit.cloudera.org:8080/13457
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 shell/TSSLSocketWithWildcardSAN.py      | 8 +++++---
 tests/common/environ.py                 | 8 ++++++++
 tests/custom_cluster/test_client_ssl.py | 8 +++++++-
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/shell/TSSLSocketWithWildcardSAN.py b/shell/TSSLSocketWithWildcardSAN.py
index d021aba..88fc119 100755
--- a/shell/TSSLSocketWithWildcardSAN.py
+++ b/shell/TSSLSocketWithWildcardSAN.py
@@ -43,14 +43,16 @@ class TSSLSocketWithWildcardSAN(TSSLSocket.TSSLSocket):
       ca_certs=None,
       unix_socket=None):
     cert_reqs = ssl.CERT_REQUIRED if validate else ssl.CERT_NONE
-    TSSLSocket.TSSLSocket.__init__(self, host=host, port=port, cert_reqs=cert_reqs,
-                                   ca_certs=ca_certs, unix_socket=unix_socket)
     # Set client protocol choice to be very permissive, as we rely on servers to enforce
     # good protocol selection. This value is forwarded to the ssl.wrap_socket() API during
     # open(). See https://docs.python.org/2/library/ssl.html#socket-creation for a table
     # that shows a better option is not readily available for sockets that use
     # wrap_socket().
-    self.SSL_VERSION = ssl.PROTOCOL_SSLv23
+    # THRIFT-3505 changes transport/TSSLSocket.py. The SSL_VERSION is passed to TSSLSocket
+    # via a parameter.
+    TSSLSocket.TSSLSocket.__init__(self, host=host, port=port, cert_reqs=cert_reqs,
+                                   ca_certs=ca_certs, unix_socket=unix_socket,
+                                   ssl_version=ssl.PROTOCOL_SSLv23)
 
   def _validate_cert(self):
     cert = self.handle.getpeercert()
diff --git a/tests/common/environ.py b/tests/common/environ.py
index 30805e7..5f4e18a 100644
--- a/tests/common/environ.py
+++ b/tests/common/environ.py
@@ -20,6 +20,7 @@ import logging
 import os
 import re
 import requests
+import platform
 
 LOG = logging.getLogger('tests.common.environ')
 test_start_cluster_args = os.environ.get("TEST_START_CLUSTER_ARGS", "")
@@ -41,6 +42,13 @@ if os.path.isfile(IMPALA_LOCAL_VERSION_INFO):
   if IMPALA_LOCAL_BUILD_VERSION is None:
     raise Exception("Could not find VERSION in {0}".format(IMPALA_LOCAL_VERSION_INFO))
 
+# Check if it is Red Hat/CentOS Linux
+dist = platform.linux_distribution()[0].lower()
+if 'centos' in dist or 'red hat' in dist:
+  IS_REDHAT_DERIVATIVE = True
+else:
+  IS_REDHAT_DERIVATIVE = False
+
 # Find the likely BuildType of the running Impala. Assume it's found through the path
 # $IMPALA_HOME/be/build/latest as a fallback.
 build_type_arg_regex = re.compile(r'--build_type=(\w+)', re.I)
diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py
index 6f8f91f..885e80c 100644
--- a/tests/custom_cluster/test_client_ssl.py
+++ b/tests/custom_cluster/test_client_ssl.py
@@ -26,6 +26,7 @@ import socket
 import sys
 import time
 
+from tests.common.environ import IS_REDHAT_DERIVATIVE
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_service import ImpaladService
 from tests.common.test_dimensions import create_beeswax_dimension
@@ -33,7 +34,12 @@ from tests.shell.util import run_impala_shell_cmd, run_impala_shell_cmd_no_expec
     ImpalaShell
 
 REQUIRED_MIN_OPENSSL_VERSION = 0x10001000L
-REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2,7,9)
+# Python officially supports TLSv1.2 from 2.7.9, but Red Hat/CentOS Python 2.7.5
+# with newer python-libs (e.g. python-libs-2.7.5-77) already supports TLSv1.2
+if IS_REDHAT_DERIVATIVE:
+  REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5)
+else:
+  REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9)
 _openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None)
 if _openssl_version_number is None:
   SKIP_SSL_MSG = "Legacy OpenSSL module detected"


[impala] 05/06: IMPALA-8460: Simplify cluster membership management

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6b3e5fe426a7cd8b13c18a54fe6c2726ab8667d8
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Sun Apr 14 20:35:53 2019 -0700

    IMPALA-8460: Simplify cluster membership management
    
    This change adds a class to track cluster membership called
    ClusterMembershipMgr. It replaces the logic that was partially
    duplicated between the ImpalaServer and the Coordinator and makes sure
    that the local backend descriptor is consistent (IMPALA-8469).
    
    The ClusterMembershipMgr maintains a view of the cluster membership and
    incorporates incoming updates from the statestore. It also registers the
    local backend with the statestore after startup. Clients can obtain a
    consistent, immutable snapshot of the current cluster membership from
    the ClusterMembershipMgr. Additionally, callbacks can be registered to
    receive notifications of cluster membership changes. The ImpalaServer
    and Frontend use this mechanism.
    
    This change also generalizes the fix for IMPALA-7665: updates from the
    statestore to the cluster membership topic are only made visible to the
    rest of the local server after a post-recovery grace period has elapsed.
    As part of this the flag
    'failed_backends_query_cancellation_grace_period_ms' is replaced with
    'statestore_subscriber_recovery_grace_period_ms'. To tell the initial
    startup from post-recovery, a new metric
    'statestore-subscriber.num-connection-failures' is exposed by the
    daemon, which tracks the total number of connection failures to the
    statestore over the lifetime of the process.
    
    This change also unifies the naming of executor-related classes, in
    particular it renames "BackendConfig" to "ExecutorGroup". In
    anticipation of a subsequent change (IMPALA-8484), it adds maps to store
    multiple executor groups.
    
    This change also disables the generation of default operators from the
    thrift files and instead adds explicit implementations for the ones that
    we rely on. This forces us to explicitly specify comparators when
    manipulating containers of thrift structs and will help prevent
    accidental bugs.
    
    Testing: This change adds a backend unit test for the new cluster
    membership manager. The observable behavior of Impala does not change,
    and the existing scheduler unit test and end-to-end tests should make
    sure of that.
    
    Change-Id: Ib3cf9a8bb060d0c6e9ec8868b7b21ce01f8740a3
    Reviewed-on: http://gerrit.cloudera.org:8080/13207
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
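
The consistent-snapshot contract described above follows a common C++
pattern: updates build a new immutable state object and publish it, while
readers hold a shared_ptr to a const view that cannot change underneath them.
A condensed sketch of that pattern with assumed names (the real interface is
in be/src/scheduling/cluster-membership-mgr.h and is considerably richer):

    #include <functional>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <vector>

    // Stand-in for the real membership state rebuilt on each update.
    struct Snapshot {
      std::vector<std::string> backends;
    };

    class MembershipMgr {
     public:
      using SnapshotPtr = std::shared_ptr<const Snapshot>;
      using Callback = std::function<void(const SnapshotPtr&)>;

      // Returns an immutable snapshot; safe to use without further locking.
      SnapshotPtr GetSnapshot() const {
        std::lock_guard<std::mutex> l(lock_);
        return current_;
      }

      // Clients (e.g. the server and the frontend) register to be notified
      // whenever a new snapshot is published.
      void RegisterCallback(Callback cb) {
        std::lock_guard<std::mutex> l(lock_);
        callbacks_.push_back(std::move(cb));
      }

      // Called on a statestore update: publish a new snapshot atomically
      // with respect to readers, then notify the registered listeners.
      void Update(Snapshot next) {
        auto published = std::make_shared<const Snapshot>(std::move(next));
        std::vector<Callback> cbs;
        {
          std::lock_guard<std::mutex> l(lock_);
          current_ = published;
          cbs = callbacks_;
        }
        for (const auto& cb : cbs) cb(published);
      }

     private:
      mutable std::mutex lock_;
      SnapshotPtr current_ = std::make_shared<const Snapshot>();
      std::vector<Callback> callbacks_;
    };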
---
 be/src/benchmarks/scheduler-benchmark.cc           |   1 -
 be/src/common/logging.h                            |   2 +-
 be/src/common/thread-debug-info-test.cc            |   1 +
 be/src/experiments/hash-ring-util.cc               |   3 +-
 be/src/gutil/strings/split.cc                      |   3 +
 be/src/gutil/strings/split.h                       |   8 +-
 be/src/rpc/thrift-util-test.cc                     |   5 +-
 be/src/rpc/thrift-util.cc                          |  28 --
 be/src/runtime/exec-env.cc                         |  26 +-
 be/src/runtime/exec-env.h                          |   3 +
 be/src/runtime/krpc-data-stream-mgr.cc             |   1 +
 be/src/runtime/query-state.h                       |   1 +
 be/src/scheduling/CMakeLists.txt                   |  11 +-
 be/src/scheduling/backend-config-test.cc           |  93 ----
 be/src/scheduling/backend-config.cc                | 116 -----
 be/src/scheduling/backend-config.h                 |  87 ----
 be/src/scheduling/cluster-membership-mgr-test.cc   | 475 +++++++++++++++++++++
 be/src/scheduling/cluster-membership-mgr.cc        | 401 +++++++++++++++++
 be/src/scheduling/cluster-membership-mgr.h         | 236 ++++++++++
 be/src/scheduling/cluster-membership-test-util.cc  |  60 +++
 .../cluster-membership-test-util.h}                |  31 +-
 be/src/scheduling/executor-group-test.cc           |  82 ++++
 be/src/scheduling/executor-group.cc                | 131 ++++++
 be/src/scheduling/executor-group.h                 | 108 +++++
 be/src/scheduling/query-schedule.cc                |   1 -
 be/src/scheduling/query-schedule.h                 |   4 +-
 be/src/scheduling/scheduler-test-util.cc           |  75 ++--
 be/src/scheduling/scheduler-test-util.h            |  11 +-
 be/src/scheduling/scheduler-test.cc                |  83 ++--
 be/src/scheduling/scheduler.cc                     | 330 +++++---------
 be/src/scheduling/scheduler.h                      | 151 ++-----
 be/src/service/impala-http-handler.cc              |   9 +-
 be/src/service/impala-server.cc                    | 208 +++------
 be/src/service/impala-server.h                     |  67 +--
 be/src/statestore/statestore-subscriber.cc         |  14 +
 be/src/statestore/statestore-subscriber.h          |  23 +-
 be/src/statestore/statestore.cc                    |   1 +
 be/src/testutil/in-process-servers.cc              |   1 -
 be/src/util/container-util.h                       |  67 ++-
 be/src/util/network-util.cc                        |  11 -
 be/src/util/network-util.h                         |   4 -
 be/src/util/runtime-profile-test.cc                |   1 +
 be/src/util/uid-util-test.cc                       |   1 +
 common/thrift/CMakeLists.txt                       |   4 +-
 common/thrift/Frontend.thrift                      |   2 +-
 common/thrift/StatestoreService.thrift             |   2 +-
 common/thrift/metrics.json                         |  11 +
 .../java/org/apache/impala/service/Frontend.java   |   3 +-
 tests/custom_cluster/test_coordinators.py          |  10 +-
 tests/custom_cluster/test_restart_services.py      |   4 +-
 50 files changed, 1969 insertions(+), 1042 deletions(-)

diff --git a/be/src/benchmarks/scheduler-benchmark.cc b/be/src/benchmarks/scheduler-benchmark.cc
index ef7fb9e..f7df4ef 100644
--- a/be/src/benchmarks/scheduler-benchmark.cc
+++ b/be/src/benchmarks/scheduler-benchmark.cc
@@ -20,7 +20,6 @@
 #include <vector>
 
 #include "gutil/strings/substitute.h"
-#include "scheduling/scheduler.h"
 #include "scheduling/scheduler-test-util.h"
 #include "util/benchmark.h"
 #include "util/cpu-info.h"
diff --git a/be/src/common/logging.h b/be/src/common/logging.h
index 2e08613..dc926e3 100644
--- a/be/src/common/logging.h
+++ b/be/src/common/logging.h
@@ -46,7 +46,7 @@
   #include <gflags/gflags.h>
 #endif
 
-/// Define verbose logging levels.  Per-row logging is more verbase than per-file /
+/// Define verbose logging levels.  Per-row logging is more verbose than per-file /
 /// per-rpc logging which is more verbose than per-connection / per-query logging.
 #define VLOG_CONNECTION VLOG(1)
 #define VLOG_RPC        VLOG(2)
diff --git a/be/src/common/thread-debug-info-test.cc b/be/src/common/thread-debug-info-test.cc
index 09cd116..616aa0a 100644
--- a/be/src/common/thread-debug-info-test.cc
+++ b/be/src/common/thread-debug-info-test.cc
@@ -19,6 +19,7 @@
 
 #include "common/thread-debug-info.h"
 #include "testutil/gtest-util.h"
+#include "util/container-util.h"
 #include "util/thread.h"
 
 #include "common/names.h"
diff --git a/be/src/experiments/hash-ring-util.cc b/be/src/experiments/hash-ring-util.cc
index 1910755..e866f10 100644
--- a/be/src/experiments/hash-ring-util.cc
+++ b/be/src/experiments/hash-ring-util.cc
@@ -22,6 +22,7 @@
 
 #include "common/init.h"
 #include "scheduling/hash-ring.h"
+#include "scheduling/cluster-membership-test-util.h"
 #include "scheduling/scheduler-test-util.h"
 #include "testutil/gtest-util.h"
 #include "util/network-util.h"
@@ -53,7 +54,7 @@ public:
     cluster.AddHosts(num_hosts, true, false);
     HashRing hashring(num_replicas);
     for (int host_idx = 0; host_idx < num_hosts; host_idx++) {
-      IpAddr node = test::Cluster::HostIdxToIpAddr(host_idx);
+      IpAddr node = test::HostIdxToIpAddr(host_idx);
       hashring.AddNode(node);
     }
     int64_t end_nanos = MonotonicNanos();
diff --git a/be/src/gutil/strings/split.cc b/be/src/gutil/strings/split.cc
index e888856..3439004 100644
--- a/be/src/gutil/strings/split.cc
+++ b/be/src/gutil/strings/split.cc
@@ -13,6 +13,9 @@ using std::iterator_traits;
 #include <limits>
 using std::numeric_limits;
 
+using std::unordered_map;
+using std::unordered_set;
+
 #include "gutil/integral_types.h"
 #include <common/logging.h>
 #include "gutil/logging-inl.h"
diff --git a/be/src/gutil/strings/split.h b/be/src/gutil/strings/split.h
index 964e3fc..6e48db7 100644
--- a/be/src/gutil/strings/split.h
+++ b/be/src/gutil/strings/split.h
@@ -45,9 +45,7 @@ using std::pair;
 #include <vector>
 using std::vector;
 #include <unordered_map>
-using std::unordered_map;
 #include <unordered_set>
-using std::unordered_set;
 
 #include <common/logging.h>
 
@@ -677,7 +675,7 @@ void SplitStringPieceToVector(const StringPiece& full,
 void SplitStringUsing(const string& full, const char* delimiters,
                       vector<string>* result);
 void SplitStringToHashsetUsing(const string& full, const char* delimiters,
-                               unordered_set<string>* result);
+                               std::unordered_set<string>* result);
 void SplitStringToSetUsing(const string& full, const char* delimiters,
                            set<string>* result);
 // The even-positioned (0-based) components become the keys for the
@@ -688,7 +686,7 @@ void SplitStringToSetUsing(const string& full, const char* delimiters,
 void SplitStringToMapUsing(const string& full, const char* delim,
                            map<string, string>* result);
 void SplitStringToHashmapUsing(const string& full, const char* delim,
-                               unordered_map<string, string>* result);
+                               std::unordered_map<string, string>* result);
 
 // ----------------------------------------------------------------------
 // SplitStringAllowEmpty()
@@ -742,7 +740,7 @@ void SplitStringWithEscapingToSet(const string& full,
                                   set<string>* result);
 void SplitStringWithEscapingToHashset(const string& full,
                                       const strings::CharSet& delimiters,
-                                      unordered_set<string>* result);
+                                      std::unordered_set<string>* result);
 
 // ----------------------------------------------------------------------
 // SplitStringIntoNPiecesAllowEmpty()
diff --git a/be/src/rpc/thrift-util-test.cc b/be/src/rpc/thrift-util-test.cc
index a546f2b..5adba29 100644
--- a/be/src/rpc/thrift-util-test.cc
+++ b/be/src/rpc/thrift-util-test.cc
@@ -21,6 +21,7 @@
 
 #include "rpc/thrift-util.h"
 #include "testutil/gtest-util.h"
+#include "util/container-util.h"
 #include "util/network-util.h"
 
 #include "gen-cpp/RuntimeProfile_types.h"
@@ -62,7 +63,7 @@ TEST(ThriftUtil, SimpleSerializeDeserialize) {
     TCounter deserialized_counter;
     EXPECT_OK(DeserializeThriftMsg(buffer1, &len1, compact, &deserialized_counter));
     EXPECT_EQ(len1, len2);
-    EXPECT_TRUE(counter == deserialized_counter);
+    EXPECT_EQ(counter, deserialized_counter);
 
     // Serialize to string
     std::string str;
@@ -73,7 +74,7 @@ TEST(ThriftUtil, SimpleSerializeDeserialize) {
     TCounter deserialized_counter_2;
     EXPECT_OK(DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(str.data()), &len2,
         compact, &deserialized_counter_2));
-    EXPECT_TRUE(counter == deserialized_counter_2);
+    EXPECT_EQ(counter, deserialized_counter_2);
   }
 }
 
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index 6e3ac7c..c0d414b 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -67,14 +67,6 @@ static_assert(PACKAGE_VERSION[3] == '.', "");
 static_assert(PACKAGE_VERSION[4] == '3', "");
 static_assert(PACKAGE_VERSION[5] == '\0', "");
 
-// Thrift defines operator< but does not implement it. This is a stub
-// implementation so we can link.
-bool Apache::Hadoop::Hive::Partition::operator<(
-    const Apache::Hadoop::Hive::Partition& x) const {
-  DCHECK(false) << "This should not get called.";
-  return false;
-}
-
 namespace impala {
 
 ThriftSerializer::ThriftSerializer(bool compact, int initial_buffer_size) :
@@ -99,26 +91,6 @@ boost::shared_ptr<TProtocol> CreateDeserializeProtocol(
   }
 }
 
-// Comparator for THostPorts. Thrift declares this (in gen-cpp/Types_types.h) but
-// never defines it.
-bool TNetworkAddress::operator<(const TNetworkAddress& that) const {
-  if (this->hostname < that.hostname) {
-    return true;
-  } else if ((this->hostname == that.hostname) && (this->port < that.port)) {
-    return true;
-  }
-  return false;
-};
-
-// Comparator for TUniqueIds
-bool TUniqueId::operator<(const TUniqueId& that) const {
-  return (hi < that.hi) || (hi == that.hi &&  lo < that.lo);
-}
-
-bool TAccessEvent::operator<(const TAccessEvent& that) const {
-  return this->name < that.name;
-}
-
 static void ThriftOutputFunction(const char* output) {
   VLOG_QUERY << output;
 }
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 37d6b37..1c1b659 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -45,6 +45,7 @@
 #include "runtime/thread-resource-mgr.h"
 #include "runtime/tmp-file-mgr.h"
 #include "scheduling/admission-controller.h"
+#include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/request-pool-service.h"
 #include "scheduling/scheduler.h"
 #include "service/control-service.h"
@@ -196,13 +197,15 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
       Substitute("impalad@$0", TNetworkAddressToString(configured_backend_address_)),
       subscriber_address, statestore_address, metrics_.get()));
 
+  cluster_membership_mgr_.reset(new ClusterMembershipMgr(statestore_subscriber_->id(),
+      statestore_subscriber_.get()));
+
   if (FLAGS_is_coordinator) {
     hdfs_op_thread_pool_.reset(
         CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024));
     exec_rpc_thread_pool_.reset(new CallableThreadPool("exec-rpc-pool", "worker",
         FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max()));
-    scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
-        statestore_subscriber_->id(), metrics_.get(), webserver_.get(),
+    scheduler_.reset(new Scheduler(cluster_membership_mgr_.get(), metrics_.get(),
         request_pool_service_.get()));
   }
 
@@ -336,10 +339,12 @@ Status ExecEnv::Init() {
     LOG(INFO) << "Not starting webserver";
   }
 
-  if (scheduler_ != nullptr) {
-    RETURN_IF_ERROR(scheduler_->Init(
-          configured_backend_address_, krpc_address_, ip_address_, admit_mem_limit_));
-  }
+  RETURN_IF_ERROR(cluster_membership_mgr_->Init());
+  cluster_membership_mgr_->SetUpdateFrontendFn(
+      [this](const TUpdateExecutorMembershipRequest& update_req) {
+        return this->frontend()->UpdateExecutorMembership(update_req);
+  });
+
   RETURN_IF_ERROR(admission_controller_->Init());
 
   // Get the fs.defaultFS value set in core-site.xml and assign it to configured_defaultFs
@@ -445,7 +450,16 @@ Status ExecEnv::StartKrpcService() {
 
 void ExecEnv::SetImpalaServer(ImpalaServer* server) {
   DCHECK(impala_server_ == nullptr) << "ImpalaServer already registered";
+  DCHECK(server != nullptr);
   impala_server_ = server;
+  // Register the ImpalaServer with the cluster membership manager
+  cluster_membership_mgr_->SetLocalBeDescFn([server]() {
+    return server->GetLocalBackendDescriptor();
+  });
+  cluster_membership_mgr_->SetUpdateLocalServerFn(
+      [server](const ClusterMembershipMgr::BackendAddressSet& current_backends) {
+        server->CancelQueriesOnFailedBackends(current_backends);
+  });
 }
 
 void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity,
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index c9cbe74..6601fad 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -41,6 +41,7 @@ namespace impala {
 class AdmissionController;
 class BufferPool;
 class CallableThreadPool;
+class ClusterMembershipMgr;
 class ControlService;
 class DataStreamMgr;
 class DataStreamService;
@@ -143,6 +144,7 @@ class ExecEnv {
 
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
+  ClusterMembershipMgr* cluster_membership_mgr() { return cluster_membership_mgr_.get(); }
   Scheduler* scheduler() { return scheduler_.get(); }
   AdmissionController* admission_controller() { return admission_controller_.get(); }
   StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
@@ -175,6 +177,7 @@ class ExecEnv {
   boost::scoped_ptr<ObjectPool> obj_pool_;
   boost::scoped_ptr<MetricGroup> metrics_;
   boost::scoped_ptr<KrpcDataStreamMgr> stream_mgr_;
+  boost::scoped_ptr<ClusterMembershipMgr> cluster_membership_mgr_;
   boost::scoped_ptr<Scheduler> scheduler_;
   boost::scoped_ptr<AdmissionController> admission_controller_;
   boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 716313a..0ac19b5 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -33,6 +33,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "service/data-stream-service.h"
+#include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 3502ae0..1f8cc18 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -29,6 +29,7 @@
 #include "gen-cpp/Types_types.h"
 #include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
 #include "runtime/tmp-file-mgr.h"
+#include "util/container-util.h"
 #include "util/counting-barrier.h"
 #include "util/uid-util.h"
 
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index a1ef269..135585d 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -25,7 +25,9 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/scheduling")
 # TODO: Move other scheduling-related classes here
 add_library(Scheduling STATIC
   admission-controller.cc
-  backend-config.cc
+  cluster-membership-mgr.cc
+  cluster-membership-test-util.cc
+  executor-group.cc
   hash-ring.cc
   query-schedule.cc
   request-pool-service.cc
@@ -34,7 +36,8 @@ add_library(Scheduling STATIC
 )
 add_dependencies(Scheduling gen-deps)
 
-ADD_BE_LSAN_TEST(scheduler-test)
-ADD_BE_LSAN_TEST(backend-config-test)
-ADD_BE_LSAN_TEST(hash-ring-test)
 ADD_BE_LSAN_TEST(admission-controller-test)
+ADD_BE_LSAN_TEST(cluster-membership-mgr-test)
+ADD_BE_LSAN_TEST(executor-group-test)
+ADD_BE_LSAN_TEST(hash-ring-test)
+ADD_BE_LSAN_TEST(scheduler-test)
diff --git a/be/src/scheduling/backend-config-test.cc b/be/src/scheduling/backend-config-test.cc
deleted file mode 100644
index 9da7660..0000000
--- a/be/src/scheduling/backend-config-test.cc
+++ /dev/null
@@ -1,93 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "scheduling/backend-config.h"
-
-#include "common/logging.h"
-#include "common/names.h"
-#include "testutil/gtest-util.h"
-#include "util/network-util.h"
-#include "util/thread.h"
-
-namespace impala {
-
-/// Test that BackendConfig can be created from a vector of backends.
-TEST(BackendConfigTest, MakeFromBackendVector) {
-  // This address needs to be resolvable using getaddrinfo().
-  vector<TNetworkAddress> backends {MakeNetworkAddress("localhost", 1001)};
-  BackendConfig backend_config(backends);
-  IpAddr backend_ip;
-  bool ret = backend_config.LookUpBackendIp(backends[0].hostname, &backend_ip);
-  ASSERT_TRUE(ret);
-  EXPECT_EQ("127.0.0.1", backend_ip);
-}
-
-/// Test adding multiple backends on different hosts.
-TEST(BackendConfigTest, AddBackends) {
-  BackendConfig backend_config;
-  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
-  backend_config.AddBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002));
-  ASSERT_EQ(2, backend_config.NumBackends());
-  IpAddr backend_ip;
-  ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
-  EXPECT_EQ("10.0.0.1", backend_ip);
-  ASSERT_TRUE(backend_config.LookUpBackendIp("host_2", &backend_ip));
-  EXPECT_EQ("10.0.0.2", backend_ip);
-}
-
-/// Test adding multiple backends on the same host.
-TEST(BackendConfigTest, MultipleBackendsOnSameHost) {
-  BackendConfig backend_config;
-  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
-  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002));
-  IpAddr backend_ip;
-  ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
-  EXPECT_EQ("10.0.0.1", backend_ip);
-  const BackendConfig::BackendList& backend_list =
-      backend_config.GetBackendListForHost("10.0.0.1");
-  EXPECT_EQ(2, backend_list.size());
-}
-
-/// Test removing a backend.
-TEST(BackendConfigTest, RemoveBackend) {
-  BackendConfig backend_config;
-  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
-  backend_config.AddBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002));
-  backend_config.RemoveBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002));
-  IpAddr backend_ip;
-  ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
-  EXPECT_EQ("10.0.0.1", backend_ip);
-  ASSERT_FALSE(backend_config.LookUpBackendIp("host_2", &backend_ip));
-}
-
-/// Test removing one of multiple backends on the same host (IMPALA-3944).
-TEST(BackendConfigTest, RemoveBackendOnSameHost) {
-  BackendConfig backend_config;
-  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
-  backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002));
-  backend_config.RemoveBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002));
-  IpAddr backend_ip;
-  ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
-  EXPECT_EQ("10.0.0.1", backend_ip);
-  const BackendConfig::BackendList& backend_list =
-      backend_config.GetBackendListForHost("10.0.0.1");
-  EXPECT_EQ(1, backend_list.size());
-}
-
-}  // end namespace impala
-
-IMPALA_TEST_MAIN();
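
The BackendConfig coverage removed above is superseded by the new
executor-group-test.cc and cluster-membership-mgr-test.cc registered in the
CMake changes. A rough sketch of the equivalent add/remove check against the
new ExecutorGroup surface, using only calls that appear elsewhere in this patch
(AddExecutor, RemoveExecutor, NumExecutors, and the test util's
MakeBackendDescriptor); the authoritative version is executor-group-test.cc:

  #include "scheduling/cluster-membership-test-util.h"
  #include "scheduling/executor-group.h"
  #include "testutil/gtest-util.h"

  namespace impala {

  // Sketch only: mirrors the add/remove bookkeeping the removed test covered.
  TEST(ExecutorGroupSketch, AddRemove) {
    ExecutorGroup group;
    TBackendDescriptor be = test::MakeBackendDescriptor(/* idx= */ 1);
    group.AddExecutor(be);
    EXPECT_EQ(1, group.NumExecutors());
    group.RemoveExecutor(be);
    EXPECT_EQ(0, group.NumExecutors());
  }

  }  // end namespace impala

  IMPALA_TEST_MAIN();
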
diff --git a/be/src/scheduling/backend-config.cc b/be/src/scheduling/backend-config.cc
deleted file mode 100644
index f4d68bd..0000000
--- a/be/src/scheduling/backend-config.cc
+++ /dev/null
@@ -1,116 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "scheduling/backend-config.h"
-
-namespace impala{
-
-// Hand-testing shows that 25 replicas produces a reasonable balance between nodes
-// across the hash ring. See HashRingTest::MaxMinRatio() for some empirical results
-// at similar replication levels. There is nothing special about 25 (i.e. 24 or 26
-// would be similar). Increasing this results in a more even distribution.
-// TODO: This can be tuned further with real world tests
-static const uint32_t NUM_HASH_RING_REPLICAS = 25;
-
-BackendConfig::BackendConfig()
-  : backend_ip_hash_ring_(NUM_HASH_RING_REPLICAS) {}
-
-BackendConfig::BackendConfig(const std::vector<TNetworkAddress>& backends)
-  : backend_ip_hash_ring_(NUM_HASH_RING_REPLICAS) {
-  // Construct backend_map and backend_ip_map.
-  for (const TNetworkAddress& backend: backends) {
-    IpAddr ip;
-    Status status = HostnameToIpAddr(backend.hostname, &ip);
-    if (!status.ok()) {
-      VLOG(1) << status.GetDetail();
-      continue;
-    }
-    AddBackend(MakeBackendDescriptor(backend.hostname, ip, backend.port));
-  }
-}
-
-const BackendConfig::BackendList& BackendConfig::GetBackendListForHost(
-    const IpAddr& ip) const {
-  BackendMap::const_iterator it = backend_map_.find(ip);
-  DCHECK(it != backend_map_.end());
-  return it->second;
-}
-
-void BackendConfig::GetAllBackendIps(std::vector<IpAddr>* ip_addresses) const {
-  ip_addresses->reserve(NumBackends());
-  for (auto& it: backend_map_) ip_addresses->push_back(it.first);
-}
-
-void BackendConfig::GetAllBackends(BackendList* backends) const {
-  for (const auto& backend_list: backend_map_) {
-    backends->insert(backends->end(), backend_list.second.begin(),
-        backend_list.second.end());
-  }
-}
-
-void BackendConfig::AddBackend(const TBackendDescriptor& be_desc) {
-  DCHECK(!be_desc.ip_address.empty());
-  BackendList& be_descs = backend_map_[be_desc.ip_address];
-  if (be_descs.empty()) {
-    backend_ip_hash_ring_.AddNode(be_desc.ip_address);
-  }
-  if (find(be_descs.begin(), be_descs.end(), be_desc) == be_descs.end()) {
-    be_descs.push_back(be_desc);
-  }
-  backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
-}
-
-void BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) {
-  auto be_descs_it = backend_map_.find(be_desc.ip_address);
-  if (be_descs_it != backend_map_.end()) {
-    BackendList* be_descs = &be_descs_it->second;
-    be_descs->erase(remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
-    if (be_descs->empty()) {
-      backend_map_.erase(be_descs_it);
-      backend_ip_map_.erase(be_desc.address.hostname);
-      backend_ip_hash_ring_.RemoveNode(be_desc.ip_address);
-    }
-  }
-}
-
-bool BackendConfig::LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const {
-  // Check if hostname is already a valid IP address.
-  if (backend_map_.find(hostname) != backend_map_.end()) {
-    if (ip != nullptr) *ip = hostname;
-    return true;
-  }
-  auto it = backend_ip_map_.find(hostname);
-  if (it != backend_ip_map_.end()) {
-    if (ip != nullptr) *ip = it->second;
-    return true;
-  }
-  return false;
-}
-
-const TBackendDescriptor* BackendConfig::LookUpBackendDesc(
-    const TNetworkAddress& host) const {
-  IpAddr ip;
-  if (LIKELY(LookUpBackendIp(host.hostname, &ip))) {
-    const BackendConfig::BackendList& be_list = GetBackendListForHost(ip);
-    for (const TBackendDescriptor& desc : be_list) {
-      if (desc.address == host) return &desc;
-    }
-  }
-  return nullptr;
-}
-
-}  // end ns impala
diff --git a/be/src/scheduling/backend-config.h b/be/src/scheduling/backend-config.h
deleted file mode 100644
index 32b7c2f..0000000
--- a/be/src/scheduling/backend-config.h
+++ /dev/null
@@ -1,87 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef SCHEDULING_BACKEND_CONFIG_H
-#define SCHEDULING_BACKEND_CONFIG_H
-
-#include <vector>
-
-#include <boost/unordered_map.hpp>
-
-#include "gen-cpp/StatestoreService_types.h"
-#include "gen-cpp/Types_types.h"
-#include "scheduling/hash-ring.h"
-#include "util/network-util.h"
-
-namespace impala {
-
-/// Configuration class to store a list of backends per IP address, a mapping from
-/// hostnames to IP addresses, and a hash ring containing all backends.
-class BackendConfig {
- public:
-  BackendConfig();
-
-  /// Construct from list of backends.
-  BackendConfig(const std::vector<TNetworkAddress>& backends);
-
-  /// List of Backends.
-  typedef std::list<TBackendDescriptor> BackendList;
-
-  /// Return the list of backends on a particular host. The caller must make sure that the
-  /// host is actually contained in backend_map_.
-  const BackendList& GetBackendListForHost(const IpAddr& ip) const;
-
-  void GetAllBackendIps(std::vector<IpAddr>* ip_addresses) const;
-  void GetAllBackends(BackendList* backends) const;
-  void AddBackend(const TBackendDescriptor& be_desc);
-  void RemoveBackend(const TBackendDescriptor& be_desc);
-
-  /// Look up the IP address of 'hostname' in the internal backend maps and return
-  /// whether the lookup was successful. If 'hostname' itself is a valid IP address and
-  /// is contained in backend_map_, then it is copied to 'ip' and true is returned. 'ip'
-  /// can be nullptr if the caller only wants to check whether the lookup succeeds. Use
-  /// this method to resolve datanode hostnames to IP addresses during scheduling, to
-  /// prevent blocking on the OS.
-  bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const;
-
-  /// Look up the backend descriptor for the backend with hostname 'host'.
-  /// Returns nullptr if it's not found. The returned descriptor should not
-  /// be retained beyond the lifetime of this BackendConfig.
-  const TBackendDescriptor* LookUpBackendDesc(const TNetworkAddress& host) const;
-
-  const HashRing* GetHashRing() const { return &backend_ip_hash_ring_; }
-
-  int NumBackends() const { return backend_map_.size(); }
-
- private:
-  /// Map from a host's IP address to a list of backends running on that node.
-  typedef boost::unordered_map<IpAddr, BackendList> BackendMap;
-  BackendMap backend_map_;
-
-  /// Map from a hostname to its IP address to support hostname based backend lookup. It
-  /// contains entries for all backends in backend_map_ and needs to be updated whenever
-  /// backend_map_ changes.
-  typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap;
-  BackendIpAddressMap backend_ip_map_;
-
-  /// All backends are kept in a hash ring to allow a consistent mapping from filenames
-  /// to backends.
-  HashRing backend_ip_hash_ring_;
-};
-
-}  // end ns impala
-#endif
diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc
new file mode 100644
index 0000000..91e780e
--- /dev/null
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <deque>
+
+#include "common/logging.h"
+#include "common/names.h"
+#include "gen-cpp/StatestoreService_types.h"
+#include "scheduling/cluster-membership-mgr.h"
+#include "scheduling/cluster-membership-test-util.h"
+#include "testutil/gtest-util.h"
+#include "testutil/rand-util.h"
+
+using std::mt19937;
+using std::uniform_int_distribution;
+using std::uniform_real_distribution;
+using namespace impala;
+using namespace impala::test;
+
+namespace impala {
+
+/// This class and the following tests exercise the ClusterMembershipMgr core membership
+/// handling code by simulating the interactions between multiple ClusterMembershipMgr
+/// instances through the statestore. Updates between all cluster members are sent and
+/// processed sequentially and in a deterministic order.
+///
+/// The tests progress in 4 successively more sophisticated stages:
+/// 1) Make sure that simple interactions between 2 backends and their
+///    ClusterMembershipMgr work correctly.
+/// 2) Run a cluster of backends through the regular backend lifecycle in lockstep.
+/// 3) Make random but valid changes to the membership of a whole cluster for a given
+///    number of iterations and observe that each member's state remains consistent.
+/// 4) TODO: Make random, potentially invalid changes to the membership of a whole
+///    cluster.
+class ClusterMembershipMgrTest : public testing::Test {
+ public:
+  virtual void SetUp() {
+    RandTestUtil::SeedRng("CLUSTER_MEMBERSHIP_MGR_TEST_SEED", &rng_);
+  }
+
+ protected:
+  ClusterMembershipMgrTest() {}
+
+  /// A struct to hold information related to a simulated backend during the test.
+  struct Backend {
+    string backend_id;
+    std::unique_ptr<ClusterMembershipMgr> cmm;
+    std::shared_ptr<TBackendDescriptor> desc;
+  };
+  /// The list that owns all simulated backends.
+  vector<unique_ptr<Backend>> backends_;
+
+  /// Various lists of pointers that point into elements of 'backends_'. These pointers
+  /// will stay valid when 'backends_' resizes. Backends can be in one of 5 states:
+  /// - "Offline": A TBackendDescriptor has been created but has not been associated with
+  ///     a ClusterMembershipMgr.
+  /// - "Starting": A ClusterMembershipMgr has been created, but the associated backend
+  ///     descriptor is not running yet (no callback registered with the
+  ///     ClusterMembershipMgr).
+  /// - "Running": The backend descriptor is available to the ClusterMembershipMgr via a
+  ///     callback.
+  /// - "Quiescing": The backend descriptor is marked as quiescing.
+  /// - "Deleted": The backend has been deleted from 'backends_' altogether and other
+  ///     backends have received statestore messages to notify them of the deletion. Note
+  ///     that transition into this state does not update the affected backend itself, it
+  ///     merely gets destructed.
+  ///
+  /// As part of the state transitions, all other backends are notified of the new state
+  /// by calling their UpdateMembership() methods with a matching statestore update
+  /// message.
+  ///
+  /// We keep backends in double ended queues to allow for efficient removal of elements
+  /// on both ends.
+  typedef deque<Backend*> Backends;
+
+  bool IsInVector(const Backend* be, const Backends& backends) {
+    return find(backends.begin(), backends.end(), be) != backends.end();
+  }
+
+  void RemoveFromVector(const Backend* be, Backends* backends) {
+    auto it = find(backends->begin(), backends->end(), be);
+    ASSERT_TRUE(it != backends->end());
+    backends->erase(it);
+    it = find(backends->begin(), backends->end(), be);
+    ASSERT_TRUE(it == backends->end());
+  }
+
+  /// Lists that are used to track backends in a particular state. Methods that manipulate
+  /// these lists maintain the invariant that a backend is only in one list at a time.
+
+  /// Backends that are in state "Offline".
+  Backends offline_;
+
+  /// Backends that are in state "Starting".
+  Backends starting_;
+
+  /// Backends that are in state "Running".
+  Backends running_;
+
+  /// Backends that are in state "Quiescing".
+  Backends quiescing_;
+
+  typedef vector<TTopicDelta> TopicDeltas;
+
+  /// Polls a backend for changes to its local state by sending an empty statestore update
+  /// that is marked as a delta (i.e. it does not represent any changes).
+  vector<TTopicDelta> Poll(Backend* be) {
+    const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
+    // The empty delta is used to poll subscribers for updates without sending new
+    // changes.
+    StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};
+    TTopicDelta& empty_delta = topic_delta_map[topic_id];
+    empty_delta.is_delta = true;
+    vector<TTopicDelta> returned_topic_deltas;
+    be->cmm->UpdateMembership(topic_delta_map, &returned_topic_deltas);
+    return returned_topic_deltas;
+  }
+
+  /// Sends a single topic item in a delta to all backends in 'backends_'.
+  void SendDelta(const TTopicItem& item) {
+    const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
+    StatestoreSubscriber::TopicDeltaMap topic_delta_map;
+    TTopicDelta& delta = topic_delta_map[topic_id];
+    delta.topic_entries.push_back(item);
+    delta.is_delta = true;
+    for (auto& backend : backends_) {
+      vector<TTopicDelta> returned_topic_deltas;
+      backend->cmm->UpdateMembership(topic_delta_map, &returned_topic_deltas);
+      // We never expect backends to respond with a delta update on their own because the
+      // test code explicitly polls them after making changes to their state.
+      ASSERT_EQ(0, returned_topic_deltas.size())
+        << "Error with backend " << backend->backend_id;
+    }
+  }
+
+  /// Creates a new backend and adds it to the list of offline backends. If idx is
+  /// omitted, the current number of backends will be used as the new index.
+  Backend* CreateBackend(int idx = -1) {
+    if (idx == -1) idx = backends_.size();
+    backends_.push_back(make_unique<Backend>());
+    auto& be = backends_.back();
+    be->desc = make_shared<TBackendDescriptor>(MakeBackendDescriptor(idx));
+    be->backend_id = be->desc->address.hostname;
+    offline_.push_back(be.get());
+    return be.get();
+  }
+
+  /// Creates a new ClusterMembershipMgr for a backend and moves the backend from
+  /// 'offline_' to 'starting_'. Callers must handle invalidated iterators after calling
+  /// this method.
+  void CreateCMM(Backend* be) {
+    ASSERT_TRUE(IsInVector(be, offline_));
+    be->cmm = make_unique<ClusterMembershipMgr>(be->backend_id, nullptr);
+    RemoveFromVector(be, &offline_);
+    starting_.push_back(be);
+  }
+
+  /// Starts a backend by making its backend descriptor available to its
+  /// ClusterMembershipMgr through a callback. This method also propagates the change to
+  /// all other backends in 'backends_' and moves the backend from 'starting_' to
+  /// 'running_'.
+  void StartBackend(Backend* be) {
+    ASSERT_TRUE(IsInVector(be, starting_));
+    ASSERT_TRUE(be->cmm.get() != nullptr);
+    ASSERT_TRUE(be->desc.get() != nullptr);
+    auto be_cb = [be]() { return be->desc; };
+    be->cmm->SetLocalBeDescFn(be_cb);
+
+    // Poll to obtain topic update
+    TopicDeltas topic_deltas = Poll(be);
+    ASSERT_EQ(1, topic_deltas.size());
+    ASSERT_EQ(1, topic_deltas[0].topic_entries.size());
+
+    // Broadcast to all other backends
+    SendDelta(topic_deltas[0].topic_entries[0]);
+
+    RemoveFromVector(be, &starting_);
+    running_.push_back(be);
+  }
+
+  /// Quiesces a backend by updating its backend descriptor and polling its
+  /// ClusterMembershipMgr to make the change take effect. The resulting update from the
+  /// backend's ClusterMembershipMgr is then broadcast to the rest of the cluster. Also
+  /// moves the backend from 'running_' to 'quiescing_'.
+  void QuiesceBackend(Backend* be) {
+    ASSERT_TRUE(IsInVector(be, running_));
+    be->desc->__set_is_quiescing(true);
+    TopicDeltas topic_deltas = Poll(be);
+    ASSERT_EQ(1, topic_deltas.size());
+    ASSERT_EQ(1, topic_deltas[0].topic_entries.size());
+
+    // Broadcast to all other backends
+    SendDelta(topic_deltas[0].topic_entries[0]);
+
+    RemoveFromVector(be, &running_);
+    quiescing_.push_back(be);
+  }
+
+  /// Deletes a backend from all other backends and from 'backends_'. A delta marking the
+  /// backend as deleted gets sent to all nodes in 'backends_'. Note that this method does
+  /// not send any updates to the backend itself, as it would - like in a real cluster -
+  /// just disappear. The backend must be in 'running_' or 'quiescing_' and is removed
+  /// from the respective list by this method.
+  void DeleteBackend(Backend* be) {
+    bool is_running = IsInVector(be, running_);
+    bool is_quiescing = IsInVector(be, quiescing_);
+    ASSERT_TRUE(is_running || is_quiescing);
+
+    // Create topic item before erasing the backend.
+    TTopicItem deletion_item;
+    deletion_item.key = be->backend_id;
+    deletion_item.deleted = true;
+
+    // Delete the backend
+    auto new_end_it = std::remove_if(backends_.begin(), backends_.end(),
+        [be](const unique_ptr<Backend>& elem) { return elem.get() == be; });
+    backends_.erase(new_end_it, backends_.end());
+
+    // Create deletion update
+    SendDelta(deletion_item);
+
+    if (is_running) RemoveFromVector(be, &running_);
+    if (is_quiescing) RemoveFromVector(be, &quiescing_);
+  }
+
+  mt19937 rng_;
+
+  int RandomInt(int max) {
+    uniform_int_distribution<int> rand_int(0, max - 1);
+    return rand_int(rng_);
+  }
+
+  double RandomDoubleFraction() {
+    uniform_real_distribution<double> rand_double(0, 1);
+    return rand_double(rng_);
+  }
+};
+
+/// This test takes two instances of the ClusterMembershipMgr through a common lifecycle.
+/// It also serves as an example for how to craft statestore messages and pass them to
+/// UpdateMembership().
+TEST_F(ClusterMembershipMgrTest, TwoInstances) {
+  auto b1 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(1));
+  auto b2 = make_shared<TBackendDescriptor>(MakeBackendDescriptor(2));
+
+  ClusterMembershipMgr cmm1(b1->address.hostname, nullptr);
+  ClusterMembershipMgr cmm2(b2->address.hostname, nullptr);
+
+  const Statestore::TopicId topic_id = Statestore::IMPALA_MEMBERSHIP_TOPIC;
+  StatestoreSubscriber::TopicDeltaMap topic_delta_map = {{topic_id, TTopicDelta()}};
+  TTopicDelta* ss_topic_delta = &topic_delta_map[topic_id];
+  vector<TTopicDelta> returned_topic_deltas;
+  // The empty delta is used to poll subscribers for updates without sending new changes.
+  TTopicDelta empty_delta;
+  empty_delta.is_delta = true;
+
+  // Ping both managers, both should have no state to update
+  cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
+  ASSERT_EQ(0, returned_topic_deltas.size());
+
+  cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
+  ASSERT_EQ(0, returned_topic_deltas.size());
+
+  // Hook up first callback and iterate again
+  cmm1.SetLocalBeDescFn([b1]() { return b1; });
+  cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
+  ASSERT_EQ(1, returned_topic_deltas.size());
+
+  // First manager now has one BE
+  ASSERT_EQ(1, cmm1.GetSnapshot()->current_backends.size());
+
+  // Hook up second callback and iterate with the result of the first manager
+  cmm2.SetLocalBeDescFn([b2]() { return b2; });
+  *ss_topic_delta = returned_topic_deltas[0];
+  returned_topic_deltas.clear();
+  cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
+  ASSERT_EQ(1, returned_topic_deltas.size());
+  ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
+
+  // Send the returned update to the first manager, this time no deltas will be returned
+  *ss_topic_delta = returned_topic_deltas[0];
+  returned_topic_deltas.clear();
+  cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
+  ASSERT_EQ(0, returned_topic_deltas.size());
+  ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
+
+  // Both managers now have the same state. Shutdown one of them and step through
+  // propagating the update.
+  b1->is_quiescing = true;
+  // Send an empty update to the 1st one to trigger propagation of the shutdown
+  returned_topic_deltas.clear();
+  topic_delta_map[topic_id] = empty_delta;
+  cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
+  // The mgr will return its changed TBackendDescriptor
+  ASSERT_EQ(1, returned_topic_deltas.size());
+  // It will also remove itself from the executor group (but not the current backends).
+  ASSERT_EQ(1,
+      cmm1.GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+  ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
+
+  // Propagate the quiescing to the 2nd mgr
+  *ss_topic_delta = returned_topic_deltas[0];
+  returned_topic_deltas.clear();
+  ASSERT_EQ(2,
+      cmm2.GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+  cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
+  ASSERT_EQ(0, returned_topic_deltas.size());
+  ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
+  ASSERT_EQ(1,
+      cmm2.GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+
+  // Delete the 1st backend from the 2nd one
+  ASSERT_EQ(1, ss_topic_delta->topic_entries.size());
+  ss_topic_delta->topic_entries[0].deleted = true;
+  cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
+  ASSERT_EQ(0, returned_topic_deltas.size());
+  ASSERT_EQ(1, cmm2.GetSnapshot()->current_backends.size());
+  ASSERT_EQ(1,
+      cmm2.GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+}
+
+// This test runs a group of 20 backends through their full lifecycle, validating that
+// their state is correctly propagated through the cluster after every change.
+TEST_F(ClusterMembershipMgrTest, FullLifecycleMultipleBackends) {
+  const int NUM_BACKENDS = 20;
+  for (int i = 0; i < NUM_BACKENDS; ++i) {
+    CreateBackend();
+  }
+  EXPECT_EQ(NUM_BACKENDS, backends_.size());
+  EXPECT_EQ(backends_.size(), offline_.size());
+
+  while (!offline_.empty()) CreateCMM(offline_.front());
+  ASSERT_EQ(0, offline_.size());
+  ASSERT_EQ(NUM_BACKENDS, starting_.size());
+
+  while (!starting_.empty()) StartBackend(starting_.front());
+  ASSERT_EQ(0, starting_.size());
+  ASSERT_EQ(NUM_BACKENDS, running_.size());
+
+  // Assert that all backends know about each other and are all in the default executor
+  // group.
+  for (Backend* be : running_) {
+    EXPECT_EQ(running_.size(), be->cmm->GetSnapshot()->current_backends.size());
+    EXPECT_EQ(running_.size(),
+        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+  }
+
+  // Quiesce half of the backends.
+  for (int i = 0; quiescing_.size() < NUM_BACKENDS / 2; ++i) {
+    Backend* be = running_.front();
+    // All backends must still remain online
+    EXPECT_EQ(NUM_BACKENDS, be->cmm->GetSnapshot()->current_backends.size());
+
+    EXPECT_EQ(NUM_BACKENDS - i,
+        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+    QuiesceBackend(be);
+    // Make sure that the numbers drop
+    EXPECT_EQ(NUM_BACKENDS - i - 1,
+        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+  }
+  int num_still_running = NUM_BACKENDS - quiescing_.size();
+  ASSERT_EQ(num_still_running, running_.size());
+  ASSERT_EQ(NUM_BACKENDS / 2, quiescing_.size());
+
+  for (auto& be : backends_) {
+    // All backends are still registered
+    EXPECT_EQ(backends_.size(), be->cmm->GetSnapshot()->current_backends.size());
+    // Executor groups now show half of the backends remaining
+    EXPECT_EQ(num_still_running,
+        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+  }
+
+  // Delete half of the backends and make sure that the other half learned about it.
+  int to_delete = backends_.size() / 2;
+  int num_expected_alive = backends_.size() - to_delete;
+  for (int idx = 0; idx < to_delete; ++idx) {
+    // Will change backends_
+    if (idx % 2 == 0) {
+      DeleteBackend(running_.front());
+    } else {
+      DeleteBackend(quiescing_.front());
+    }
+  }
+  ASSERT_EQ(num_expected_alive, quiescing_.size() + running_.size());
+
+  for (Backend* be : quiescing_) {
+    EXPECT_EQ(num_expected_alive, be->cmm->GetSnapshot()->current_backends.size());
+  }
+
+  // Quiesce the remaining backends to validate that executor groups can scale to 0
+  // backends.
+  while (!running_.empty()) QuiesceBackend(running_.front());
+  for (auto& be : backends_) {
+    // Executor groups now are empty
+    EXPECT_EQ(0,
+        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+  }
+}
+
+/// This test executes a number of random changes to cluster membership. On every
+/// iteration a new backend is created, and with some probability, a backend is quiesced,
+/// removed from the cluster after having been quiesced before, or removed from the
+/// cluster without quiescing. The test relies on the consistency checks built into the
+/// ClusterMembershipMgr to ensure that it is in a consistent state.
+TEST_F(ClusterMembershipMgrTest, RandomizedMembershipUpdates) {
+  // TODO: Parameterize this test and run with several parameter sets
+  const int NUM_ITERATIONS = 100;
+  const double P_ADD = 1;
+  const double P_QUIESCE = 0.35;
+  const double P_DELETE = 0.30;
+  const double P_KILL = 0.2;
+
+  // Cumulative counts of how many backends were added/shutdown/deleted/killed by the
+  // tests.
+  int num_added = 0;
+  int num_shutdown = 0;
+  int num_deleted = 0;
+  // In this test "killing" a backend means deleting it without quiescing it first to
+  // simulate non-graceful failures.
+  int num_killed = 0;
+
+  for (int i = 0; i < NUM_ITERATIONS; ++i) {
+    double p = RandomDoubleFraction();
+    if (p < P_ADD) {
+      Backend* be = CreateBackend(i);
+      CreateCMM(be);
+      StartBackend(be);
+      ++num_added;
+    }
+    if (p < P_QUIESCE && !running_.empty()) {
+      int idx = RandomInt(running_.size());
+      Backend* be = running_[idx];
+      QuiesceBackend(be);
+      ++num_shutdown;
+    }
+    if (p < P_DELETE && !quiescing_.empty()) {
+      int idx = RandomInt(quiescing_.size());
+      Backend* be = quiescing_[idx];
+      DeleteBackend(be);
+      ++num_deleted;
+    }
+    if (p < P_KILL && !running_.empty()) {
+      int idx = RandomInt(running_.size());
+      Backend* be = running_[idx];
+      DeleteBackend(be);
+      ++num_killed;
+    }
+  }
+  std::cout << "Added: " << num_added << ", shutdown: " << num_shutdown << ", deleted: "
+      << num_deleted << ", killed: " << num_killed << endl;
+}
+
+/// TODO: Write a test that makes a number of random changes to cluster membership while
+/// not maintaining the proper lifecycle steps that a backend goes through (create, start,
+/// quiesce, delete).
+
+} // end namespace impala
+
+IMPALA_TEST_MAIN();
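
One reproducibility detail worth noting: SeedRng above takes its seed from the
CLUSTER_MEMBERSHIP_MGR_TEST_SEED environment variable when it is set, so a
failing randomized run can presumably be replayed deterministically, e.g.:

  CLUSTER_MEMBERSHIP_MGR_TEST_SEED=42 ./cluster-membership-mgr-test \
      --gtest_filter='*RandomizedMembershipUpdates*'
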
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
new file mode 100644
index 0000000..ca8f20d
--- /dev/null
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -0,0 +1,401 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/cluster-membership-mgr.h"
+
+#include "common/logging.h"
+#include "common/names.h"
+#include "util/test-info.h"
+
+namespace impala {
+
+const string ClusterMembershipMgr::DEFAULT_EXECUTOR_GROUP = "default";
+static const vector<string> DEFAULT_EXECUTOR_GROUPS =
+    {ClusterMembershipMgr::DEFAULT_EXECUTOR_GROUP};
+
+ClusterMembershipMgr::ClusterMembershipMgr(string local_backend_id,
+    StatestoreSubscriber* subscriber) :
+    current_membership_(std::make_shared<const Snapshot>()),
+    statestore_subscriber_(subscriber),
+    thrift_serializer_(/* compact= */ false),
+    local_backend_id_(move(local_backend_id)) {
+}
+
+Status ClusterMembershipMgr::Init() {
+  LOG(INFO) << "Starting cluster membership manager";
+  if (statestore_subscriber_ == nullptr) {
+    DCHECK(TestInfo::is_test());
+    return Status::OK();
+  }
+  // Register with the statestore
+  StatestoreSubscriber::UpdateCallback cb =
+      bind<void>(mem_fn(&ClusterMembershipMgr::UpdateMembership), this, _1, _2);
+  Status status = statestore_subscriber_->AddTopic(
+      Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
+      /* populate_min_subscriber_topic_version=*/ false,
+      /* filter_prefix= */"", cb);
+  if (!status.ok()) {
+    status.AddDetail("Scheduler failed to register membership topic");
+    return status;
+  }
+  return Status::OK();
+}
+
+void ClusterMembershipMgr::SetLocalBeDescFn(BackendDescriptorPtrFn fn) {
+  lock_guard<mutex> l(callback_fn_lock_);
+  DCHECK(fn);
+  DCHECK(!local_be_desc_fn_);
+  local_be_desc_fn_ = std::move(fn);
+}
+
+void ClusterMembershipMgr::SetUpdateLocalServerFn(UpdateLocalServerFn fn) {
+  lock_guard<mutex> l(callback_fn_lock_);
+  DCHECK(fn);
+  DCHECK(!update_local_server_fn_);
+  update_local_server_fn_ = std::move(fn);
+}
+
+void ClusterMembershipMgr::SetUpdateFrontendFn(UpdateFrontendFn fn) {
+  lock_guard<mutex> l(callback_fn_lock_);
+  DCHECK(fn);
+  DCHECK(!update_frontend_fn_);
+  update_frontend_fn_ = std::move(fn);
+}
+
+ClusterMembershipMgr::SnapshotPtr ClusterMembershipMgr::GetSnapshot() const {
+  lock_guard<mutex> l(current_membership_lock_);
+  DCHECK(current_membership_.get() != nullptr);
+  SnapshotPtr state = current_membership_;
+  return state;
+}
+
+void ClusterMembershipMgr::UpdateMembership(
+    const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
+    vector<TTopicDelta>* subscriber_topic_updates) {
+  DFAKE_SCOPED_LOCK(update_membership_lock_);
+
+  // First look to see if the topic we're interested in has an update.
+  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
+      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
+
+  // Ignore spurious messages.
+  if (topic == incoming_topic_deltas.end()) return;
+  const TTopicDelta& update = topic->second;
+
+  // If the update transmitted by the statestore is an empty delta, we don't need to
+  // process it.
+  bool no_ss_update = update.is_delta && update.topic_entries.empty();
+
+  // Check if the local backend is up and needs updating.
+  const Snapshot* base_snapshot = recovering_membership_.get();
+  if (base_snapshot == nullptr) base_snapshot = current_membership_.get();
+  DCHECK(base_snapshot != nullptr);
+  BeDescSharedPtr local_be_desc = GetLocalBackendDescriptor();
+  bool needs_local_be_update = NeedsLocalBackendUpdate(*base_snapshot, local_be_desc);
+
+  // We consider the statestore to be recovering from a connection failure until its post
+  // recovery grace period has elapsed.
+  bool ss_is_recovering = statestore_subscriber_ != nullptr
+      && statestore_subscriber_->IsInPostRecoveryGracePeriod();
+
+  // If we are tracking a recovering membership but the statestore is out of recovery, we
+  // will need to send the current membership to the impala server.
+  bool update_local_server = recovering_membership_.get() != nullptr && !ss_is_recovering;
+
+  // If there's no statestore update and the local backend descriptor has no changes and
+  // we don't need to update the local server, then we can skip processing altogether and
+  // avoid making a copy of the state.
+  if (no_ss_update && !needs_local_be_update && !update_local_server) return;
+
+  if (!no_ss_update) VLOG(1) << "Processing statestore update";
+  if (needs_local_be_update) VLOG(1) << "Local backend membership needs update";
+  if (update_local_server) VLOG(1) << "Local impala server needs update";
+  if (ss_is_recovering) {
+    VLOG(1) << "Statestore subscriber is in post-recovery grace period";
+  }
+
+  // By now we know that we need to renew the snapshot. Construct a new state based on the
+  // type of the update we received.
+  std::shared_ptr<Snapshot> new_state;
+
+  if (!update.is_delta) {
+    VLOG(1) << "Received full membership update";
+    // Full topic transmit, create fresh state.
+    new_state = std::make_shared<Snapshot>();
+    // A full update could remove backends and therefore we need to send an update to the
+    // local server.
+    update_local_server = true;
+  } else {
+    VLOG(1) << "Received delta membership update";
+    if (recovering_membership_.get() != nullptr) {
+      // The recovering membership is never exposed to clients and therefore requires no
+      // copying.
+      new_state = recovering_membership_;
+    } else {
+      // Make a copy of the current membership. This is the only function calling SetState
+      // and thus no lock is needed for read access.
+      new_state = std::make_shared<Snapshot>(*current_membership_);
+    }
+  }
+  if (local_be_desc.get() != nullptr) new_state->local_be_desc = local_be_desc;
+
+  // Process removed, new, and updated entries from the topic update and apply the changes
+  // to the new backend map and executor groups.
+  BackendIdMap* new_backend_map = &(new_state->current_backends);
+  ExecutorGroups* new_executor_groups = &(new_state->executor_groups);
+  for (const TTopicItem& item : update.topic_entries) {
+    // Deleted item
+    if (item.deleted) {
+      if (new_backend_map->find(item.key) != new_backend_map->end()) {
+        const TBackendDescriptor& be_desc = (*new_backend_map)[item.key];
+        if (be_desc.is_executor && !be_desc.is_quiescing) {
+          const vector<string>& groups = DEFAULT_EXECUTOR_GROUPS;
+          for (const string& group : groups) {
+            VLOG(1) << "Removing backend " << item.key << " from group " << group
+                    << " (deleted)";
+            (*new_executor_groups)[group].RemoveExecutor(be_desc);
+          }
+        }
+        new_backend_map->erase(item.key);
+        update_local_server = true;
+      }
+      continue;
+    }
+
+    // New or existing item
+    TBackendDescriptor be_desc;
+    // Benchmarks have suggested that this method can deserialize ~10m messages per
+    // second, so no immediate need to consider optimization.
+    uint32_t len = item.value.size();
+    Status status = DeserializeThriftMsg(
+        reinterpret_cast<const uint8_t*>(item.value.data()), &len, false, &be_desc);
+    if (!status.ok()) {
+      LOG_EVERY_N(WARNING, 30) << "Error deserializing membership topic item with key: "
+          << item.key;
+      continue;
+    }
+    if (be_desc.ip_address.empty()) {
+      // Each backend resolves its own IP address and transmits it inside its backend
+      // descriptor as part of the statestore update. If it is empty, then either that
+      // code has been changed, or someone else is sending malformed packets.
+      LOG_EVERY_N(WARNING, 30) << "Ignoring subscription request with empty IP address "
+          "from subscriber: " << TNetworkAddressToString(be_desc.address);
+      continue;
+    }
+    if (item.key == local_backend_id_) {
+      if (local_be_desc.get() == nullptr) {
+        LOG_EVERY_N(WARNING, 30) << "Another host registered itself with the local "
+            << "backend id (" << item.key << "), but the local backend has not started "
+            "yet. The offending address is: " << TNetworkAddressToString(be_desc.address);
+      } else if (be_desc.address != local_be_desc->address) {
+        // Someone else has registered this subscriber ID with a different address. We
+        // will try to re-register (i.e. overwrite their subscription), but there is
+        // likely a configuration problem.
+        LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
+            << TNetworkAddressToString(be_desc.address) << " (we are: "
+            << TNetworkAddressToString(local_be_desc->address) << ", backend id: "
+            << item.key << ")";
+      }
+      // We will always set the local backend explicitly below, so we ignore it here.
+      continue;
+    }
+
+    auto it = new_backend_map->find(item.key);
+    if (it != new_backend_map->end()) {
+      // Update
+      TBackendDescriptor& existing = it->second;
+      if (be_desc.is_quiescing && !existing.is_quiescing && existing.is_executor) {
+        // Executor needs to be removed from its groups
+        const vector<string>& groups = DEFAULT_EXECUTOR_GROUPS;
+        for (const string& group : groups) {
+          VLOG(1) << "Removing backend " << item.key << " from group " << group
+                  << " (quiescing)";
+          (*new_executor_groups)[group].RemoveExecutor(be_desc);
+        }
+      }
+      existing = be_desc;
+    } else {
+      // Create
+      new_backend_map->insert(make_pair(item.key, be_desc));
+      if (!be_desc.is_quiescing && be_desc.is_executor) {
+        const vector<string>& groups = DEFAULT_EXECUTOR_GROUPS;
+        for (const string& group : groups) {
+          VLOG(1) << "Adding backend " << item.key << " to group " << group;
+          (*new_executor_groups)[group].AddExecutor(be_desc);
+        }
+      }
+    }
+    DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups));
+  }
+
+  // Update the local backend descriptor if required. We need to re-check new_state here
+  // in case it was reset to empty above.
+  if (NeedsLocalBackendUpdate(*new_state, local_be_desc)) {
+    // We need to update both the new membership state and the statestore
+    new_state->current_backends[local_backend_id_] = *local_be_desc;
+    const vector<string>& groups = DEFAULT_EXECUTOR_GROUPS;
+    for (const string& group : groups) {
+      if (local_be_desc->is_quiescing) {
+        VLOG(1) << "Removing local backend from group " << group;
+        (*new_executor_groups)[group].RemoveExecutor(*local_be_desc);
+      } else if (local_be_desc->is_executor) {
+        VLOG(1) << "Adding local backend to group " << group;
+        (*new_executor_groups)[group].AddExecutor(*local_be_desc);
+      }
+    }
+    AddLocalBackendToStatestore(*local_be_desc, subscriber_topic_updates);
+    DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups));
+  }
+
+  // Don't send updates or update the current membership if the statestore is in its
+  // post-recovery grace period.
+  if (ss_is_recovering) {
+    recovering_membership_ = new_state;
+    return;
+  }
+
+  // Send notifications to local ImpalaServer and Frontend through registered callbacks.
+  if (update_local_server) NotifyLocalServerForDeletedBackend(*new_backend_map);
+  UpdateFrontendExecutorMembership(*new_backend_map, *new_executor_groups);
+
+  // Atomically update the respective membership snapshot.
+  SetState(new_state);
+  recovering_membership_.reset();
+}
+
+void ClusterMembershipMgr::AddLocalBackendToStatestore(
+    const TBackendDescriptor& local_be_desc,
+    vector<TTopicDelta>* subscriber_topic_updates) {
+  VLOG(1) << "Sending local backend to statestore";
+
+  subscriber_topic_updates->emplace_back(TTopicDelta());
+  TTopicDelta& update = subscriber_topic_updates->back();
+  update.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
+  update.topic_entries.emplace_back(TTopicItem());
+  // Setting this flag allows us to pass the resulting topic update to other
+  // ClusterMembershipMgr instances in tests unmodified.
+  update.is_delta = true;
+
+  TTopicItem& item = update.topic_entries.back();
+  item.key = local_backend_id_;
+  Status status = thrift_serializer_.SerializeToString(&local_be_desc, &item.value);
+  if (!status.ok()) {
+    LOG(FATAL) << "Failed to serialize Impala backend descriptor for statestore topic:"
+                << " " << status.GetDetail();
+    subscriber_topic_updates->pop_back();
+    return;
+  }
+}
+
+ClusterMembershipMgr::BeDescSharedPtr ClusterMembershipMgr::GetLocalBackendDescriptor() {
+  lock_guard<mutex> l(callback_fn_lock_);
+  return local_be_desc_fn_ ? local_be_desc_fn_() : nullptr;
+}
+
+void ClusterMembershipMgr::NotifyLocalServerForDeletedBackend(
+    const BackendIdMap& current_backends) {
+  VLOG(3) << "Notifying local server of membership changes";
+  lock_guard<mutex> l(callback_fn_lock_);
+  if (!update_local_server_fn_) return;
+  BackendAddressSet current_backend_set;
+  for (const auto& it : current_backends) current_backend_set.insert(it.second.address);
+  update_local_server_fn_(current_backend_set);
+}
+
+void ClusterMembershipMgr::UpdateFrontendExecutorMembership(
+    const BackendIdMap& current_backends, const ExecutorGroups executor_groups) {
+  lock_guard<mutex> l(callback_fn_lock_);
+  if (!update_frontend_fn_) return;
+  TUpdateExecutorMembershipRequest update_req;
+  for (const auto& it : current_backends) {
+    const TBackendDescriptor& backend = it.second;
+    if (backend.is_executor) {
+      update_req.hostnames.insert(backend.address.hostname);
+      update_req.ip_addresses.insert(backend.ip_address);
+      update_req.num_executors++;
+    }
+  }
+  Status status = update_frontend_fn_(update_req);
+  if (!status.ok()) {
+    LOG(WARNING) << "Error updating frontend membership snapshot: " << status.GetDetail();
+  }
+}
+
+void ClusterMembershipMgr::SetState(const SnapshotPtr& new_state) {
+  lock_guard<mutex> l(current_membership_lock_);
+  DCHECK(new_state.get() != nullptr);
+  current_membership_ = new_state;
+}
+
+bool ClusterMembershipMgr::NeedsLocalBackendUpdate(const Snapshot& state,
+    const BeDescSharedPtr& local_be_desc) {
+  if (local_be_desc.get() == nullptr) return false;
+  if (state.local_be_desc.get() == nullptr) return true;
+  auto it = state.current_backends.find(local_backend_id_);
+  if (it == state.current_backends.end()) return true;
+  return it->second.is_quiescing != local_be_desc->is_quiescing;
+}
+
+bool ClusterMembershipMgr::CheckConsistency(const BackendIdMap& current_backends,
+      const ExecutorGroups& executor_groups) {
+  // Build a map of all backend descriptors
+  std::unordered_map<TNetworkAddress, TBackendDescriptor> address_to_backend;
+  for (const auto& it : current_backends) {
+    address_to_backend.emplace(it.second.address, it.second);
+  }
+
+  // Check groups against the map
+  for (const auto& group_it : executor_groups) {
+    const string& group_name = group_it.first;
+    const ExecutorGroup& group = group_it.second;
+    ExecutorGroup::Executors backends = group.GetAllExecutorDescriptors();
+    for (const TBackendDescriptor& group_be : backends) {
+      if (!group_be.is_executor) {
+        LOG(WARNING) << "Backend " << group_be.address << " in group " << group_name
+            << " is not an executor";
+        return false;
+      }
+      if (group_be.is_quiescing) {
+        LOG(WARNING) << "Backend " << group_be.address << " in group " << group_name
+            << " is quiescing";
+        return false;
+      }
+      auto current_be_it = address_to_backend.find(group_be.address);
+      if (current_be_it == address_to_backend.end()) {
+        LOG(WARNING) << "Backend " << group_be.address << " is in group " << group_name
+            << " but not in current set of backends";
+        return false;
+      }
+      if (current_be_it->second.is_quiescing != group_be.is_quiescing) {
+        LOG(WARNING) << "Backend " << group_be.address << " in group " << group_name
+            << " differs from backend in current set of backends: is_quiescing ("
+            << current_be_it->second.is_quiescing << " != " << group_be.is_quiescing
+            << ")";
+        return false;
+      }
+      if (current_be_it->second.is_executor != group_be.is_executor) {
+        LOG(WARNING) << "Backend " << group_be.address << " in group " << group_name
+            << " differs from backend in current set of backends: is_executor ("
+            << current_be_it->second.is_executor << " != " << group_be.is_executor << ")";
+        return false;
+      }
+    }
+  }
+  return true;
+}
+
+} // end namespace impala
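
For context on the read path: a minimal consumer-side sketch (not part of the
patch, and assuming it is compiled against the Impala backend). A client wires
up the local backend callback, then reads a snapshot; because a Snapshot is
immutable once published via SetState(), no locking is needed after
GetSnapshot() returns:

  #include <iostream>
  #include <memory>
  #include "scheduling/cluster-membership-mgr.h"

  void SketchSnapshotConsumer() {
    // A null statestore subscriber is the test-only mode handled in Init().
    impala::ClusterMembershipMgr cmm("hypothetical-backend-id", nullptr);
    auto local_desc = std::make_shared<impala::TBackendDescriptor>();
    cmm.SetLocalBeDescFn([local_desc]() { return local_desc; });

    impala::ClusterMembershipMgr::SnapshotPtr snapshot = cmm.GetSnapshot();
    auto it = snapshot->executor_groups.find(
        impala::ClusterMembershipMgr::DEFAULT_EXECUTOR_GROUP);
    int num_executors =
        it == snapshot->executor_groups.end() ? 0 : it->second.NumExecutors();
    std::cout << snapshot->current_backends.size() << " known backends, "
              << num_executors << " executors in the default group\n";
  }
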
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
new file mode 100644
index 0000000..fc8b58e
--- /dev/null
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gtest/gtest_prod.h> // for FRIEND_TEST
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/global-types.h"
+#include "common/status.h"
+#include "gen-cpp/StatestoreService_types.h"
+#include "gutil/threading/thread_collision_warner.h"
+#include "scheduling/executor-group.h"
+#include "statestore/statestore-subscriber.h"
+#include "util/container-util.h"
+
+namespace impala {
+
+namespace test {
+class SchedulerWrapper;
+}
+
+/// The ClusterMembershipMgr manages the local backend's membership in the cluster. It has
+/// two roles:
+///   - Provide a consistent view of the current cluster membership, i.e. the other
+///     backends.
+///   - Establish and maintain the membership of the local backend in the cluster.
+///
+/// To do this, the class subscribes to the statestore cluster-membership topic and
+/// applies incoming changes to its local copy of the cluster state. If it finds that the
+/// local backend is missing from the cluster membership, it will add it (contingent on
+/// the local backend being available after startup).
+///
+/// Clients of this class can obtain a consistent, immutable snapshot of the cluster
+/// membership state through GetSnapshot().
+///
+/// The ClusterMembershipMgr keeps the membership snapshot stable while the statestore
+/// subscriber is recovering from a connection failure. This allows other backends to
+/// re-register with the statestore after a statestore restart.
+///
+/// TODO(IMPALA-8484): Allow specifying executor groups during backend startup. Currently
+/// only one executor group named "default" exists. All backends are part of that group
+/// and it's the only group available for scheduling.
+///
+/// The class also allows the local backend (ImpalaServer) and the local Frontend to
+/// register callbacks to receive notifications of changes to the cluster membership.
+///
+/// TODO: Replace the usage of shared_ptr with atomic_shared_ptr once compilers support
+///       it. Alternatively consider using Kudu's rw locks.
+class ClusterMembershipMgr {
+ public:
+  /// An immutable pointer to a backend descriptor. It is used to return a consistent,
+  /// immutable copy of a backend descriptor.
+  typedef std::shared_ptr<const TBackendDescriptor> BeDescSharedPtr;
+
+  /// Maps statestore subscriber IDs to backend descriptors.
+  typedef std::unordered_map<std::string, TBackendDescriptor> BackendIdMap;
+
+  /// Maps executor group names to executor groups. For now, only a default group exists
+  /// and all executors are part of that group.
+  typedef std::unordered_map<std::string, ExecutorGroup> ExecutorGroups;
+
+  // A snapshot of the current cluster membership. The ClusterMembershipMgr maintains a
+  // consistent copy of this and updates it atomically when the membership changes.
+  // Clients can obtain an immutable copy. Class instances can be created through the
+  // explicitly-defaulted default and copy constructors.
+  struct Snapshot {
+    Snapshot() = default;
+    Snapshot(const Snapshot&) = default;
+    /// The current backend descriptor of the local backend.
+    BeDescSharedPtr local_be_desc;
+    /// Map from unique backend ID to TBackendDescriptor. The
+    /// {backend ID, TBackendDescriptor} pairs represent the IMPALA_MEMBERSHIP_TOPIC
+    /// {key, value} pairs of known executors retrieved from the statestore. It's
+    /// important to track both the backend ID as well as the TBackendDescriptor so we
+    /// know what is being removed in a given update.
+    BackendIdMap current_backends;
+    /// A map of executor groups by their names.
+    ExecutorGroups executor_groups;
+  };
+
+  /// An immutable shared membership snapshot.
+  typedef std::shared_ptr<const Snapshot> SnapshotPtr;
+
+  /// A callback to provide the local backend descriptor. Typically the ImpalaServer would
+  /// provide such a backend descriptor, and it's the task of code outside this class to
+  /// register the ImpalaServer with this class, e.g. after it has started up. Tests can
+  /// register their own callback to provide a local backend descriptor. No locks are held
+  /// when calling this callback.
+  typedef std::function<BeDescSharedPtr()> BackendDescriptorPtrFn;
+
+  /// A set of backend addresses.
+  typedef std::unordered_set<TNetworkAddress> BackendAddressSet;
+
+  /// A callback to notify the local ImpalaServer of changes to the cluster membership.
+  /// Only called when backends are deleted from the current membership. No locks are held
+  /// when calling this callback.
+  typedef std::function<void(const BackendAddressSet&)> UpdateLocalServerFn;
+
+  /// A callback to notify the local Frontend of changes to the cluster membership. No
+  /// locks are held when calling this callback.
+  typedef std::function<Status(const TUpdateExecutorMembershipRequest&)> UpdateFrontendFn;
+
+  ClusterMembershipMgr(std::string local_backend_id, StatestoreSubscriber* subscriber);
+
+  /// Initializes instances of this class. This only sets up the statestore subscription.
+  /// Callbacks to the local ImpalaServer and Frontend must be registered in separate
+  /// steps.
+  Status Init();
+
+  /// The following functions allow users of this class to register callbacks for certain
+  /// events. They may be called at any time before or after calling Init() and are
+  /// thread-safe. Each of the functions may be called at most once during the lifetime of
+  /// the object, and the function 'fn' must not be empty.
+
+  /// Registers a callback to provide the local backend descriptor.
+  void SetLocalBeDescFn(BackendDescriptorPtrFn fn);
+
+  /// Registers a callback to notify the local ImpalaServer of changes in the cluster
+  /// membership. This callback will only be called when backends are deleted from the
+  /// membership.
+  void SetUpdateLocalServerFn(UpdateLocalServerFn fn);
+
+  /// Registers a callback to notify the local Frontend of changes in the cluster
+  /// membership.
+  void SetUpdateFrontendFn(UpdateFrontendFn fn);
+
+  /// Returns a read only snapshot of the current cluster membership state. May be called
+  /// before or after calling Init(). The returned shared pointer will always be non-null,
+  /// but it may point to an empty Snapshot, depending on the arrival of statestore
+  /// updates and the status of the local backend.
+  SnapshotPtr GetSnapshot() const;
+
+  /// The default executor group name.
+  static const std::string DEFAULT_EXECUTOR_GROUP;
+
+  /// Handler for statestore updates, called asynchronously when an update is received
+  /// from the subscription manager. This method processes incoming updates from the
+  /// statestore and applies them to the current membership state. It also ensures that
+  /// the local backend descriptor gets registered with the statestore and adds it to the
+  /// current membership state. This method updates 'current_membership_' atomically.
+  ///
+  /// This handler is registered with the statestore and must not be called directly,
+  /// except in tests.
+  void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
+      std::vector<TTopicDelta>* subscriber_topic_updates);
+
+ private:
+  /// Serializes and adds the local backend descriptor to 'subscriber_topic_updates'.
+  void AddLocalBackendToStatestore(const TBackendDescriptor& local_be_desc,
+      std::vector<TTopicDelta>* subscriber_topic_updates);
+
+  /// Returns the local backend descriptor or nullptr if no local backend has been
+  /// registered.
+  BeDescSharedPtr GetLocalBackendDescriptor();
+
+  /// Notifies the ImpalaServer of a deleted backend through the registered callback for
+  /// cluster membership changes if the callback is non-empty.
+  void NotifyLocalServerForDeletedBackend(const BackendIdMap& current_backends);
+
+  /// Notifies the Frontend through the registered callback for cluster membership changes
+  /// if the callback is non-empty.
+  void UpdateFrontendExecutorMembership(const BackendIdMap& current_backends,
+      const ExecutorGroups& executor_groups);
+
+  /// Atomically replaces the current membership snapshot with 'new_state'.
+  void SetState(const SnapshotPtr& new_state);
+
+  /// Returns true if the local backend is not yet registered in 'state' or its
+  /// registration there does not match 'local_be_desc'. Returns false otherwise.
+  bool NeedsLocalBackendUpdate(const Snapshot& state,
+      const BeDescSharedPtr& local_be_desc);
+
+  /// Checks that the backend ID map is consistent with 'executor_groups', i.e. all
+  /// executors in all groups are also present in the map and are non-quiescing executors.
+  /// This method should only be called in debug builds.
+  bool CheckConsistency(const BackendIdMap& current_backends,
+      const ExecutorGroups& executor_groups);
+
+  /// Fake mutex to DCHECK that only one thread at a time calls UpdateMembership.
+  DFAKE_MUTEX(update_membership_lock_);
+
+  /// The snapshot of the current cluster membership. When receiving changes to the
+  /// executors configuration from the statestore we will make a copy of the stored
+  /// object, apply the updates to the copy and atomically swap the contents of this
+  /// pointer.
+  SnapshotPtr current_membership_;
+
+  /// Protects current_membership_
+  mutable boost::mutex current_membership_lock_;
+
+  /// A temporary membership snapshot to hold updates while the statestore is in its
+  /// post-recovery grace period. Not exposed to clients and not protected by any locking.
+  std::shared_ptr<Snapshot> recovering_membership_;
+
+  /// Pointer to a subscription manager (not owned by this class) used to register for
+  /// dynamic updates to the set of available backends. May be nullptr if the set of
+  /// backends is fixed (only useful for tests).
+  StatestoreSubscriber* statestore_subscriber_;
+
+  /// Serializes TBackendDescriptors when creating topic updates
+  ThriftSerializer thrift_serializer_;
+
+  /// Identifier for this Impala backend, unique across the cluster. Used to validate
+  /// incoming backend descriptors and to register this backend with the statestore.
+  std::string local_backend_id_;
+
+  /// Callbacks that provide external dependencies.
+  BackendDescriptorPtrFn local_be_desc_fn_;
+  UpdateLocalServerFn update_local_server_fn_;
+  UpdateFrontendFn update_frontend_fn_;
+
+  /// Protects the callbacks.
+  mutable boost::mutex callback_fn_lock_;
+
+  friend class impala::test::SchedulerWrapper;
+};
+
+} // end namespace impala
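
The GetSnapshot()/UpdateMembership() pair above implements a copy-on-write snapshot:
readers grab an immutable shared_ptr, while the single updater thread copies the
current snapshot, applies the statestore delta, and swaps the pointer in under a
short-lived lock. The following standalone C++ sketch illustrates the same pattern in
miniature; the names (MembershipHolder, the plain string list inside Snapshot) are
illustrative only and not part of this patch:

    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <vector>

    struct Snapshot {
      std::vector<std::string> executors;
    };

    class MembershipHolder {
     public:
      MembershipHolder() : current_(std::make_shared<const Snapshot>()) {}

      // Readers: lock only long enough to copy the shared_ptr; the snapshot itself
      // is immutable, so no further synchronization is needed.
      std::shared_ptr<const Snapshot> GetSnapshot() const {
        std::lock_guard<std::mutex> l(lock_);
        return current_;
      }

      // Writer: copy the current snapshot, mutate the copy, swap it in atomically.
      // Readers holding the old snapshot keep a consistent (if stale) view.
      void AddExecutor(const std::string& addr) {
        auto copy = std::make_shared<Snapshot>(*GetSnapshot());
        copy->executors.push_back(addr);
        std::lock_guard<std::mutex> l(lock_);
        current_ = std::move(copy);
      }

     private:
      mutable std::mutex lock_;
      std::shared_ptr<const Snapshot> current_;
    };

    int main() {
      MembershipHolder mgr;
      auto before = mgr.GetSnapshot();
      mgr.AddExecutor("10.0.0.1:22000");
      auto after = mgr.GetSnapshot();
      std::cout << before->executors.size() << " -> " << after->executors.size() << "\n";
      return 0;
    }
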
diff --git a/be/src/scheduling/cluster-membership-test-util.cc b/be/src/scheduling/cluster-membership-test-util.cc
new file mode 100644
index 0000000..cd70761
--- /dev/null
+++ b/be/src/scheduling/cluster-membership-test-util.cc
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/cluster-membership-test-util.h"
+#include "common/logging.h"
+#include "common/names.h"
+
+static const int BACKEND_PORT = 1000;
+static const int KRPC_PORT = 2000;
+static const string HOSTNAME_PREFIX = "host_";
+static const string IP_PREFIX = "10";
+
+namespace impala {
+namespace test {
+
+string HostIdxToHostname(int host_idx) {
+  return HOSTNAME_PREFIX + std::to_string(host_idx);
+}
+
+string HostIdxToIpAddr(int host_idx) {
+  DCHECK_LT(host_idx, (1 << 24));
+  string suffix;
+  for (int i = 0; i < 3; ++i) {
+    suffix = "." + std::to_string(host_idx % 256) + suffix; // prepend
+    host_idx /= 256;
+  }
+  DCHECK_EQ(0, host_idx);
+  return IP_PREFIX + suffix;
+}
+
+TBackendDescriptor MakeBackendDescriptor(int idx, int port_offset) {
+  TBackendDescriptor be_desc;
+  be_desc.address.hostname = HostIdxToHostname(idx);
+  be_desc.address.port = BACKEND_PORT + port_offset;
+  be_desc.ip_address = HostIdxToIpAddr(idx);
+  // krpc_address is always resolved
+  be_desc.krpc_address.hostname = be_desc.ip_address;
+  be_desc.krpc_address.port = KRPC_PORT + port_offset;
+  be_desc.__set_is_coordinator(true);
+  be_desc.__set_is_executor(true);
+  be_desc.is_quiescing = false;
+  return be_desc;
+}
+
+}  // end namespace test
+}  // end namespace impala
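
HostIdxToIpAddr() above packs the host index into the three low octets of a 10.x.x.x
address, one base-256 digit per octet. Below is a standalone sketch of the same
arithmetic with a few worked-out values; the sample indexes are illustrative, not
taken from the patch:

    #include <iostream>
    #include <string>

    // Same octet decomposition as in cluster-membership-test-util.cc: the index
    // fills the lower 24 bits of the IPv4 address.
    std::string HostIdxToIpAddr(int host_idx) {
      std::string suffix;
      for (int i = 0; i < 3; ++i) {
        suffix = "." + std::to_string(host_idx % 256) + suffix;  // prepend an octet
        host_idx /= 256;
      }
      return "10" + suffix;
    }

    int main() {
      std::cout << HostIdxToIpAddr(1) << "\n";      // 10.0.0.1
      std::cout << HostIdxToIpAddr(300) << "\n";    // 10.0.1.44  (300 = 1*256 + 44)
      std::cout << HostIdxToIpAddr(65536) << "\n";  // 10.1.0.0   (65536 = 256^2)
      return 0;
    }
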
diff --git a/be/src/util/uid-util-test.cc b/be/src/scheduling/cluster-membership-test-util.h
similarity index 52%
copy from be/src/util/uid-util-test.cc
copy to be/src/scheduling/cluster-membership-test-util.h
index 4015bd5..2ce858c 100644
--- a/be/src/util/uid-util-test.cc
+++ b/be/src/scheduling/cluster-membership-test-util.h
@@ -15,26 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <stdlib.h>
-#include <stdio.h>
-#include <iostream>
+#pragma once
 
-#include "testutil/gtest-util.h"
-#include "util/uid-util.h"
+#include "gen-cpp/StatestoreService_types.h"
+#include <string>
 
 namespace impala {
+namespace test {
 
-TEST(UidUtil, FragmentInstanceId) {
-  boost::uuids::random_generator uuid_generator;
-  boost::uuids::uuid query_uuid = uuid_generator();
-  TUniqueId query_id = UuidToQueryId(query_uuid);
+/// Convert a host index to a hostname.
+std::string HostIdxToHostname(int host_idx);
 
-  for (int i = 0; i < 100; ++i) {
-    TUniqueId instance_id = CreateInstanceId(query_id, i);
-    EXPECT_EQ(GetQueryId(instance_id), query_id);
-    EXPECT_EQ(GetInstanceIdx(instance_id), i);
-  }
-}
+/// Convert a host index to an IP address. The host index must be smaller than 2^24 and
+/// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
+std::string HostIdxToIpAddr(int host_idx);
 
-}
+/// Builds a new backend descriptor. 'idx' is used to determine its name and IP address
+/// and the caller must make sure that it is unique across sets of hosts. To create
+/// backends on the same host, an optional port offset can be specified.
+TBackendDescriptor MakeBackendDescriptor(int idx, int port_offset = 0);
 
+}  // end namespace test
+}  // end namespace impala
diff --git a/be/src/scheduling/executor-group-test.cc b/be/src/scheduling/executor-group-test.cc
new file mode 100644
index 0000000..ea14f6c
--- /dev/null
+++ b/be/src/scheduling/executor-group-test.cc
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/executor-group.h"
+
+#include "common/logging.h"
+#include "common/names.h"
+#include "scheduling/cluster-membership-test-util.h"
+#include "testutil/gtest-util.h"
+#include "util/network-util.h"
+#include "util/thread.h"
+
+using namespace impala;
+using namespace impala::test;
+
+/// Test adding multiple backends on different hosts.
+TEST(ExecutorGroupTest, AddExecutors) {
+  ExecutorGroup executor_group;
+  executor_group.AddExecutor(MakeBackendDescriptor(1));
+  executor_group.AddExecutor(MakeBackendDescriptor(2));
+  ASSERT_EQ(2, executor_group.NumExecutors());
+  IpAddr backend_ip;
+  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_1", &backend_ip));
+  EXPECT_EQ("10.0.0.1", backend_ip);
+  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_2", &backend_ip));
+  EXPECT_EQ("10.0.0.2", backend_ip);
+}
+
+/// Test adding multiple backends on the same host.
+TEST(ExecutorGroupTest, MultipleExecutorsOnSameHost) {
+  ExecutorGroup executor_group;
+  executor_group.AddExecutor(MakeBackendDescriptor(1, /* port_offset=*/0));
+  executor_group.AddExecutor(MakeBackendDescriptor(1, /* port_offset=*/1));
+  IpAddr backend_ip;
+  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_1", &backend_ip));
+  EXPECT_EQ("10.0.0.1", backend_ip);
+  const ExecutorGroup::Executors& backend_list =
+      executor_group.GetExecutorsForHost("10.0.0.1");
+  EXPECT_EQ(2, backend_list.size());
+}
+
+/// Test removing a backend.
+TEST(ExecutorGroupTest, RemoveExecutor) {
+  ExecutorGroup executor_group;
+  executor_group.AddExecutor(MakeBackendDescriptor(1));
+  executor_group.AddExecutor(MakeBackendDescriptor(2));
+  executor_group.RemoveExecutor(MakeBackendDescriptor(2));
+  IpAddr backend_ip;
+  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_1", &backend_ip));
+  EXPECT_EQ("10.0.0.1", backend_ip);
+  ASSERT_FALSE(executor_group.LookUpExecutorIp("host_2", &backend_ip));
+}
+
+/// Test removing one of multiple backends on the same host (IMPALA-3944).
+TEST(ExecutorGroupTest, RemoveExecutorOnSameHost) {
+  ExecutorGroup executor_group;
+  executor_group.AddExecutor(MakeBackendDescriptor(1, /* port_offset=*/0));
+  executor_group.AddExecutor(MakeBackendDescriptor(1, /* port_offset=*/1));
+  executor_group.RemoveExecutor(MakeBackendDescriptor(1, /* port_offset=*/1));
+  IpAddr backend_ip;
+  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_1", &backend_ip));
+  EXPECT_EQ("10.0.0.1", backend_ip);
+  const ExecutorGroup::Executors& backend_list =
+      executor_group.GetExecutorsForHost("10.0.0.1");
+  EXPECT_EQ(1, backend_list.size());
+}
+
+IMPALA_TEST_MAIN();
diff --git a/be/src/scheduling/executor-group.cc b/be/src/scheduling/executor-group.cc
new file mode 100644
index 0000000..f4e1354
--- /dev/null
+++ b/be/src/scheduling/executor-group.cc
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/executor-group.h"
+
+namespace impala {
+
+// Hand-testing shows that 25 replicas produce a reasonable balance between nodes
+// across the hash ring. See HashRingTest::MaxMinRatio() for some empirical results
+// at similar replication levels. There is nothing special about 25 (e.g. 24 or 26
+// would be similar). Increasing this results in a more even distribution.
+// TODO: This can be tuned further with real world tests
+static const uint32_t NUM_HASH_RING_REPLICAS = 25;
+
+ExecutorGroup::ExecutorGroup()
+  : executor_ip_hash_ring_(NUM_HASH_RING_REPLICAS) {}
+
+const ExecutorGroup::Executors& ExecutorGroup::GetExecutorsForHost(
+    const IpAddr& ip) const {
+  ExecutorMap::const_iterator it = executor_map_.find(ip);
+  DCHECK(it != executor_map_.end());
+  return it->second;
+}
+
+ExecutorGroup::IpAddrs ExecutorGroup::GetAllExecutorIps() const {
+  IpAddrs ips;
+  ips.reserve(NumExecutors());
+  for (auto& it: executor_map_) ips.push_back(it.first);
+  return ips;
+}
+
+ExecutorGroup::Executors ExecutorGroup::GetAllExecutorDescriptors() const {
+  Executors executors;
+  for (const auto& executor_list: executor_map_) {
+    executors.insert(executors.end(), executor_list.second.begin(),
+        executor_list.second.end());
+  }
+  return executors;
+}
+
+void ExecutorGroup::AddExecutor(const TBackendDescriptor& be_desc) {
+  // be_desc.is_executor can be false for the local backend when scheduling queries to run
+  // on the coordinator host.
+  DCHECK(!be_desc.ip_address.empty());
+  Executors& be_descs = executor_map_[be_desc.ip_address];
+  auto eq = [&be_desc](const TBackendDescriptor& existing) {
+    // The IP addresses must already match, so it is sufficient to check the port.
+    DCHECK_EQ(existing.ip_address, be_desc.ip_address);
+    return existing.address.port == be_desc.address.port;
+  };
+  if (find_if(be_descs.begin(), be_descs.end(), eq) != be_descs.end()) {
+    LOG(DFATAL) << "Tried to add existing backend to executor group: "
+        << be_desc.krpc_address;
+    return;
+  }
+  if (be_descs.empty()) {
+    executor_ip_hash_ring_.AddNode(be_desc.ip_address);
+  }
+  be_descs.push_back(be_desc);
+  executor_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
+}
+
+void ExecutorGroup::RemoveExecutor(const TBackendDescriptor& be_desc) {
+  auto be_descs_it = executor_map_.find(be_desc.ip_address);
+  if (be_descs_it == executor_map_.end()) {
+    LOG(DFATAL) << "Tried to remove a backend from non-existing host: "
+        << be_desc.krpc_address;
+    return;
+  }
+  auto eq = [&be_desc](const TBackendDescriptor& existing) {
+    // The IP addresses must already match, so it is sufficient to check the port.
+    DCHECK_EQ(existing.ip_address, be_desc.ip_address);
+    return existing.address.port == be_desc.address.port;
+  };
+
+  Executors& be_descs = be_descs_it->second;
+  auto remove_it = find_if(be_descs.begin(), be_descs.end(), eq);
+  if (remove_it == be_descs.end()) {
+    LOG(DFATAL) << "Tried to remove non-existing backend from per-host list: "
+        << be_desc.krpc_address;
+    return;
+  }
+  be_descs.erase(remove_it);
+  if (be_descs.empty()) {
+    executor_map_.erase(be_descs_it);
+    executor_ip_map_.erase(be_desc.address.hostname);
+    executor_ip_hash_ring_.RemoveNode(be_desc.ip_address);
+  }
+}
+
+bool ExecutorGroup::LookUpExecutorIp(const Hostname& hostname, IpAddr* ip) const {
+  // Check if hostname is already a valid IP address.
+  if (executor_map_.find(hostname) != executor_map_.end()) {
+    if (ip != nullptr) *ip = hostname;
+    return true;
+  }
+  auto it = executor_ip_map_.find(hostname);
+  if (it != executor_ip_map_.end()) {
+    if (ip != nullptr) *ip = it->second;
+    return true;
+  }
+  return false;
+}
+
+const TBackendDescriptor* ExecutorGroup::LookUpBackendDesc(
+    const TNetworkAddress& host) const {
+  IpAddr ip;
+  if (LookUpExecutorIp(host.hostname, &ip)) {
+    const ExecutorGroup::Executors& be_list = GetExecutorsForHost(ip);
+    for (const TBackendDescriptor& desc : be_list) {
+      if (desc.address == host) return &desc;
+    }
+  }
+  return nullptr;
+}
+
+}  // end namespace impala
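
The NUM_HASH_RING_REPLICAS comment in this file explains why each executor IP is
inserted into the hash ring multiple times: with a single point per node the arc
lengths (and thus the load) vary widely, and replicas smooth that out. Here is a
minimal, self-contained consistent-hashing sketch of that idea; ToyHashRing is
hypothetical and does not mirror Impala's HashRing API:

    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>

    class ToyHashRing {
     public:
      explicit ToyHashRing(int replicas) : replicas_(replicas) {}

      // Insert 'replicas_' points per node so load spreads evenly around the ring.
      void AddNode(const std::string& node) {
        for (int i = 0; i < replicas_; ++i) {
          ring_[Hash(node + "#" + std::to_string(i))] = node;
        }
      }

      // Map a key to the first node clockwise from the key's hash. Assumes at least
      // one node has been added.
      const std::string& GetNode(const std::string& key) const {
        auto it = ring_.lower_bound(Hash(key));
        if (it == ring_.end()) it = ring_.begin();  // wrap around
        return it->second;
      }

     private:
      static size_t Hash(const std::string& s) { return std::hash<std::string>()(s); }

      const int replicas_;
      std::map<size_t, std::string> ring_;  // point on the ring -> owning node
    };

    int main() {
      ToyHashRing ring(25);  // 25 replicas per node, as in ExecutorGroup
      ring.AddNode("10.0.0.1");
      ring.AddNode("10.0.0.2");
      std::cout << ring.GetNode("/warehouse/t1/file_0.parq") << std::endl;
      return 0;
    }
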
diff --git a/be/src/scheduling/executor-group.h b/be/src/scheduling/executor-group.h
new file mode 100644
index 0000000..01c4839
--- /dev/null
+++ b/be/src/scheduling/executor-group.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <unordered_map>
+#include <vector>
+
+#include "gen-cpp/StatestoreService_types.h"
+#include "gen-cpp/Types_types.h"
+#include "scheduling/hash-ring.h"
+#include "util/container-util.h"
+#include "util/network-util.h"
+
+namespace impala {
+
+/// Configuration class to store a list of executor hosts, a list of backend descriptors
+/// per host, a mapping from hostnames to IP addresses, and a hash ring containing all
+/// backends.
+///
+/// The intended purpose of this class is to provide a consistent view of a subset of
+/// executors during scheduling. The class is not thread safe. In particular, some of the
+/// getter methods return references and the membership must not be changed while client
+/// code holds those references.
+///
+/// Note that objects of this class will only store more than one backend per host/IP
+/// address during tests.
+class ExecutorGroup {
+ public:
+  ExecutorGroup();
+  ExecutorGroup(const ExecutorGroup& other) = default;
+
+  /// List of backends; in this context they are all executors.
+  typedef std::vector<TBackendDescriptor> Executors;
+  typedef std::vector<IpAddr> IpAddrs;
+
+  /// Returns the list of executors on a particular host. The caller must make sure that
+  /// the host is actually contained in executor_map_.
+  const Executors& GetExecutorsForHost(const IpAddr& ip) const;
+
+  /// Returns all executor IP addresses in the executor group.
+  IpAddrs GetAllExecutorIps() const;
+
+  /// Returns all executor backend descriptors. Note that during tests this can include
+  /// multiple executors per IP address.
+  Executors GetAllExecutorDescriptors() const;
+
+  /// Adds an executor to the group. If it already exists, it is ignored. Backend
+  /// descriptors are identified by their IP address and port.
+  void AddExecutor(const TBackendDescriptor& be_desc);
+
+  /// Removes an executor from the group if it exists. Otherwise does nothing. Backend
+  /// descriptors are identified by their IP address and port.
+  void RemoveExecutor(const TBackendDescriptor& be_desc);
+
+  /// Look up the IP address of 'hostname' in the internal executor maps and return
+  /// whether the lookup was successful. If 'hostname' itself is a valid IP address and is
+  /// contained in executor_map_, then it is copied to 'ip' and true is returned. 'ip' can
+  /// be nullptr if the caller only wants to check whether the lookup succeeds. Use this
+  /// method to resolve datanode hostnames to IP addresses during scheduling, to prevent
+  /// blocking on the OS.
+  bool LookUpExecutorIp(const Hostname& hostname, IpAddr* ip) const;
+
+  /// Looks up the backend descriptor for the executor with hostname 'host'. Returns
+  /// nullptr if it's not found. The returned descriptor should not be retained beyond the
+  /// lifetime of this ExecutorGroup and the caller must make sure that the group does not
+  /// change while it holds the pointer.
+  const TBackendDescriptor* LookUpBackendDesc(const TNetworkAddress& host) const;
+
+  /// Returns the hash ring associated with this executor group. It's owned by the group
+  /// and the caller must not hold a reference beyond the group's lifetime.
+  const HashRing* GetHashRing() const { return &executor_ip_hash_ring_; }
+
+  /// Returns the number of executor hosts in this group. During tests, hosts can run
+  /// multiple executor backend descriptors.
+  int NumExecutors() const { return executor_map_.size(); }
+
+ private:
+  /// Map from a host's IP address to a list of executors running on that node.
+  typedef std::unordered_map<IpAddr, Executors> ExecutorMap;
+  ExecutorMap executor_map_;
+
+  /// Map from a hostname to its IP address to support hostname based executor lookup. It
+  /// contains entries for all executors in executor_map_ and needs to be updated whenever
+  /// executor_map_ changes.
+  typedef std::unordered_map<Hostname, IpAddr> ExecutorIpAddressMap;
+  ExecutorIpAddressMap executor_ip_map_;
+
+  /// All executors are kept in a hash ring to allow a consistent mapping from filenames
+  /// to executors.
+  HashRing executor_ip_hash_ring_;
+};
+
+}  // end namespace impala
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index f1c6770..1bb5f02 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -23,7 +23,6 @@
 #include <boost/uuid/uuid_generators.hpp>
 
 #include "runtime/bufferpool/reservation-util.h"
-#include "util/container-util.h"
 #include "util/mem-info.h"
 #include "util/network-util.h"
 #include "util/parse-util.h"
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index acab122..2449f87 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -78,8 +78,8 @@ struct BackendExecParams {
   int64_t admit_mem_limit = 0;
 };
 
-/// map from an impalad host address to the list of assigned fragment instance params.
-typedef std::map<TNetworkAddress, BackendExecParams> PerBackendExecParams;
+/// Map from an impalad host address to the list of assigned fragment instance params.
+typedef std::unordered_map<TNetworkAddress, BackendExecParams> PerBackendExecParams;
 
 /// Execution parameters for a single fragment instance; used to assemble the
 /// TPlanFragmentInstanceCtx
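
Switching PerBackendExecParams from std::map to std::unordered_map requires a hash for
the key type; Impala presumably gets its std::hash support for TNetworkAddress from a
shared utility header (likely util/container-util.h, which executor-group.h includes
above). A standalone analog of the general pattern, with a stand-in NetAddr struct in
place of the Thrift-generated type:

    #include <cstddef>
    #include <functional>
    #include <string>
    #include <unordered_map>

    // Stand-in for TNetworkAddress; the real type is Thrift-generated.
    struct NetAddr {
      std::string hostname;
      int port;
      bool operator==(const NetAddr& o) const {
        return hostname == o.hostname && port == o.port;
      }
    };

    // unordered_map keys need a std::hash specialization (or an explicit hasher).
    namespace std {
    template <>
    struct hash<NetAddr> {
      size_t operator()(const NetAddr& a) const {
        return hash<string>()(a.hostname) ^ (hash<int>()(a.port) << 1);
      }
    };
    }  // namespace std

    int main() {
      std::unordered_map<NetAddr, int> per_backend;  // analog of PerBackendExecParams
      per_backend[{"host_1", 22000}] = 42;
      return per_backend.size() == 1 ? 0 : 1;
    }
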
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index dd98720..314b32a 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -22,6 +22,8 @@
 #include "common/names.h"
 #include "flatbuffers/flatbuffers.h"
 #include "gen-cpp/CatalogObjects_generated.h"
+#include "scheduling/cluster-membership-mgr.h"
+#include "scheduling/cluster-membership-test-util.h"
 #include "scheduling/scheduler.h"
 
 using namespace impala;
@@ -69,6 +71,17 @@ const int64_t FileSplitGeneratorSpec::DEFAULT_FILE_SIZE = 1 << 22;
 /// Default size for file splits is 1 MB.
 const int64_t FileSplitGeneratorSpec::DEFAULT_BLOCK_SIZE = 1 << 20;
 
+ClusterMembershipMgr::BeDescSharedPtr BuildBackendDescriptor(const Host& host) {
+  auto be_desc = make_shared<TBackendDescriptor>();
+  be_desc->address.hostname = host.ip;
+  be_desc->address.port = host.be_port;
+  be_desc->ip_address = host.ip;
+  be_desc->__set_is_coordinator(host.is_coordinator);
+  be_desc->__set_is_executor(host.is_executor);
+  be_desc->is_quiescing = false;
+  return be_desc;
+}
+
 int Cluster::AddHost(bool has_backend, bool has_datanode, bool is_executor) {
   int host_idx = hosts_.size();
   int be_port = has_backend ? BACKEND_PORT : -1;
@@ -76,7 +89,8 @@ int Cluster::AddHost(bool has_backend, bool has_datanode, bool is_executor) {
   IpAddr ip = HostIdxToIpAddr(host_idx);
   DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end());
   ip_to_idx_[ip] = host_idx;
-  hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port, is_executor));
+  hosts_.push_back(Host(HostIdxToHostname(host_idx),
+      ip, be_port, dn_port, has_backend && is_executor));
   // Add host to lists of backend indexes per type.
   if (has_backend) backend_host_idxs_.push_back(host_idx);
   if (has_datanode) {
@@ -95,10 +109,6 @@ void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode,
   for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode, is_executor);
 }
 
-Hostname Cluster::HostIdxToHostname(int host_idx) {
-  return HOSTNAME_PREFIX + std::to_string(host_idx);
-}
-
 void Cluster::GetBackendAddress(int host_idx, TNetworkAddress* addr) const {
   DCHECK_LT(host_idx, hosts_.size());
   addr->hostname = hosts_[host_idx].ip;
@@ -113,17 +123,6 @@ const vector<int>& Cluster::datanode_only_host_idxs() const {
   return datanode_only_host_idxs_;
 }
 
-IpAddr Cluster::HostIdxToIpAddr(int host_idx) {
-  DCHECK_LT(host_idx, (1 << 24));
-  string suffix;
-  for (int i = 0; i < 3; ++i) {
-    suffix = "." + std::to_string(host_idx % 256) + suffix; // prepend
-    host_idx /= 256;
-  }
-  DCHECK_EQ(0, host_idx);
-  return IP_PREFIX + suffix;
-}
-
 void Schema::AddSingleBlockTable(
     const TableName& table_name, const vector<int>& non_cached_replica_host_idxs) {
   AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {});
@@ -524,7 +523,19 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
     locations = &expanded_locations;
   }
   DCHECK(locations != nullptr);
-  return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetExecutorsConfig(), 0,
+
+  ClusterMembershipMgr::SnapshotPtr membership_snapshot =
+      cluster_membership_mgr_->GetSnapshot();
+  auto it = membership_snapshot->executor_groups.find(
+      ClusterMembershipMgr::DEFAULT_EXECUTOR_GROUP);
+  // If a group does not exist (e.g. no executors are registered), we pass an empty group
+  // to the scheduler to exercise its error handling logic.
+  bool no_executor_group = it == membership_snapshot->executor_groups.end();
+  ExecutorGroup empty_group;
+  DCHECK(membership_snapshot->local_be_desc.get() != nullptr);
+  Scheduler::ExecutorConfig executor_config =
+      {no_executor_group ? empty_group : it->second, *membership_snapshot->local_be_desc};
+  return scheduler_->ComputeScanRangeAssignment(executor_config, 0,
       nullptr, false, *locations, plan_.referenced_datanodes(), exec_at_coord,
       plan_.query_options(), nullptr, assignment);
 }
@@ -573,35 +584,27 @@ void SchedulerWrapper::InitializeScheduler() {
                                            << "hosts.";
   const Host& scheduler_host = plan_.cluster().hosts()[0];
   string scheduler_backend_id = scheduler_host.ip;
-  TNetworkAddress scheduler_backend_address =
-      MakeNetworkAddress(scheduler_host.ip, scheduler_host.be_port);
-  TNetworkAddress scheduler_krpc_address =
-      MakeNetworkAddress(scheduler_host.ip, FLAGS_krpc_port);
-  scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id,
-      &metrics_, nullptr, nullptr));
-  const Status status = scheduler_->Init(scheduler_backend_address,
-      scheduler_krpc_address, scheduler_host.ip, /* admit_mem_limit */ 0L);
-  DCHECK(status.ok()) << "Scheduler init failed in test";
-  // Initialize the scheduler backend maps.
+  cluster_membership_mgr_.reset(new ClusterMembershipMgr(scheduler_backend_id, nullptr));
+  cluster_membership_mgr_->SetLocalBeDescFn([scheduler_host]() {
+      return BuildBackendDescriptor(scheduler_host);
+  });
+  Status status = cluster_membership_mgr_->Init();
+  DCHECK(status.ok()) << "Cluster membership manager init failed in test";
+  scheduler_.reset(new Scheduler(cluster_membership_mgr_.get(), &metrics_, nullptr));
+  // Initialize the cluster membership manager
   SendFullMembershipMap();
 }
 
 void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const {
   DCHECK_GT(host.be_port, 0) << "Host cannot be added to scheduler without a running "
                              << "backend";
-  // Build backend descriptor.
-  TBackendDescriptor be_desc;
-  be_desc.address.hostname = host.ip;
-  be_desc.address.port = host.be_port;
-  be_desc.ip_address = host.ip;
-  be_desc.__set_is_coordinator(host.is_coordinator);
-  be_desc.__set_is_executor(host.is_executor);
+  ClusterMembershipMgr::BeDescSharedPtr be_desc = BuildBackendDescriptor(host);
 
   // Build topic item.
   TTopicItem item;
   item.key = host.ip;
   ThriftSerializer serializer(false);
-  Status status = serializer.SerializeToString(&be_desc, &item.value);
+  Status status = serializer.SerializeToString(be_desc.get(), &item.value);
   DCHECK(status.ok());
 
   // Add to topic delta.
@@ -616,5 +619,5 @@ void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) {
 
   // Send to the scheduler.
   vector<TTopicDelta> dummy_result;
-  scheduler_->UpdateMembership(delta_map, &dummy_result);
+  cluster_membership_mgr_->UpdateMembership(delta_map, &dummy_result);
 }
diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h
index 13dfda5..30c0f76 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -26,11 +26,14 @@
 
 #include "common/status.h"
 #include "gen-cpp/ImpalaInternalService.h" // for TQueryOptions
+#include "scheduling/cluster-membership-mgr.h"
+#include "scheduling/scheduler.h"
 #include "scheduling/query-schedule.h"
 #include "util/metrics.h"
 
 namespace impala {
 
+class ClusterMembershipMgr;
 class Scheduler;
 class TTopicDelta;
 
@@ -113,13 +116,6 @@ class Cluster {
   void AddHosts(int num_hosts, bool has_backend, bool has_datanode,
       bool is_executor = true);
 
-  /// Convert a host index to a hostname.
-  static Hostname HostIdxToHostname(int host_idx);
-
-  /// Convert a host index to an IP address. The host index must be smaller than 2^24 and
-  /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
-  static IpAddr HostIdxToIpAddr(int host_idx);
-
   /// Return the backend address (ip, port) for the host with index 'host_idx'.
   void GetBackendAddress(int host_idx, TNetworkAddress* addr) const;
 
@@ -492,6 +488,7 @@ class SchedulerWrapper {
 
  private:
   const Plan& plan_;
+  boost::scoped_ptr<ClusterMembershipMgr> cluster_membership_mgr_;
   boost::scoped_ptr<Scheduler> scheduler_;
   MetricGroup metrics_;
 
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index 0b271ad..f7d2e4e 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "scheduling/scheduler.h"
 #include "common/logging.h"
+#include "scheduling/cluster-membership-mgr.h"
+#include "scheduling/scheduler.h"
 #include "scheduling/scheduler-test-util.h"
 #include "testutil/gtest-util.h"
 
@@ -383,49 +384,12 @@ TEST_F(SchedulerTest, TestCachedReadPreferred) {
   EXPECT_EQ(0, result.NumRemoteAssignedBytes());
 }
 
-/// IMPALA-3019: Test for round robin reset problem. We schedule the same plan twice but
-/// send an empty statestored message in between.
-/// TODO: This problem cannot occur anymore and the test is merely green for random
-/// behavior. Remove.
-TEST_F(SchedulerTest, EmptyStatestoreMessage) {
-  Cluster cluster;
-  cluster.AddHosts(2, true, true);
-  cluster.AddHosts(3, false, true);
-
-  Schema schema(cluster);
-  schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::REMOTE_ONLY, 3);
-
-  Plan plan(schema);
-  plan.AddTableScan("T1");
-  // Test only applies when num_remote_executor_candidates=0.
-  plan.SetNumRemoteExecutorCandidates(0);
-
-  Result result(plan);
-  SchedulerWrapper scheduler(plan);
-
-  ASSERT_OK(scheduler.Compute(&result));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(3));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(4));
-  result.Reset();
-
-  scheduler.SendEmptyUpdate();
-  ASSERT_OK(scheduler.Compute(&result));
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(3));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(4));
-}
-
 /// Test sending updates to the scheduler.
 TEST_F(SchedulerTest, TestSendUpdates) {
   Cluster cluster;
-  // 3 hosts, only first two run backends. This allows us to remove one of the backends
+  // 3 hosts, only last two run backends. This allows us to remove one of the backends
   // from the scheduler and then verify that reads are assigned to the other backend.
-  for (int i = 0; i < 3; ++i) cluster.AddHost(i < 2, true);
+  for (int i = 0; i < 3; ++i) cluster.AddHost(i > 0, true);
 
   Schema schema(cluster);
   schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::REMOTE_ONLY, 1);
@@ -441,24 +405,39 @@ TEST_F(SchedulerTest, TestSendUpdates) {
   ASSERT_OK(scheduler.Compute(&result));
   // Two backends are registered, so the scheduler will pick a random one.
   EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(2));
 
-  // Remove first host from scheduler.
-  scheduler.RemoveBackend(cluster.hosts()[1]);
+  // Remove one host from scheduler.
+  int test_host = 2;
+  scheduler.RemoveBackend(cluster.hosts()[test_host]);
   result.Reset();
 
   ASSERT_OK(scheduler.Compute(&result));
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
-  EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
 
-  // Re-add first host from scheduler.
-  scheduler.AddBackend(cluster.hosts()[1]);
+  // Re-add host to scheduler.
+  scheduler.AddBackend(cluster.hosts()[test_host]);
   result.Reset();
 
   ASSERT_OK(scheduler.Compute(&result));
   // Two backends are registered, so the scheduler will pick a random one.
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
+
+  // Remove the other host from the scheduler.
+  test_host = 1;
+  scheduler.RemoveBackend(cluster.hosts()[test_host]);
+  result.Reset();
+
+  ASSERT_OK(scheduler.Compute(&result));
+  // Only one backend remains so the scheduler must pick it.
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(2));
 }
 
 TEST_F(SchedulerTest, TestGeneratedSingleSplit) {
@@ -550,12 +529,8 @@ TEST_F(SchedulerTest, TestBlockAndGenerateSplit) {
   EXPECT_EQ(0, result.NumCachedAssignedBytes());
 }
 
-/// IMPALA-4329: Test scheduling with no backends.
-/// With the fix for IMPALA-5058, the scheduler is no longer responsible for
-/// registering the local backend with itself. This functionality is moved to
-/// ImpalaServer::MembershipCallback() and the scheduler will receive the local
-/// backend info through the statestore update, so until that happens, scheduling
-/// should fail.
+/// Test scheduling fails with no backends (the local backend gets registered with the
+/// scheduler but is not marked as an executor).
 TEST_F(SchedulerTest, TestEmptyBackendConfig) {
   Cluster cluster;
   cluster.AddHost(false, true);
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 78a4e98..858234b 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -32,6 +32,7 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/exec-env.h"
+#include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/hash-ring.h"
 #include "statestore/statestore-subscriber.h"
 #include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
@@ -49,202 +50,35 @@ using namespace apache::thrift;
 using namespace org::apache::impala::fb;
 using namespace strings;
 
-DECLARE_bool(is_coordinator);
-DECLARE_bool(is_executor);
-DECLARE_bool(mem_limit_includes_jvm);
-
 namespace impala {
 
 static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
 static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
-static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
-
-Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
-    MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service)
-  : executors_config_(std::make_shared<const BackendConfig>()),
-    metrics_(metrics->GetOrCreateChildGroup("scheduler")),
-    webserver_(webserver),
-    statestore_subscriber_(subscriber),
-    local_backend_id_(backend_id),
-    thrift_serializer_(false),
-    request_pool_service_(request_pool_service) {
-}
 
-Status Scheduler::Init(const TNetworkAddress& backend_address,
-    const TNetworkAddress& krpc_address, const IpAddr& ip,
-    int64_t admit_mem_limit) {
+Scheduler::Scheduler(ClusterMembershipMgr* cluster_membership_mgr,
+    MetricGroup* metrics, RequestPoolService* request_pool_service)
+  : metrics_(metrics->GetOrCreateChildGroup("scheduler")),
+    cluster_membership_mgr_(cluster_membership_mgr),
+    request_pool_service_(request_pool_service) {
   LOG(INFO) << "Starting scheduler";
-  local_backend_descriptor_ = BuildLocalBackendDescriptor(webserver_, backend_address,
-      krpc_address, ip, admit_mem_limit);
-  LOG(INFO) << "Scheduler using " << local_backend_descriptor_.ip_address
-            << " as IP address";
-  coord_only_backend_config_.AddBackend(local_backend_descriptor_);
-
-  if (statestore_subscriber_ != nullptr) {
-    StatestoreSubscriber::UpdateCallback cb =
-        bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
-    Status status = statestore_subscriber_->AddTopic(
-        Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
-        /* populate_min_subscriber_topic_version=*/ false,
-        /* filter_prefix= */"", cb);
-    if (!status.ok()) {
-      status.AddDetail("Scheduler failed to register membership topic");
-      return status;
-    }
-  }
-
   if (metrics_ != nullptr) {
-    // This is after registering with the statestored, so we already have to synchronize
-    // access to the executors_config_ shared_ptr.
-    int num_backends = GetExecutorsConfig()->NumBackends();
     total_assignments_ = metrics_->AddCounter(ASSIGNMENTS_KEY, 0);
     total_local_assignments_ = metrics_->AddCounter(LOCAL_ASSIGNMENTS_KEY, 0);
     initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
-    num_fragment_instances_metric_ = metrics_->AddGauge(NUM_BACKENDS_KEY, num_backends);
-  }
-  return Status::OK();
-}
-
-TBackendDescriptor Scheduler::BuildLocalBackendDescriptor(
-    Webserver* webserver, const TNetworkAddress& backend_address,
-    const TNetworkAddress& krpc_address, const IpAddr& ip, int64_t admit_mem_limit) {
-  TBackendDescriptor local_backend_descriptor;
-  local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
-  local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
-  local_backend_descriptor.__set_address(backend_address);
-  // Store our IP address so that each subscriber doesn't have to resolve
-  // it on every heartbeat. May as well do it up front to avoid frequent DNS
-  // requests.
-  local_backend_descriptor.__set_ip_address(ip);
-  local_backend_descriptor.__set_admit_mem_limit(admit_mem_limit);
-  DCHECK(IsResolvedAddress(krpc_address)) << "KRPC relies on resolved IP address.";
-  DCHECK_EQ(krpc_address.hostname, local_backend_descriptor.ip_address);
-  local_backend_descriptor.__set_krpc_address(krpc_address);
-  if (webserver != nullptr) {
-    const TNetworkAddress& webserver_address = webserver->http_address();
-    if (IsWildcardAddress(webserver_address.hostname)) {
-      local_backend_descriptor.__set_debug_http_address(
-          MakeNetworkAddress(ip, webserver_address.port));
-    } else {
-      local_backend_descriptor.__set_debug_http_address(webserver_address);
-    }
-    local_backend_descriptor.__set_secure_webserver(webserver->IsSecure());
-  }
-  return local_backend_descriptor;
-}
-
-void Scheduler::UpdateLocalBackendAddrForBeTest() {
-  local_backend_descriptor_.address = ExecEnv::GetInstance()->GetThriftBackendAddress();
-}
-
-void Scheduler::UpdateMembership(
-    const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
-    vector<TTopicDelta>* subscriber_topic_updates) {
-  // First look to see if the topic(s) we're interested in have an update
-  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
-
-  if (topic == incoming_topic_deltas.end()) return;
-  const TTopicDelta& delta = topic->second;
-
-  // If the delta transmitted by the statestore is empty we can skip processing
-  // altogether and avoid making a copy of executors_config_.
-  if (delta.is_delta && delta.topic_entries.empty()) return;
-
-  // This function needs to handle both delta and non-delta updates. To minimize the
-  // time needed to hold locks, all updates are applied to a copy of
-  // executors_config_,
-  // which is then swapped into place atomically.
-  std::shared_ptr<BackendConfig> new_executors_config;
-
-  if (!delta.is_delta) {
-    current_executors_.clear();
-    new_executors_config = std::make_shared<BackendConfig>();
-  } else {
-    // Make a copy
-    lock_guard<mutex> lock(executors_config_lock_);
-    new_executors_config = std::make_shared<BackendConfig>(*executors_config_);
-  }
-
-  // Process new and removed entries to the topic. Update executors_config_ and
-  // current_executors_ to match the set of executors given by the
-  // subscriber_topic_updates.
-  for (const TTopicItem& item : delta.topic_entries) {
-    if (item.deleted) {
-      if (current_executors_.find(item.key) != current_executors_.end()) {
-        new_executors_config->RemoveBackend(current_executors_[item.key]);
-        current_executors_.erase(item.key);
-      }
-      continue;
-    }
-    TBackendDescriptor be_desc;
-    // Benchmarks have suggested that this method can deserialize
-    // ~10m messages per second, so no immediate need to consider optimization.
-    uint32_t len = item.value.size();
-    Status status = DeserializeThriftMsg(
-        reinterpret_cast<const uint8_t*>(item.value.data()), &len, false, &be_desc);
-    if (!status.ok()) {
-      VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
-      continue;
-    }
-    if (be_desc.ip_address.empty()) {
-      // Each scheduler resolves its hostname locally in Scheduler::Init() and
-      // adds the IP address to local_backend_descriptor_. If it is empty, then either
-      // that code has been changed, or someone else is sending malformed packets.
-      VLOG(1) << "Ignoring subscription request with empty IP address from subscriber: "
-              << TNetworkAddressToString(be_desc.address);
-      continue;
-    }
-    if (item.key == local_backend_id_
-        && be_desc.address != local_backend_descriptor_.address) {
-      // Someone else has registered this subscriber ID with a different address. We
-      // will try to re-register (i.e. overwrite their subscription), but there is
-      // likely a configuration problem.
-      LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
-           << TNetworkAddressToString(be_desc.address) << " (we are: "
-           << TNetworkAddressToString(local_backend_descriptor_.address) << ")";
-      continue;
-    }
-    if (be_desc.is_quiescing) {
-      // Make sure backends that are shutting down are not scheduled on.
-      auto it = current_executors_.find(item.key);
-      if (it != current_executors_.end()) {
-        new_executors_config->RemoveBackend(it->second);
-        current_executors_.erase(it);
-      }
-    } else if (be_desc.is_executor) {
-      new_executors_config->AddBackend(be_desc);
-      current_executors_.insert(make_pair(item.key, be_desc));
-    }
-  }
-  SetExecutorsConfig(new_executors_config);
-
-  if (metrics_ != nullptr) {
-    /// TODO-MT: fix this (do we even need to report it?)
-    num_fragment_instances_metric_->SetValue(current_executors_.size());
   }
 }
 
-Scheduler::ExecutorsConfigPtr Scheduler::GetExecutorsConfig() const {
-  lock_guard<mutex> l(executors_config_lock_);
-  DCHECK(executors_config_.get() != nullptr);
-  ExecutorsConfigPtr executor_config = executors_config_;
-  return executor_config;
-}
-
-void Scheduler::SetExecutorsConfig(const ExecutorsConfigPtr& executors_config) {
-  lock_guard<mutex> l(executors_config_lock_);
-  executors_config_ = executors_config;
-}
-
 const TBackendDescriptor& Scheduler::LookUpBackendDesc(
-    const BackendConfig& executor_config, const TNetworkAddress& host) {
-  const TBackendDescriptor* desc = executor_config.LookUpBackendDesc(host);
+    const ExecutorConfig& executor_config, const TNetworkAddress& host) {
+  const TBackendDescriptor* desc = executor_config.group.LookUpBackendDesc(host);
   if (desc == nullptr) {
-    // Local host may not be in executor_config if it's a dedicated coordinator
-    DCHECK(host == local_backend_descriptor_.address);
-    desc = &local_backend_descriptor_;
+    // Local host may not be in executor_config's executor group if it's a dedicated
+    // coordinator.
+    const TBackendDescriptor& local_be_desc = executor_config.local_be_desc;
+    DCHECK(host == local_be_desc.address);
+    DCHECK(!local_be_desc.is_executor);
+    desc = &local_be_desc;
   }
   return *desc;
 }
@@ -290,7 +124,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
 }
 
 Status Scheduler::ComputeScanRangeAssignment(
-    const BackendConfig& executor_config, QuerySchedule* schedule) {
+    const ExecutorConfig& executor_config, QuerySchedule* schedule) {
   RuntimeProfile::Counter* total_assignment_timer =
       ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
   const TQueryExecRequest& exec_request = schedule->request();
@@ -340,7 +174,7 @@ Status Scheduler::ComputeScanRangeAssignment(
 }
 
 void Scheduler::ComputeFragmentExecParams(
-    const BackendConfig& executor_config, QuerySchedule* schedule) {
+    const ExecutorConfig& executor_config, QuerySchedule* schedule) {
   const TQueryExecRequest& exec_request = schedule->request();
 
   // for each plan, compute the FInstanceExecParams for the tree of fragments
@@ -392,7 +226,7 @@ void Scheduler::ComputeFragmentExecParams(
   }
 }
 
-void Scheduler::ComputeFragmentExecParams(const BackendConfig& executor_config,
+void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,
     const TPlanExecInfo& plan_exec_info, FragmentExecParams* fragment_params,
     QuerySchedule* schedule) {
   // traverse input fragments
@@ -404,9 +238,10 @@ void Scheduler::ComputeFragmentExecParams(const BackendConfig& executor_config,
   const TPlanFragment& fragment = fragment_params->fragment;
   // case 1: single instance executed at coordinator
   if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
-    const TNetworkAddress& coord = local_backend_descriptor_.address;
-    DCHECK(local_backend_descriptor_.__isset.krpc_address);
-    const TNetworkAddress& krpc_coord = local_backend_descriptor_.krpc_address;
+    const TBackendDescriptor& local_be_desc = executor_config.local_be_desc;
+    const TNetworkAddress& coord = local_be_desc.address;
+    DCHECK(local_be_desc.__isset.krpc_address);
+    const TNetworkAddress& krpc_coord = local_be_desc.krpc_address;
     DCHECK(IsResolvedAddress(krpc_coord));
     // make sure that the coordinator instance ends up with instance idx 0
     TUniqueId instance_id = fragment_params->is_coord_fragment
@@ -444,7 +279,7 @@ void Scheduler::ComputeFragmentExecParams(const BackendConfig& executor_config,
   }
 }
 
-void Scheduler::CreateUnionInstances(const BackendConfig& executor_config,
+void Scheduler::CreateUnionInstances(const ExecutorConfig& executor_config,
     FragmentExecParams* fragment_params, QuerySchedule* schedule) {
   const TPlanFragment& fragment = fragment_params->fragment;
   DCHECK(ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE));
@@ -456,7 +291,9 @@ void Scheduler::CreateUnionInstances(const BackendConfig& executor_config,
   vector<TPlanNodeId> scan_node_ids;
   FindNodes(fragment.plan, scan_node_types, &scan_node_ids);
   vector<TNetworkAddress> scan_hosts;
-  for (TPlanNodeId id : scan_node_ids) GetScanHosts(id, *fragment_params, &scan_hosts);
+  for (TPlanNodeId id : scan_node_ids) {
+    GetScanHosts(executor_config.local_be_desc, id, *fragment_params, &scan_hosts);
+  }
 
   unordered_set<TNetworkAddress> hosts(scan_hosts.begin(), scan_hosts.end());
 
@@ -474,8 +311,8 @@ void Scheduler::CreateUnionInstances(const BackendConfig& executor_config,
   // TODO-MT: figure out how to parallelize Union
   int per_fragment_idx = 0;
   for (const TNetworkAddress& host : hosts) {
-    const TBackendDescriptor& backend_descriptor =
-        LookUpBackendDesc(executor_config, host);
+    const TBackendDescriptor& backend_descriptor = LookUpBackendDesc(
+        executor_config, host);
     DCHECK(backend_descriptor.__isset.krpc_address);
     const TNetworkAddress& krpc_host = backend_descriptor.krpc_address;
     DCHECK(IsResolvedAddress(krpc_host));
@@ -489,7 +326,7 @@ void Scheduler::CreateUnionInstances(const BackendConfig& executor_config,
   }
 }
 
-void Scheduler::CreateScanInstances(const BackendConfig& executor_config,
+void Scheduler::CreateScanInstances(const ExecutorConfig& executor_config,
     PlanNodeId leftmost_scan_id, FragmentExecParams* fragment_params,
     QuerySchedule* schedule) {
   int max_num_instances =
@@ -497,11 +334,12 @@ void Scheduler::CreateScanInstances(const BackendConfig& executor_config,
   if (max_num_instances == 0) max_num_instances = 1;
 
   if (fragment_params->scan_range_assignment.empty()) {
-    DCHECK(local_backend_descriptor_.__isset.krpc_address);
-    DCHECK(IsResolvedAddress(local_backend_descriptor_.krpc_address));
+    const TBackendDescriptor& local_be_desc = executor_config.local_be_desc;
+    DCHECK(local_be_desc.__isset.krpc_address);
+    DCHECK(IsResolvedAddress(local_be_desc.krpc_address));
     // this scan doesn't have any scan ranges: run a single instance on the coordinator
     fragment_params->instance_exec_params.emplace_back(schedule->GetNextInstanceId(),
-        local_backend_descriptor_.address, local_backend_descriptor_.krpc_address, 0,
+        local_be_desc.address, local_be_desc.krpc_address, 0,
         *fragment_params);
     return;
   }
@@ -511,8 +349,8 @@ void Scheduler::CreateScanInstances(const BackendConfig& executor_config,
     // evenly divide up the scan ranges of the leftmost scan between at most
     // <dop> instances
     const TNetworkAddress& host = assignment_entry.first;
-    const TBackendDescriptor& backend_descriptor =
-        LookUpBackendDesc(executor_config, host);
+    const TBackendDescriptor& backend_descriptor = LookUpBackendDesc(
+        executor_config, host);
     DCHECK(backend_descriptor.__isset.krpc_address);
     TNetworkAddress krpc_host = backend_descriptor.krpc_address;
     DCHECK(IsResolvedAddress(krpc_host));
@@ -591,13 +429,14 @@ void Scheduler::CreateCollocatedInstances(
   }
 }
 
-Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_config,
+Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_config,
     PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
     bool node_random_replica, const vector<TScanRangeLocationList>& locations,
     const vector<TNetworkAddress>& host_list, bool exec_at_coord,
     const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
     FragmentScanRangeAssignment* assignment) {
-  if (executor_config.NumBackends() == 0 && !exec_at_coord) {
+  const ExecutorGroup& executor_group = executor_config.group;
+  if (executor_group.NumExecutors() == 0 && !exec_at_coord) {
     return Status(TErrorCode::NO_REGISTERED_BACKENDS);
   }
 
@@ -614,8 +453,13 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_confi
   // random rank.
   bool random_replica = query_options.schedule_random_replica || node_random_replica;
 
+  // TODO: Build this one from executor_group
+  ExecutorGroup coord_only_executor_group;
+  const TBackendDescriptor& local_be_desc = executor_config.local_be_desc;
+  coord_only_executor_group.AddExecutor(local_be_desc);
+  VLOG_QUERY << "Exec at coord is " << (exec_at_coord ? "true" : "false");
   AssignmentCtx assignment_ctx(
-      exec_at_coord ? coord_only_backend_config_ : executor_config, total_assignments_,
+      exec_at_coord ? coord_only_executor_group : executor_group, total_assignments_,
       total_local_assignments_);
 
   // Holds scan ranges that must be assigned for remote reads.
@@ -628,10 +472,10 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_confi
 
     // Select executor for the current scan range.
     if (exec_at_coord) {
-      DCHECK(assignment_ctx.executor_config().LookUpBackendIp(
-          local_backend_descriptor_.address.hostname, nullptr));
-      assignment_ctx.RecordScanRangeAssignment(local_backend_descriptor_, node_id,
-          host_list, scan_range_locations, assignment);
+      DCHECK(assignment_ctx.executor_group().LookUpExecutorIp(
+          local_be_desc.address.hostname, nullptr));
+      assignment_ctx.RecordScanRangeAssignment(local_be_desc, node_id, host_list,
+          scan_range_locations, assignment);
     } else {
       // Collect executor candidates with smallest memory distance.
       vector<IpAddr> executor_candidates;
@@ -642,7 +486,7 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_confi
           // replica host.
           TReplicaPreference::type memory_distance = TReplicaPreference::REMOTE;
           IpAddr executor_ip;
-          bool has_local_executor = assignment_ctx.executor_config().LookUpBackendIp(
+          bool has_local_executor = assignment_ctx.executor_group().LookUpExecutorIp(
               replica_host.hostname, &executor_ip);
           if (has_local_executor) {
             if (location.is_cached) {
@@ -715,7 +559,7 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_confi
     // ranges, which allows for execution on any backend.
     if (scan_range_locations->scan_range.__isset.hdfs_file_split &&
         num_remote_executor_candidates > 0 &&
-        num_remote_executor_candidates < executor_config.NumBackends()) {
+        num_remote_executor_candidates < executor_group.NumExecutors()) {
       assignment_ctx.GetRemoteExecutorCandidates(
           &scan_range_locations->scan_range.hdfs_file_split,
           num_remote_executor_candidates, &remote_executor_candidates);
@@ -780,8 +624,8 @@ void Scheduler::FindNodes(const TPlan& plan, const vector<TPlanNodeType::type>&
   }
 }
 
-void Scheduler::GetScanHosts(TPlanNodeId scan_id, const FragmentExecParams& params,
-    vector<TNetworkAddress>* scan_hosts) {
+void Scheduler::GetScanHosts(const TBackendDescriptor& local_be_desc, TPlanNodeId scan_id,
+    const FragmentExecParams& params, vector<TNetworkAddress>* scan_hosts) {
   // Get the list of impalad host from scan_range_assignment_
   for (const FragmentScanRangeAssignment::value_type& scan_range_assignment :
       params.scan_range_assignment) {
@@ -796,18 +640,36 @@ void Scheduler::GetScanHosts(TPlanNodeId scan_id, const FragmentExecParams& para
     // TODO: we'll need to revisit this strategy once we can partition joins
     // (in which case this fragment might be executing a right outer join
     // with a large build table)
-    scan_hosts->push_back(local_backend_descriptor_.address);
+    scan_hosts->push_back(local_be_desc.address);
     return;
   }
 }
 
 Status Scheduler::Schedule(QuerySchedule* schedule) {
-  // Make a copy of the executor_config upfront to avoid using inconsistent views
-  // between ComputeScanRangeAssignment() and ComputeFragmentExecParams().
-  ExecutorsConfigPtr config_ptr = GetExecutorsConfig();
-  RETURN_IF_ERROR(ComputeScanRangeAssignment(*config_ptr, schedule));
-  ComputeFragmentExecParams(*config_ptr, schedule);
-  ComputeBackendExecParams(*config_ptr, schedule);
+  // Use a snapshot of the cluster membership state upfront to avoid using inconsistent
+  // views throughout scheduling.
+  ClusterMembershipMgr::SnapshotPtr membership_snapshot =
+      cluster_membership_mgr_->GetSnapshot();
+  if (membership_snapshot->local_be_desc.get() == nullptr) {
+    // This can happen in the short window between the ImpalaServer finishing its
+    // startup (which makes the local backend descriptor available) and the next
+    // statestore update that pulls that descriptor into the membership snapshot.
+    return Status("Local backend has not been registered in the cluster membership");
+  }
+  const string& group_name = ClusterMembershipMgr::DEFAULT_EXECUTOR_GROUP;
+  VLOG_QUERY << "Scheduling query " << PrintId(schedule->query_id())
+      << " on executor group: " << group_name;
+
+  auto it = membership_snapshot->executor_groups.find(group_name);
+  if (it == membership_snapshot->executor_groups.end()) {
+    return Status(Substitute("Unknown executor group: $0", group_name));
+  }
+  const ExecutorGroup& executor_group = it->second;
+  ExecutorConfig executor_config =
+      {executor_group, *membership_snapshot->local_be_desc};
+  RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, schedule));
+  ComputeFragmentExecParams(executor_config, schedule);
+  ComputeBackendExecParams(executor_config, schedule);
 #ifndef NDEBUG
   schedule->Validate();
 #endif
@@ -825,7 +687,7 @@ Status Scheduler::Schedule(QuerySchedule* schedule) {
 }
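
The new Schedule() body above illustrates the snapshot pattern that replaces the
old copy-and-swap of executors_config_: take one immutable membership snapshot up
front and use it for the entire scheduling pass. A minimal standalone sketch of
the pattern, with illustrative names rather than the actual ClusterMembershipMgr
API:

    #include <memory>
    #include <mutex>
    #include <string>
    #include <vector>

    // An immutable snapshot that readers share. Publishing a new snapshot
    // never mutates an old one, so a scheduling pass that copied the
    // shared_ptr once sees a consistent view throughout.
    struct Snapshot {
      std::vector<std::string> executors;
    };

    class MembershipMgr {
     public:
      std::shared_ptr<const Snapshot> GetSnapshot() const {
        std::lock_guard<std::mutex> l(lock_);
        return current_;  // cheap: copies only the pointer
      }
      void Publish(std::shared_ptr<const Snapshot> next) {
        std::lock_guard<std::mutex> l(lock_);
        current_ = std::move(next);  // swap in a new immutable snapshot
      }
     private:
      mutable std::mutex lock_;
      std::shared_ptr<const Snapshot> current_ = std::make_shared<Snapshot>();
    };
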
 
 void Scheduler::ComputeBackendExecParams(
-    const BackendConfig& executor_config, QuerySchedule* schedule) {
+    const ExecutorConfig& executor_config, QuerySchedule* schedule) {
   PerBackendExecParams per_backend_params;
   for (const FragmentExecParams& f : schedule->fragment_exec_params()) {
     for (const FInstanceExecParams& i : f.instance_exec_params) {
@@ -871,14 +733,14 @@ void Scheduler::ComputeBackendExecParams(
       num_fragment_instances_ss.str());
 }
 
-Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& executor_config,
+Scheduler::AssignmentCtx::AssignmentCtx(const ExecutorGroup& executor_group,
     IntCounter* total_assignments, IntCounter* total_local_assignments)
-  : executors_config_(executor_config),
+  : executor_group_(executor_group),
     first_unused_executor_idx_(0),
     total_assignments_(total_assignments),
     total_local_assignments_(total_local_assignments) {
-  DCHECK_GT(executor_config.NumBackends(), 0);
-  executor_config.GetAllBackendIps(&random_executor_order_);
+  DCHECK_GT(executor_group.NumExecutors(), 0);
+  random_executor_order_ = executor_group.GetAllExecutorIps();
   std::mt19937 g(rand());
   std::shuffle(random_executor_order_.begin(), random_executor_order_.end(), g);
   // Initialize inverted map for executor rank lookups
@@ -925,7 +787,7 @@ void Scheduler::AssignmentCtx::GetRemoteExecutorCandidates(
   DCHECK_EQ(remote_executor_candidates->size(), 0);
   // This function should not be used when 'num_candidates' exceeds the number
   // of executors.
-  DCHECK_LT(num_candidates, executors_config_.NumBackends());
+  DCHECK_LT(num_candidates, executor_group_.NumExecutors());
   // Two different hashes of the filename can result in the same executor.
   // The function should return distinct executors, so it may need to do more hashes
   // than 'num_candidates'.
@@ -941,7 +803,7 @@ void Scheduler::AssignmentCtx::GetRemoteExecutorCandidates(
   int max_iterations = num_candidates * 3;
   for (int i = 0; i < max_iterations; ++i) {
     // Look up nearest IpAddr
-    const IpAddr* executor_addr = executors_config_.GetHashRing()->GetNode(prng());
+    const IpAddr* executor_addr = executor_group_.GetHashRing()->GetNode(prng());
     DCHECK(executor_addr != nullptr);
     distinct_backends.insert(*executor_addr);
     // Short-circuit if we reach the appropriate number of replicas
@@ -960,8 +822,8 @@ const IpAddr* Scheduler::AssignmentCtx::SelectRemoteExecutor() {
   } else {
     // Pick next executor from assignment_heap. All executors must have been inserted into
     // the heap at this point.
-    DCHECK_GT(executors_config_.NumBackends(), 0);
-    DCHECK_EQ(executors_config_.NumBackends(), assignment_heap_.size());
+    DCHECK_GT(executor_group_.NumExecutors(), 0);
+    DCHECK_EQ(executor_group_.NumExecutors(), assignment_heap_.size());
     candidate_ip = &(assignment_heap_.top().ip);
   }
   DCHECK(candidate_ip != nullptr);
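
The loop above draws candidates from the hash ring by repeated hashing and
deduplicates the results; since two hashes can land on the same executor, it
bounds the work at num_candidates * 3 iterations. A self-contained sketch of the
same idea, with a plain modulo lookup standing in for HashRing::GetNode():

    #include <cstdint>
    #include <random>
    #include <set>
    #include <string>
    #include <vector>

    // Picks up to 'num_candidates' distinct nodes by repeated hashing.
    // Collisions are possible, so allow up to 3x iterations before giving
    // up -- mirroring the bounded loop in GetRemoteExecutorCandidates().
    std::vector<std::string> PickDistinct(const std::vector<std::string>& nodes,
        uint64_t seed, int num_candidates) {
      std::mt19937_64 prng(seed);  // deterministic for a fixed seed
      std::set<std::string> distinct;
      std::vector<std::string> result;
      const int max_iterations = num_candidates * 3;
      for (int i = 0; i < max_iterations; ++i) {
        // Stand-in for HashRing::GetNode(prng()).
        const std::string& node = nodes[prng() % nodes.size()];
        if (distinct.insert(node).second) result.push_back(node);
        if (result.size() == static_cast<size_t>(num_candidates)) break;
      }
      return result;
    }
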
@@ -986,18 +848,24 @@ int Scheduler::AssignmentCtx::GetExecutorRank(const IpAddr& ip) const {
 
 void Scheduler::AssignmentCtx::SelectExecutorOnHost(
     const IpAddr& executor_ip, TBackendDescriptor* executor) {
-  DCHECK(executors_config_.LookUpBackendIp(executor_ip, nullptr));
-  const BackendConfig::BackendList& executors_on_host =
-      executors_config_.GetBackendListForHost(executor_ip);
+  DCHECK(executor_group_.LookUpExecutorIp(executor_ip, nullptr));
+  const ExecutorGroup::Executors& executors_on_host =
+      executor_group_.GetExecutorsForHost(executor_ip);
   DCHECK(executors_on_host.size() > 0);
   if (executors_on_host.size() == 1) {
     *executor = *executors_on_host.begin();
   } else {
-    BackendConfig::BackendList::const_iterator* next_executor_on_host;
+    ExecutorGroup::Executors::const_iterator* next_executor_on_host;
     next_executor_on_host =
         FindOrInsert(&next_executor_per_host_, executor_ip, executors_on_host.begin());
-    DCHECK(find(executors_on_host.begin(), executors_on_host.end(),
-        **next_executor_on_host) != executors_on_host.end());
+    auto eq = [next_executor_on_host](auto& elem) {
+      const TBackendDescriptor& next_executor = **next_executor_on_host;
+      // The IP addresses must already match, so it is sufficient to check the port.
+      DCHECK_EQ(next_executor.ip_address, elem.ip_address);
+      return next_executor.address.port == elem.address.port;
+    };
+    DCHECK(find_if(executors_on_host.begin(), executors_on_host.end(), eq)
+        != executors_on_host.end());
     *executor = **next_executor_on_host;
     // Rotate
     ++(*next_executor_on_host);
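
SelectExecutorOnHost() keeps one list iterator per host and advances it on every
pick so that consecutive assignments rotate through all executors on that host.
A hedged sketch of the rotation, assuming the per-host executor list stays
stable while iterators into it are held:

    #include <list>
    #include <map>
    #include <string>

    struct RoundRobin {
      std::map<std::string, std::list<int>::const_iterator> next_per_host;

      // Returns the next executor id for 'host' and advances the cursor,
      // wrapping at the end so picks cycle through all executors on the host.
      int Pick(const std::string& host, const std::list<int>& executors) {
        auto it = next_per_host.find(host);
        if (it == next_per_host.end()) {
          it = next_per_host.emplace(host, executors.begin()).first;
        }
        int picked = *it->second;
        if (++it->second == executors.end()) it->second = executors.begin();
        return picked;
      }
    };
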
@@ -1022,7 +890,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   }
 
   IpAddr executor_ip;
-  bool ret = executors_config_.LookUpBackendIp(executor.address.hostname, &executor_ip);
+  bool ret = executor_group_.LookUpExecutorIp(executor.address.hostname, &executor_ip);
   DCHECK(ret);
   DCHECK(!executor_ip.empty());
   assignment_heap_.InsertOrUpdate(
@@ -1038,7 +906,7 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
   for (const TScanRangeLocation& location : scan_range_locations.locations) {
     const TNetworkAddress& replica_host = host_list[location.host_idx];
     IpAddr replica_ip;
-    if (executors_config_.LookUpBackendIp(replica_host.hostname, &replica_ip)
+    if (executor_group_.LookUpExecutorIp(replica_host.hostname, &replica_ip)
         && executor_ip == replica_ip) {
       remote_read = false;
       volume_id = location.volume_id;
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 940b1c2..ebabeb9 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -31,36 +31,28 @@
 #include "gen-cpp/CatalogObjects_generated.h"
 #include "gen-cpp/PlanNodes_types.h"
 #include "gen-cpp/StatestoreService_types.h"
-#include "gen-cpp/Types_types.h" // for TNetworkAddress
 #include "rapidjson/document.h"
 #include "rpc/thrift-util.h"
-#include "scheduling/backend-config.h"
+#include "scheduling/executor-group.h"
 #include "scheduling/query-schedule.h"
 #include "scheduling/request-pool-service.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "util/runtime-profile.h"
-#include "util/webserver.h"
 
 namespace impala {
+class ClusterMembershipMgr;
 
 namespace test {
 class SchedulerWrapper;
 }
 
-/// Performs simple scheduling by matching between a list of executor backends configured
-/// either from the statestore, or from a static list of addresses, and a list
-/// of target data locations. The current set of executors is stored in executors_config_.
-/// When receiving changes to the executor configuration from the statestore we will make
-/// a copy of this configuration, apply the updates to the copy and atomically swap the
-/// contents of the executors_config_ pointer.
+/// Performs simple scheduling by matching a list of executor backends, retrieved
+/// from the cluster membership manager, against a list of target data locations.
 ///
-/// TODO: Notice when there are duplicate statestore registrations (IMPALA-23)
 /// TODO: Track assignments (assignment_ctx in ComputeScanRangeAssignment) per query
 ///       instead of per plan node?
-/// TODO: Replace the usage of shared_ptr with atomic_shared_ptr once compilers support
-///       it. Alternatively consider using Kudu's rw locks.
 /// TODO: Inject global dependencies into the class (for example ExecEnv::GetInstance(),
 ///       RNG used during scheduling, FLAGS_*)
 ///       to make it testable.
@@ -69,51 +61,25 @@ class SchedulerWrapper;
 ///           configuration.
 class Scheduler {
  public:
-  /// List of server descriptors.
-  typedef std::vector<TBackendDescriptor> BackendList;
-
-  /// Initialize with a subscription manager that we can register with for updates to the
-  /// set of available backends.
-  ///  - backend_id - unique identifier for this Impala backend (usually a host:port)
-  Scheduler(StatestoreSubscriber* subscriber, const std::string& backend_id,
-      MetricGroup* metrics, Webserver* webserver,
+  Scheduler(ClusterMembershipMgr* cluster_membership_mgr, MetricGroup* metrics,
       RequestPoolService* request_pool_service);
 
-  /// Initializes the scheduler, acquiring all resources needed to make scheduling
-  /// decisions once this method returns. Register with the subscription manager if
-  /// required. Also initializes the local backend descriptor. Returns error status
-  /// on failure. 'backend_address' is the address of thrift based ImpalaInternalService
-  /// of this backend. If FLAGS_use_krpc is true, 'krpc_address' contains IP-address:port
-  /// on which KRPC based ImpalaInternalService is exported. 'ip' is the resolved
-  /// IP address of this backend. 'admit_mem_limit' is the ExecEnv::admit_mem_limit()
-  /// value or a dummy value provided by scheduler tests.
-  Status Init(const TNetworkAddress& backend_address, const TNetworkAddress& krpc_address,
-      const IpAddr& ip, int64_t admit_mem_limit);
-
-  /// Test helper that updates the local backend address to reflect whatever
-  /// ephemeral port was assigned during server startup. Should only be called
-  /// from backend tests. Not safe to call concurrently while queries are being
-  /// scheduled.
-  void UpdateLocalBackendAddrForBeTest();
-
   /// Populates given query schedule and assigns fragments to hosts based on scan
   /// ranges in the query exec request.
   Status Schedule(QuerySchedule* schedule);
 
-  /// Build a backend descriptor for this Impala daemon. Fills out all metadata using
-  /// the provided arguments, except does not set 'is_quiescing'.
-  static TBackendDescriptor BuildLocalBackendDescriptor(Webserver* webserver,
-      const TNetworkAddress& backend_address, const TNetworkAddress& krpc_address,
-      const IpAddr& ip, int64_t admit_mem_limit);
-
  private:
+  /// Current snapshot of executors to be used for scheduling a scan.
+  struct ExecutorConfig {
+    const ExecutorGroup& group;
+    const TBackendDescriptor& local_be_desc;
+  };
+
   /// Map from a host's IP address to the next executor to be round-robin scheduled for
   /// that host (needed for setups with multiple executors on a single host)
-  typedef boost::unordered_map<IpAddr, BackendConfig::BackendList::const_iterator>
+  typedef boost::unordered_map<IpAddr, ExecutorGroup::Executors::const_iterator>
       NextExecutorPerHost;
 
-  typedef std::shared_ptr<const BackendConfig> ExecutorsConfigPtr;
-
   /// Internal structure to track scan range assignments for an executor host. This struct
   /// is used as the heap element in and maintained by AddressableAssignmentHeap.
   struct ExecutorAssignmentInfo {
@@ -175,14 +141,14 @@ class Scheduler {
   };
 
   /// Class to store context information on assignments during scheduling. It is
-  /// initialized with a copy of the global executor information and assigns a random rank
-  /// to each executor to break ties in cases where multiple executors have been assigned
-  /// the same number or bytes. It tracks the number of assigned bytes, which executors
-  /// have already been used, etc. Objects of this class are created in
+  /// initialized with a copy of the executor group and assigns a random rank to each
+  /// executor to break ties in cases where multiple executors have been assigned the same
+  /// number or bytes. It tracks the number of assigned bytes, which executors have
+  /// already been used, etc. Objects of this class are created in
   /// ComputeScanRangeAssignment() and thus don't need to be thread safe.
   class AssignmentCtx {
    public:
-    AssignmentCtx(const BackendConfig& executor_config, IntCounter* total_assignments,
+    AssignmentCtx(const ExecutorGroup& executor_group, IntCounter* total_assignments,
         IntCounter* total_local_assignments);
 
     /// Among hosts in 'data_locations', select the one with the minimum number of
@@ -196,7 +162,7 @@ class Scheduler {
     /// Populate 'remote_executor_candidates' with 'num_candidates' distinct
     /// executors. The algorithm for picking remote executor candidates is to hash
     /// the file name / offset from 'hdfs_file_split' multiple times and look up the
-    /// closest executors stored in the BackendConfig's HashRing. Given the same file
+    /// closest executors stored in the ExecutorGroup's HashRing. Given the same file
     /// name / offset and same set of executors, this function is deterministic. The hash
     /// ring also limits the disruption when executors are added or removed. Note that
     /// 'num_candidates' cannot be 0 and must be less than the total number of executors.
@@ -227,7 +193,7 @@ class Scheduler {
         const TScanRangeLocationList& scan_range_locations,
         FragmentScanRangeAssignment* assignment);
 
-    const BackendConfig& executor_config() const { return executors_config_; }
+    const ExecutorGroup& executor_group() const { return executor_group_; }
 
     /// Print the assignment and statistics to VLOG_FILE.
     void PrintAssignment(const FragmentScanRangeAssignment& assignment);
@@ -241,7 +207,7 @@ class Scheduler {
     };
 
     /// Used to look up hostnames to IP addresses and IP addresses to executors.
-    const BackendConfig& executors_config_;
+    const ExecutorGroup& executor_group_;
 
     // Addressable heap to select remote executors from. Elements are ordered by the
     // number of already assigned bytes (and a random rank to break ties).
@@ -276,52 +242,17 @@ class Scheduler {
     int GetExecutorRank(const IpAddr& ip) const;
   };
 
-  /// The scheduler's executors configuration. When receiving changes to the executors
-  /// configuration from the statestore we will make a copy of the stored object, apply
-  /// the updates to the copy and atomically swap the contents of this pointer. Each plan
-  /// node creates a read-only copy of the scheduler's current executors_config_ to use
-  /// during scheduling.
-  ExecutorsConfigPtr executors_config_;
-
-  /// A backend configuration which only contains the local backend. It is used when
-  /// scheduling on the coordinator.
-  BackendConfig coord_only_backend_config_;
-
-  /// Protect access to executors_config_ which might otherwise be updated asynchronously
-  /// with respect to reads.
-  mutable boost::mutex executors_config_lock_;
-
   /// Total number of scan ranges assigned to executors during the lifetime of the
   /// scheduler.
   int64_t num_assignments_;
 
-  /// Map from unique backend ID to TBackendDescriptor. The
-  /// {backend ID, TBackendDescriptor} pairs represent the IMPALA_MEMBERSHIP_TOPIC
-  /// {key, value} pairs of known executors retrieved from the statestore. It's important
-  /// to track both the backend ID as well as the TBackendDescriptor so we know what is
-  /// being removed in a given update. Locking of this map is not needed since it should
-  /// only be read/modified from within the UpdateMembership() function.
-  typedef boost::unordered_map<std::string, TBackendDescriptor> BackendIdMap;
-  BackendIdMap current_executors_;
-
   /// MetricGroup subsystem access
   MetricGroup* metrics_;
 
-  /// Webserver for /backends. Not owned by us.
-  Webserver* webserver_;
-
-  /// Pointer to a subscription manager (which we do not own) which is used to register
-  /// for dynamic updates to the set of available backends. May be NULL if the set of
-  /// backends is fixed.
-  StatestoreSubscriber* statestore_subscriber_;
-
-  /// Unique - across the cluster - identifier for this impala backend.
-  const std::string local_backend_id_;
-
-  /// Describe this backend, including the Impalad service address.
-  TBackendDescriptor local_backend_descriptor_;
-
-  ThriftSerializer thrift_serializer_;
+  /// Pointer to the cluster membership manager. It provides information about backends
+  /// and executors in the cluster that the scheduler uses to assign fragment instances to
+  /// backends.
+  ClusterMembershipMgr* cluster_membership_mgr_;
 
   /// Locality metrics
   IntCounter* total_assignments_ = nullptr;
@@ -330,27 +261,15 @@ class Scheduler {
   /// Initialization metric
   BooleanProperty* initialized_ = nullptr;
 
-  /// Current number of executors
-  IntGauge* num_fragment_instances_metric_ = nullptr;
-
   /// Used for user-to-pool resolution and looking up pool configurations. Not owned by
   /// us.
   RequestPoolService* request_pool_service_;
 
-  /// Helper methods to access executors_config_ (the shared_ptr, not its contents),
-  /// protecting the access with executors_config_lock_.
-  ExecutorsConfigPtr GetExecutorsConfig() const;
-  void SetExecutorsConfig(const ExecutorsConfigPtr& executors_config);
-
   /// Returns the backend descriptor corresponding to 'host', which could be a remote
   /// backend or the local host itself. The returned descriptor should not be retained
   /// beyond the lifetime of 'executor_config'.
   const TBackendDescriptor& LookUpBackendDesc(
-      const BackendConfig& executor_config, const TNetworkAddress& host);
-
-  /// Called asynchronously when an update is received from the subscription manager
-  void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
-      std::vector<TTopicDelta>* subscriber_topic_updates);
+      const ExecutorConfig& executor_config, const TNetworkAddress& host);
 
   /// Determine the pool for a user and query options via request_pool_service_.
   Status GetRequestPool(const std::string& user, const TQueryOptions& query_options,
@@ -366,7 +285,7 @@ class Scheduler {
   /// fragment_exec_params_ with the resulting scan range assignment.
   /// We have a benchmark for this method in be/src/benchmarks/scheduler-benchmark.cc.
   /// 'executor_config' is the executor configuration to use for scheduling.
-  Status ComputeScanRangeAssignment(const BackendConfig& executor_config,
+  Status ComputeScanRangeAssignment(const ExecutorConfig& executor_config,
       QuerySchedule* schedule);
 
   /// Process the list of scan ranges of a single plan node and compute scan range
@@ -414,7 +333,7 @@ class Scheduler {
   ///
   /// The method takes the following parameters:
   ///
-  /// executor_config:         Executor configuration to use for scheduling.
+  /// executor_config:          Executor configuration to use for scheduling.
   /// node_id:                 ID of the plan node.
   /// node_replica_preference: Query hint equivalent to replica_preference.
   /// node_random_replica:     Query hint equivalent to schedule_random_replica.
@@ -424,7 +343,7 @@ class Scheduler {
   /// query_options:           Query options for the current query.
   /// timer:                   Tracks execution time of ComputeScanRangeAssignment.
   /// assignment:              Output parameter, to which new assignments will be added.
-  Status ComputeScanRangeAssignment(const BackendConfig& executor_config,
+  Status ComputeScanRangeAssignment(const ExecutorConfig& executor_config,
       PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
       bool node_random_replica, const std::vector<TScanRangeLocationList>& locations,
       const std::vector<TNetworkAddress>& host_list, bool exec_at_coord,
@@ -434,20 +353,20 @@ class Scheduler {
   /// Computes BackendExecParams for all backends assigned in the query. Must be called
   /// after ComputeFragmentExecParams().
   void ComputeBackendExecParams(
-      const BackendConfig& executor_config, QuerySchedule* schedule);
+      const ExecutorConfig& executor_config, QuerySchedule* schedule);
 
   /// Compute the FragmentExecParams for all plans in the schedule's
   /// TQueryExecRequest.plan_exec_info.
   /// This includes the routing information (destinations, per_exch_num_senders,
   /// sender_id)
   /// 'executor_config' is the executor configuration to use for scheduling.
-  void ComputeFragmentExecParams(const BackendConfig& executor_config,
+  void ComputeFragmentExecParams(const ExecutorConfig& executor_config,
       QuerySchedule* schedule);
 
   /// Recursively create FInstanceExecParams and set per_node_scan_ranges for
   /// fragment_params and its input fragments via a depth-first traversal.
   /// All fragments are part of plan_exec_info.
-  void ComputeFragmentExecParams(const BackendConfig& executor_config,
+  void ComputeFragmentExecParams(const ExecutorConfig& executor_config,
       const TPlanExecInfo& plan_exec_info, FragmentExecParams* fragment_params,
       QuerySchedule* schedule);
 
@@ -460,7 +379,7 @@ class Scheduler {
   /// a UnionNode with partitioned joins or grouping aggregates as children runs on
   /// at least as many hosts as the input to those children).
   /// TODO: is this really necessary? If not, revise.
-  void CreateUnionInstances(const BackendConfig& executor_config,
+  void CreateUnionInstances(const ExecutorConfig& executor_config,
       FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
   /// Create instances of the fragment corresponding to fragment_params to run on the
@@ -470,7 +389,7 @@ class Scheduler {
   /// number of bytes per instances and then in a single pass assigning scan ranges to
   /// each instance to roughly meet that average.
   /// For all other storage mgrs, it load-balances the number of splits per instance.
-  void CreateScanInstances(const BackendConfig& executor_config, PlanNodeId scan_id,
+  void CreateScanInstances(const ExecutorConfig& executor_config, PlanNodeId scan_id,
       FragmentExecParams* fragment_params, QuerySchedule* schedule);
 
   /// For each instance of fragment_params's input fragment, create a collocated
@@ -487,8 +406,8 @@ class Scheduler {
   PlanNodeId FindLeftmostScan(const TPlan& plan);
 
   /// Add all hosts the given scan is executed on to scan_hosts.
-  void GetScanHosts(TPlanNodeId scan_id, const FragmentExecParams& params,
-      std::vector<TNetworkAddress>* scan_hosts);
+  void GetScanHosts(const TBackendDescriptor& local_be_desc, TPlanNodeId scan_id,
+      const FragmentExecParams& params, std::vector<TNetworkAddress>* scan_hosts);
 
   /// Return true if 'plan' contains a node of the given type.
   bool ContainsNode(const TPlan& plan, TPlanNodeType::type type);
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 3cfd65a..e9941da 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -33,6 +33,7 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "scheduling/admission-controller.h"
+#include "scheduling/cluster-membership-mgr.h"
 #include "service/client-request-state.h"
 #include "service/frontend.h"
 #include "service/impala-server.h"
@@ -866,7 +867,13 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::WebRequest& req,
   ExecEnv::GetInstance()->admission_controller()->PopulatePerHostMemReservedAndAdmitted(
       &host_mem_map);
   Value backends_list(kArrayType);
-  for (const auto& entry : server_->GetKnownBackends()) {
+  ClusterMembershipMgr* cluster_membership_mgr =
+      ExecEnv::GetInstance()->cluster_membership_mgr();
+  DCHECK(cluster_membership_mgr != nullptr);
+  ClusterMembershipMgr::SnapshotPtr membership_snapshot =
+      cluster_membership_mgr->GetSnapshot();
+  DCHECK(membership_snapshot.get() != nullptr);
+  for (const auto& entry : membership_snapshot->current_backends) {
     TBackendDescriptor backend = entry.second;
     Value backend_obj(kObjectType);
     string address = TNetworkAddressToString(backend.address);
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 80416ab..ac014e2 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -61,7 +61,6 @@
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
 #include "scheduling/admission-controller.h"
-#include "scheduling/scheduler.h"
 #include "service/cancellation-work.h"
 #include "service/impala-http-handler.h"
 #include "service/impala-internal-service.h"
@@ -69,7 +68,6 @@
 #include "service/frontend.h"
 #include "util/auth-util.h"
 #include "util/bit-util.h"
-#include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
 #include "util/histogram-metric.h"
@@ -246,13 +244,6 @@ DEFINE_int64(shutdown_deadline_s, 60 * 60, "Default time limit in seconds for th
     "down process. If this duration elapses after the shut down process is started, "
     "the daemon shuts down regardless of any running queries.");
 
-DEFINE_int64_hidden(failed_backends_query_cancellation_grace_period_ms, 30000L,
-    "Grace period since last successful subscriber registration that impala server is "
-    "willing to wait before initiating cancellation of queries running on backends not "
-    "included in the latest membership update. This value should be large enough to give "
-    "the statestore enough time to get a consistent view of cluster membership after "
-    "recovery.");
-
 #ifndef NDEBUG
   DEFINE_int64(stress_metadata_loading_pause_injection_ms, 0, "Simulates metadata loading"
       "for a given query by injecting a sleep equivalent to this configuration in "
@@ -305,7 +296,6 @@ ThreadSafeRandom ImpalaServer::rng_(GetRandomSeed32());
 
 ImpalaServer::ImpalaServer(ExecEnv* exec_env)
     : exec_env_(exec_env),
-      thrift_serializer_(false),
       services_started_(false) {
   // Initialize default config
   InitializeConfigVariables();
@@ -377,33 +367,22 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
 
   ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env_->metrics()));
 
-  // Register the membership callback if running in a real cluster.
-  if (!TestInfo::is_test()) {
-    auto cb = [this](const StatestoreSubscriber::TopicDeltaMap& state,
+  // Register the catalog update callback if running in a real cluster as a coordinator.
+  if (!TestInfo::is_test() && FLAGS_is_coordinator) {
+    auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
         vector<TTopicDelta>* topic_updates) {
-      this->MembershipCallback(state, topic_updates);
+      this->CatalogUpdateCallback(state, topic_updates);
     };
-    ABORT_IF_ERROR(exec_env_->subscriber()->AddTopic(
-        Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
-        /* populate_min_subscriber_topic_version=*/ false,
-        /* filter_prefix=*/"", cb));
-
-    if (FLAGS_is_coordinator) {
-      auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
-          vector<TTopicDelta>* topic_updates) {
-        this->CatalogUpdateCallback(state, topic_updates);
-      };
-      // The 'local-catalog' implementation only needs minimal metadata to
-      // trigger cache invalidations.
-      // The legacy implementation needs full metadata objects.
-      string filter_prefix = FLAGS_use_local_catalog ?
+    // The 'local-catalog' implementation only needs minimal metadata to
+    // trigger cache invalidations.
+    // The legacy implementation needs full metadata objects.
+    string filter_prefix = FLAGS_use_local_catalog ?
         g_CatalogService_constants.CATALOG_TOPIC_V2_PREFIX :
         g_CatalogService_constants.CATALOG_TOPIC_V1_PREFIX;
-      ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
-          CatalogServer::IMPALA_CATALOG_TOPIC, /* is_transient=*/ true,
-          /* populate_min_subscriber_topic_version=*/ true,
-          filter_prefix, catalog_cb));
-    }
+    ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
+        CatalogServer::IMPALA_CATALOG_TOPIC, /* is_transient=*/ true,
+        /* populate_min_subscriber_topic_version=*/ true,
+        filter_prefix, catalog_cb));
   }
 
   ABORT_IF_ERROR(UpdateCatalogMetrics());
@@ -562,11 +541,6 @@ int ImpalaServer::GetHS2Port() {
   return hs2_server_->port();
 }
 
-const ImpalaServer::BackendDescriptorMap ImpalaServer::GetKnownBackends() {
-  lock_guard<mutex> l(known_backends_lock_);
-  return known_backends_;
-}
-
 bool ImpalaServer::IsLineageLoggingEnabled() {
   return !FLAGS_lineage_event_log_dir.empty();
 }
@@ -1797,88 +1771,8 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
   return Status::OK();
 }
 
-void ImpalaServer::MembershipCallback(
-    const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
-    vector<TTopicDelta>* subscriber_topic_updates) {
-  // TODO: Consider rate-limiting this. In the short term, best to have
-  // statestore heartbeat less frequently.
-  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
-  if (topic == incoming_topic_deltas.end()) return;
-
-  const TTopicDelta& delta = topic->second;
-  // Create a set of known backend network addresses. Used to test for cluster
-  // membership by network address.
-  set<TNetworkAddress> current_membership;
-  {
-    lock_guard<mutex> l(known_backends_lock_);
-    // If this is not a delta, the update should include all entries in the topic so
-    // clear the saved mapping of known backends.
-    if (!delta.is_delta) known_backends_.clear();
-
-    // Process membership additions/deletions.
-    for (const TTopicItem& item : delta.topic_entries) {
-      if (item.deleted) {
-        auto entry = known_backends_.find(item.key);
-        // Remove stale connections to removed members.
-        if (entry != known_backends_.end()) {
-          exec_env_->impalad_client_cache()->CloseConnections(entry->second.address);
-          known_backends_.erase(item.key);
-        }
-        continue;
-      }
-      uint32_t len = item.value.size();
-      TBackendDescriptor backend_descriptor;
-      Status status =
-          DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(item.value.data()),
-              &len, false, &backend_descriptor);
-      if (!status.ok()) {
-        VLOG(2) << "Error deserializing topic item with key: " << item.key;
-        continue;
-      }
-      // This is a new or modified item - add it to the map of known backends.
-      auto it = known_backends_.find(item.key);
-      if (it != known_backends_.end()) {
-        it->second = backend_descriptor;
-      } else {
-        known_backends_.emplace_hint(it, item.key, backend_descriptor);
-      }
-    }
-
-    // Register the local backend in the statestore and update the list of known
-    // backends. Only register if all ports have been opened and are ready.
-    if (services_started_.load()) AddLocalBackendToStatestore(subscriber_topic_updates);
-
-    // Also reflect changes to the frontend. Initialized only if any_changes is true.
-    // Only send the hostname and ip_address of the executors to the frontend.
-    TUpdateExecutorMembershipRequest update_req;
-    bool any_changes = !delta.topic_entries.empty() || !delta.is_delta;
-    for (const BackendDescriptorMap::value_type& backend : known_backends_) {
-      current_membership.insert(backend.second.address);
-      if (any_changes && backend.second.is_executor) {
-        update_req.hostnames.insert(backend.second.address.hostname);
-        update_req.ip_addresses.insert(backend.second.ip_address);
-        update_req.num_executors++;
-      }
-    }
-    if (any_changes) {
-      Status status = exec_env_->frontend()->UpdateExecutorMembership(update_req);
-      if (!status.ok()) {
-        LOG(WARNING) << "Error updating frontend membership snapshot: "
-                     << status.GetDetail();
-      }
-    }
-  }
-
-  // Only initiate cancellation after a grace period since last successful registration.
-  if (exec_env_->subscriber()->MilliSecondsSinceLastRegistration()
-      >= FLAGS_failed_backends_query_cancellation_grace_period_ms) {
-    CancelQueriesOnFailedBackends(current_membership);
-  }
-}
-
 void ImpalaServer::CancelQueriesOnFailedBackends(
-    const set<TNetworkAddress>& current_membership) {
+    const std::unordered_set<TNetworkAddress>& current_membership) {
   // Maps from query id (to be cancelled) to a list of failed Impalads that are
   // the cause of the cancellation.
   map<TUniqueId, vector<TNetworkAddress>> queries_to_cancel;
@@ -1935,42 +1829,56 @@ void ImpalaServer::CancelQueriesOnFailedBackends(
   }
 }
 
-void ImpalaServer::AddLocalBackendToStatestore(
-    vector<TTopicDelta>* subscriber_topic_updates) {
-  const string& local_backend_id = exec_env_->subscriber()->id();
-  bool is_quiescing = shutting_down_.Load() != 0;
-  auto it = known_backends_.find(local_backend_id);
-  // 'is_quiescing' can change during the lifetime of the Impalad - make sure that the
-  // membership reflects the current value.
-  if (it != known_backends_.end()
-      && is_quiescing == it->second.is_quiescing) {
-    return;
+std::shared_ptr<const TBackendDescriptor> ImpalaServer::GetLocalBackendDescriptor() {
+  if (!services_started_.load()) return nullptr;
+
+  lock_guard<mutex> l(local_backend_descriptor_lock_);
+  // Check if the current backend descriptor needs to be initialized.
+  if (local_backend_descriptor_.get() == nullptr) {
+    std::shared_ptr<TBackendDescriptor> new_be_desc =
+        std::make_shared<TBackendDescriptor>();
+    BuildLocalBackendDescriptorInternal(new_be_desc.get());
+    local_backend_descriptor_ = new_be_desc;
   }
 
-  TBackendDescriptor local_backend_descriptor =
-      Scheduler::BuildLocalBackendDescriptor(exec_env_->webserver(),
-          exec_env_->GetThriftBackendAddress(), exec_env_->krpc_address(),
-          exec_env_->ip_address(), exec_env_->admit_mem_limit());
-  local_backend_descriptor.__set_is_quiescing(is_quiescing);
+  // Check to see if it needs to be updated.
+  if (IsShuttingDown() != local_backend_descriptor_->is_quiescing) {
+    std::shared_ptr<TBackendDescriptor> new_be_desc =
+        std::make_shared<TBackendDescriptor>(*local_backend_descriptor_);
+    new_be_desc->is_quiescing = IsShuttingDown();
+    local_backend_descriptor_ = new_be_desc;
+  }
 
-  subscriber_topic_updates->emplace_back(TTopicDelta());
-  TTopicDelta& update = subscriber_topic_updates->back();
-  update.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
-  update.topic_entries.emplace_back(TTopicItem());
+  return local_backend_descriptor_;
+}
 
-  TTopicItem& item = update.topic_entries.back();
-  item.key = local_backend_id;
-  Status status = thrift_serializer_.SerializeToString(&local_backend_descriptor,
-      &item.value);
-  if (!status.ok()) {
-    LOG(WARNING) << "Failed to serialize Impala backend descriptor for statestore topic:"
-                 << " " << status.GetDetail();
-    subscriber_topic_updates->pop_back();
-  } else if (it != known_backends_.end()) {
-    it->second.is_quiescing = is_quiescing;
-  } else {
-    known_backends_.insert(make_pair(item.key, local_backend_descriptor));
+void ImpalaServer::BuildLocalBackendDescriptorInternal(TBackendDescriptor* be_desc) {
+  DCHECK(services_started_.load());
+  bool is_quiescing = shutting_down_.Load() != 0;
+
+  be_desc->__set_address(exec_env_->GetThriftBackendAddress());
+  be_desc->__set_ip_address(exec_env_->ip_address());
+  be_desc->__set_is_coordinator(FLAGS_is_coordinator);
+  be_desc->__set_is_executor(FLAGS_is_executor);
+
+  Webserver* webserver = ExecEnv::GetInstance()->webserver();
+  if (webserver != nullptr) {
+    const TNetworkAddress& webserver_address = webserver->http_address();
+    if (IsWildcardAddress(webserver_address.hostname)) {
+      be_desc->__set_debug_http_address(
+          MakeNetworkAddress(be_desc->ip_address, webserver_address.port));
+    } else {
+      be_desc->__set_debug_http_address(webserver_address);
+    }
+    be_desc->__set_secure_webserver(webserver->IsSecure());
   }
+
+  const TNetworkAddress& krpc_address = exec_env_->krpc_address();
+  DCHECK(IsResolvedAddress(krpc_address));
+  be_desc->__set_krpc_address(krpc_address);
+
+  be_desc->__set_admit_mem_limit(exec_env_->admit_mem_limit());
+  be_desc->__set_is_quiescing(is_quiescing);
 }
 
 ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& request_state,
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 8be9cb6..8f6b7e6 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -37,6 +37,7 @@
 #include "common/status.h"
 #include "service/query-options.h"
 #include "util/condition-variable.h"
+#include "util/container-util.h"
 #include "util/metrics.h"
 #include "util/runtime-profile.h"
 #include "util/sharded-query-map-util.h"
@@ -87,7 +88,10 @@ class QuerySchedule;
 ///
 /// Main thread (caller code), after instantiating the server, must call Start().
 /// Start() does the following:
-///    - Registers the ImpalaServer instance with the ExecEnv
+///    - Registers the ImpalaServer instance with the ExecEnv. This also registers it with
+///      the ClusterMembershipMgr, which will register it with the statestore as soon as
+///      the local backend becomes available through GetLocalBackendDescriptor(), which
+///      is in turn gated by the services_started_ flag.
 ///    - Start internal services
 ///    - Wait (indefinitely) for local catalog to be initialized from statestore
 ///      (if coordinator)
@@ -97,8 +101,6 @@ class QuerySchedule;
 ///    - Start client service API's (if coordinator)
 ///    - Set services_started_ flag
 ///
-/// Internally, the Membership callback thread also participates in startup:
-///    - If services_started_, then register to the statestore as an executor.
 ///
 /// Shutdown
 /// --------
@@ -111,8 +113,9 @@ class QuerySchedule;
 /// 2. The startup grace period starts, during which:
 ///   - no new client requests are accepted. Clients can still interact with registered
 ///     requests and sessions as normal.
-///   - the Impala daemon is marked in the statestore as quiescing, so coordinators
-///     will not schedule new fragments on it (once the statestore update propagates).
+///   - the local backend of the Impala daemon is marked in the statestore as quiescing,
+///     so coordinators will not schedule new fragments on it (once the statestore update
+///     propagates through the ClusterMembershipMgr).
 ///   - the Impala daemon continues to start executing any new fragments sent to it by
 ///     coordinators. This is required because the query may have been submitted before
 ///     the coordinator learned that the executor was quiescing. Delays occur for several
@@ -156,7 +159,6 @@ class QuerySchedule;
 /// * uuid_lock_
 /// * catalog_version_lock_
 /// * connection_to_sessions_map_lock_
-/// * known_backends_lock_
 ///
 /// TODO: The same doesn't apply to the execution state of an individual plan
 /// fragment: the originating coordinator might die, but we can get notified of
@@ -303,8 +305,7 @@ class ImpalaServer : public ImpalaServiceIf,
       const apache::hive::service::cli::thrift::TRenewDelegationTokenReq& req);
 
   /// ImpalaInternalService rpcs
-  void UpdateFilter(TUpdateFilterResult& return_val,
-      const TUpdateFilterParams& params);
+  void UpdateFilter(TUpdateFilterResult& return_val, const TUpdateFilterParams& params);
 
   /// Generates a unique id for this query and sets it in the given query context.
   /// Prepares the given query context by populating fields required for evaluating
@@ -327,15 +328,6 @@ class ImpalaServer : public ImpalaServiceIf,
   /// associated with the closed connection.
   virtual void ConnectionEnd(const ThriftServer::ConnectionContext& session_context);
 
-  /// Called when a membership update is received from the statestore. Looks for
-  /// active nodes that have failed, and cancels any queries running on them.
-  ///  - incoming_topic_deltas: all changes to registered statestore topics
-  ///  - subscriber_topic_updates: output parameter to publish any topic updates to.
-  ///                              Currently unused.
-  void MembershipCallback(
-      const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
-      std::vector<TTopicDelta>* subscriber_topic_updates);
-
   void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
       std::vector<TTopicDelta>* topic_updates);
 
@@ -408,8 +400,13 @@ class ImpalaServer : public ImpalaServiceIf,
   /// the server has started successfully.
   int GetHS2Port();
 
-  typedef boost::unordered_map<std::string, TBackendDescriptor> BackendDescriptorMap;
-  const BackendDescriptorMap GetKnownBackends();
+  /// Returns a current snapshot of the local backend descriptor.
+  std::shared_ptr<const TBackendDescriptor> GetLocalBackendDescriptor();
+
+  /// Takes a set of network addresses of active backends and cancels all the queries
+  /// running on failed ones (that is, addresses not in the active set).
+  void CancelQueriesOnFailedBackends(
+      const std::unordered_set<TNetworkAddress>& current_membership);
 
   /// Start the shutdown process. Return an error if it could not be started. Otherwise,
   /// if it was successfully started by this or a previous call, return OK along with
@@ -724,14 +721,8 @@ class ImpalaServer : public ImpalaServiceIf,
   Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user)
       WARN_UNUSED_RESULT;
 
-  /// Check if the local backend descriptor is in the list of known backends. If not, add
-  /// it to the list of known backends and add it to the 'topic_updates'.
-  /// 'known_backends_lock_' must be held by the caller.
-  void AddLocalBackendToStatestore(std::vector<TTopicDelta>* topic_updates);
-
-  /// Takes a set of network addresses of active backends and cancels all the queries
-  /// running on failed ones (that is, addresses not in the active set).
-  void CancelQueriesOnFailedBackends(const std::set<TNetworkAddress>& current_membership);
+  /// Initializes the backend descriptor in 'be_desc' with the local backend information.
+  void BuildLocalBackendDescriptorInternal(TBackendDescriptor* be_desc);
 
   /// Snapshot of a query's state, archived in the query log.
   struct QueryStateRecord {
@@ -1126,21 +1117,10 @@ class ImpalaServer : public ImpalaServiceIf,
       QueryLocations;
   QueryLocations query_locations_;
 
-  /// A map from unique backend ID to the corresponding TBackendDescriptor of that
-  /// backend. Used to track membership updates from the statestore so queries can be
-  /// cancelled when a backend is removed. It's not enough to just cancel fragments that
-  /// are running based on the deletions mentioned in the most recent statestore
-  /// heartbeat; sometimes cancellations are skipped and the statestore, at its
-  /// discretion, may send only a delta of the current membership so we need to compute
-  /// any deletions.
-  /// TODO: Currently there are multiple locations where cluster membership is tracked,
-  /// here and in the scheduler. This should be consolidated so there is a single
-  /// component (the scheduler?) that tracks this information and calls other interested
-  /// components.
-  BackendDescriptorMap known_backends_;
-
-  /// Lock to protect 'known_backends_'. Not held in conjunction with other locks.
-  boost::mutex known_backends_lock_;
+  /// The local backend descriptor. Updated in GetLocalBackendDescriptor() and protected
+  /// by 'local_backend_descriptor_lock_'.
+  std::shared_ptr<const TBackendDescriptor> local_backend_descriptor_;
+  boost::mutex local_backend_descriptor_lock_;
 
   /// Generate unique session id for HiveServer2 session
   boost::uuids::random_generator uuid_generator_;
@@ -1252,9 +1232,6 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set.
   std::unique_ptr<Thread> query_expiration_thread_;
 
-  /// Serializes TBackendDescriptors when creating topic updates
-  ThriftSerializer thrift_serializer_;
-
   /// True if this ImpalaServer can accept client connections and coordinate
   /// queries.
   bool is_coordinator_;
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 9fb4c9b..7dd6874 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -33,6 +33,7 @@
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-util.h"
 #include "statestore/statestore-service-client-wrapper.h"
+#include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/openssl-util.h"
 #include "util/time.h"
@@ -55,6 +56,10 @@ DEFINE_int32(statestore_subscriber_cnxn_attempts, 10, "The number of times to re
     "RPC connection to the statestore. A setting of 0 means retry indefinitely");
 DEFINE_int32(statestore_subscriber_cnxn_retry_interval_ms, 3000, "The interval, in ms, "
     "to wait between attempts to make an RPC connection to the statestore.");
+DEFINE_int64_hidden(statestore_subscriber_recovery_grace_period_ms, 30000L, "Period "
+    "after the last successful subscription attempt until the subscriber will be "
+    "considered fully recovered. After a successful reconnect attempt, updates to the "
+    "cluster membership will only become effective after this period has elapsed.");
 
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
@@ -124,6 +129,8 @@ StatestoreSubscriber::StatestoreSubscriber(const string& subscriber_id,
       is_registered_(false) {
   connected_to_statestore_metric_ =
       metrics_->AddProperty("statestore-subscriber.connected", false);
+  connection_failure_metric_ =
+      metrics_->AddCounter("statestore-subscriber.num-connection-failures", 0);
   last_recovery_duration_metric_ = metrics_->AddDoubleGauge(
       "statestore-subscriber.last-recovery-duration", 0.0);
   last_recovery_time_metric_ = metrics_->AddProperty<string>(
@@ -265,6 +272,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
       MonotonicStopWatch recovery_timer;
       recovery_timer.Start();
       connected_to_statestore_metric_->SetValue(false);
+      connection_failure_metric_->Increment(1);
       LOG(INFO) << subscriber_id_
                 << ": Connection with statestore lost, entering recovery mode";
       uint32_t attempt_count = 1;
@@ -457,5 +465,11 @@ Status StatestoreSubscriber::UpdateState(const TopicDeltaMap& incoming_topic_del
   return Status::OK();
 }
 
+bool StatestoreSubscriber::IsInPostRecoveryGracePeriod() const {
+  bool has_failed_before = connection_failure_metric_->GetValue() > 0;
+  bool in_grace_period = MilliSecondsSinceLastRegistration()
+      < FLAGS_statestore_subscriber_recovery_grace_period_ms;
+  return has_failed_before && in_grace_period;
+}
 
 }
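
Callers are expected to consult IsInPostRecoveryGracePeriod() before acting on
membership changes, so that a freshly reconnected subscriber does not cancel
queries based on a still-partial view. A hypothetical caller sketch
(MaybeCancelOnFailedBackends is not a real function):

    // Hypothetical: suppress cancellation while the post-recovery grace
    // period is active, since the membership view may still be partial.
    void MaybeCancelOnFailedBackends(StatestoreSubscriber* subscriber) {
      if (subscriber->IsInPostRecoveryGracePeriod()) return;
      // ... diff current membership against running queries and cancel ...
    }
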
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index c187f17..233b7a7 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -26,14 +26,14 @@
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/shared_mutex.hpp>
 
-#include "statestore/statestore.h"
-#include "util/stopwatch.h"
-#include "rpc/thrift-util.h"
+#include "gen-cpp/StatestoreService.h"
+#include "gen-cpp/StatestoreSubscriber.h"
 #include "rpc/thrift-client.h"
+#include "rpc/thrift-util.h"
+#include "statestore/statestore.h"
 #include "statestore/statestore-service-client-wrapper.h"
 #include "util/metrics.h"
-#include "gen-cpp/StatestoreService.h"
-#include "gen-cpp/StatestoreSubscriber.h"
+#include "util/stopwatch.h"
 
 namespace impala {
 
@@ -127,9 +127,9 @@ class StatestoreSubscriber {
 
   const std::string& id() const { return subscriber_id_; }
 
-  int64_t MilliSecondsSinceLastRegistration() const {
-    return MonotonicMillis() - last_registration_ms_.Load();
-  }
+  /// Returns true if the subscriber has lost its connection to the statestore at
+  /// least once and the configurable post-recovery grace period has not yet elapsed
+  /// since the last successful re-registration.
+  bool IsInPostRecoveryGracePeriod() const;
 
  private:
   /// Unique, but opaque, identifier for this subscriber.
@@ -160,6 +160,9 @@ class StatestoreSubscriber {
   /// Metric to indicate if we are successfully registered with the statestore
   BooleanProperty* connected_to_statestore_metric_;
 
+  /// Metric to count the total number of connection failures to the statestore
+  IntCounter* connection_failure_metric_;
+
   /// Amount of time last spent in recovery mode
   DoubleGauge* last_recovery_duration_metric_;
 
@@ -304,6 +307,10 @@ class StatestoreSubscriber {
   /// set, an error otherwise. Used to confirm that RPCs from the statestore are intended
   /// for the current registration epoch.
   Status CheckRegistrationId(const RegistrationId& registration_id);
+
+  int64_t MilliSecondsSinceLastRegistration() const {
+    return MonotonicMillis() - last_registration_ms_.Load();
+  }
 };
 
 }
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 79fb1ba..6c93116 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -33,6 +33,7 @@
 #include "rpc/thrift-util.h"
 #include "statestore/failure-detector.h"
 #include "statestore/statestore-subscriber-client-wrapper.h"
+#include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/logging-support.h"
 #include "util/openssl-util.h"
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 0a58211..a9bdfeb 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -87,7 +87,6 @@ Status InProcessImpalaServer::StartWithClientServers(
   SetCatalogIsReady();
   RETURN_IF_ERROR(
       impala_server_->Start(backend_port_, beeswax_port, hs2_port, hs2_http_port_));
-  exec_env_->scheduler()->UpdateLocalBackendAddrForBeTest();
 
   // This flag is read directly in several places to find the address of the local
   // backend interface.
diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h
index 41093e3..a4e2119 100644
--- a/be/src/util/container-util.h
+++ b/be/src/util/container-util.h
@@ -25,10 +25,71 @@
 #include <boost/functional/hash.hpp>
 
 #include "util/hash-util.h"
+
+#include "gen-cpp/ErrorCodes_types.h"
+#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/StatestoreService_types.h"
+#include "gen-cpp/Status_types.h"
 #include "gen-cpp/Types_types.h"
 
+/// Comparators for types that we commonly use in containers.
 namespace impala {
 
+// This function and the following macro are used to assert that the size of the type T
+// does not change unexpectedly. This helps to ensure that the operators take all fields
+// of a struct into consideration. The benefit of this solution over a simple static
+// assert is that it includes the expected and actual struct sizes in the compile time
+// error message.
+template <typename T, int64_t Expected, int64_t Actual = sizeof(T)>
+constexpr void static_assert_size() {
+  static_assert(Expected == Actual, "Type has unexpected size");
+}
+#define STATIC_ASSERT_SIZE(type, expected) \
+  inline void static_assert_size_##type() { static_assert_size<type, expected>(); }
+
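
The reason for routing sizeof(T) through the defaulted Actual parameter is that
a failing assertion then names both the expected and the actual size in the
compiler error. A small standalone demonstration; Point and its 16-byte size are
assumptions about a typical 64-bit platform:

    #include <cstdint>

    template <typename T, int64_t Expected, int64_t Actual = sizeof(T)>
    constexpr void check_size() {
      static_assert(Expected == Actual, "Type has unexpected size");
    }

    struct Point { double x; double y; };  // 16 bytes on typical platforms

    int main() {
      check_size<Point, 16>();  // OK today; add a double to Point and the
                                // error mentions check_size<Point, 16, 24>,
                                // exposing expected and actual sizes.
      return 0;
    }
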
+// TUniqueId
+STATIC_ASSERT_SIZE(TUniqueId, 24);
+
+inline bool operator==(const TUniqueId& lhs, const TUniqueId& rhs) {
+  return std::tie(lhs.hi, lhs.lo) == std::tie(rhs.hi, rhs.lo);
+}
+
+inline bool operator!=(const TUniqueId& lhs, const TUniqueId& rhs) {
+  return !(lhs == rhs);
+}
+
+inline bool operator<(const TUniqueId& lhs, const TUniqueId& rhs) {
+  return std::tie(lhs.hi, lhs.lo) < std::tie(rhs.hi, rhs.lo);
+}
+
+// TNetworkAddress
+STATIC_ASSERT_SIZE(TNetworkAddress, 24);
+
+inline bool operator==(const TNetworkAddress& lhs, const TNetworkAddress& rhs) {
+  return std::tie(lhs.hostname, lhs.port) == std::tie(rhs.hostname, rhs.port);
+}
+
+inline bool operator!=(const TNetworkAddress& lhs, const TNetworkAddress& rhs) {
+  return !(lhs == rhs);
+}
+
+// TStatus
+STATIC_ASSERT_SIZE(TStatus, 48);
+
+inline bool operator==(const TStatus& lhs, const TStatus& rhs) {
+  return std::tie(lhs.status_code, lhs.error_msgs)
+      == std::tie(rhs.status_code, rhs.error_msgs);
+}
+
+// TCounter
+STATIC_ASSERT_SIZE(TCounter, 32);
+
+inline bool operator==(const TCounter& lhs, const TCounter& rhs) {
+  return std::tie(lhs.name, lhs.unit, lhs.value)
+      == std::tie(rhs.name, rhs.unit, rhs.value);
+}
+
 /// Hash function for TNetworkAddress. This function must be called hash_value to be picked
 /// up properly by boost.
 inline std::size_t hash_value(const TNetworkAddress& host_port) {
@@ -37,7 +98,7 @@ inline std::size_t hash_value(const TNetworkAddress& host_port) {
   return HashUtil::Hash(&host_port.port, sizeof(host_port.port), hash);
 }
 
-}
+} // end namespace impala
 
 /// Hash function for std:: containers
 namespace std {
@@ -48,7 +109,7 @@ template<> struct hash<impala::TNetworkAddress> {
   }
 };
 
-}
+} // end namespace std
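
With hash_value() (found by boost via ADL) and the std::hash specialization
above, TNetworkAddress can key both boost and standard unordered containers. A
short usage sketch, assuming the generated Thrift headers are available:

    #include <unordered_set>
    #include "gen-cpp/Types_types.h"
    #include "util/container-util.h"

    // The active-backend set handed to CancelQueriesOnFailedBackends() can
    // now be a std::unordered_set keyed directly by TNetworkAddress.
    std::unordered_set<impala::TNetworkAddress> active_backends;
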
 
 namespace impala {
 
@@ -139,6 +200,6 @@ void MergeMapValues(const MAP_TYPE& src, MAP_TYPE* dst) {
   }
 }
 
-}
+} // end namespace impala
 
 #endif
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index c01273d..e4b4234 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -157,17 +157,6 @@ TNetworkAddress MakeNetworkAddress(const string& address) {
   return ret;
 }
 
-/// Utility method because Thrift does not supply useful constructors
-TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr& ip,
-    int port) {
-  TBackendDescriptor be_desc;
-  be_desc.address = MakeNetworkAddress(hostname, port);
-  be_desc.ip_address = ip;
-  be_desc.is_coordinator = true;
-  be_desc.is_executor = true;
-  return be_desc;
-}
-
 bool IsWildcardAddress(const string& ipaddress) {
   return ipaddress == "0.0.0.0";
 }
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index ff21d9d..4fe9e04 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -58,10 +58,6 @@ TNetworkAddress MakeNetworkAddress(const std::string& hostname, int port);
 /// hostname and a port of 0.
 TNetworkAddress MakeNetworkAddress(const std::string& address);
 
-/// Utility method because Thrift does not supply useful constructors
-TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr& ip,
-    int port);
-
 /// Returns true if the ip address parameter is the wildcard interface (0.0.0.0)
 bool IsWildcardAddress(const std::string& ipaddress);
 
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index 795ef29..18e77ab 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -23,6 +23,7 @@
 
 #include "common/object-pool.h"
 #include "testutil/gtest-util.h"
+#include "util/container-util.h"
 #include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
 #include "util/thread.h"
diff --git a/be/src/util/uid-util-test.cc b/be/src/util/uid-util-test.cc
index 4015bd5..0be9f7f 100644
--- a/be/src/util/uid-util-test.cc
+++ b/be/src/util/uid-util-test.cc
@@ -20,6 +20,7 @@
 #include <iostream>
 
 #include "testutil/gtest-util.h"
+#include "util/container-util.h"
 #include "util/uid-util.h"
 
 namespace impala {
diff --git a/common/thrift/CMakeLists.txt b/common/thrift/CMakeLists.txt
index f297292..958cfac 100644
--- a/common/thrift/CMakeLists.txt
+++ b/common/thrift/CMakeLists.txt
@@ -58,8 +58,8 @@ function(THRIFT_GEN VAR)
     # It depends on hive_meta_store, which in turn depends on fb303.
     # The java dependency is handled by maven.
     # We need to generate C++ src file for the parent dependencies using the "-r" option.
-    set(CPP_ARGS ${THRIFT_INCLUDE_DIR_OPTION} --gen cpp:moveable_types -o
-        ${BE_OUTPUT_DIR})
+    set(CPP_ARGS ${THRIFT_INCLUDE_DIR_OPTION}
+        --gen cpp:moveable_types,no_default_operators -o ${BE_OUTPUT_DIR})
     IF (THRIFT_FILE STREQUAL "beeswax.thrift")
       set(CPP_ARGS -r ${CPP_ARGS})
     ENDIF(THRIFT_FILE STREQUAL "beeswax.thrift")
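
The new no_default_operators flag stops the Thrift compiler from emitting
default operator==, operator!= and operator< for the generated C++ types,
leaving the hand-written std::tie comparisons in container-util.h as the only
ones in play. A stand-in illustration of why an explicit operator< matters
once the generated one disappears:

#include <set>
#include <string>
#include <tuple>

struct Event {  // stand-in for a generated type with no default operators
  std::string name;
  int id;
};

// Without this, the std::set<Event> below would not compile: std::less
// requires an operator< that the generated code no longer provides.
inline bool operator<(const Event& lhs, const Event& rhs) {
  return std::tie(lhs.name, lhs.id) < std::tie(rhs.name, rhs.id);
}

int main() {
  std::set<Event> events;
  events.insert({"scan", 1});
  events.insert({"scan", 1});  // duplicate, so the set keeps one element
  return static_cast<int>(events.size());  // exits with status 1
}
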
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index fe26ac6..f5ba40c 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -625,7 +625,7 @@ struct TExecRequest {
 
   // List of catalog objects accessed by this request. May be empty if the
   // query did not access any Catalog objects.
-  8: optional set<TAccessEvent> access_events
+  8: optional list<TAccessEvent> access_events
 
   // List of warnings that were generated during analysis. May be empty.
   9: required list<string> analysis_warnings
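
In generated C++, a Thrift set<T> becomes std::set<T>, which requires an
operator< for its element type, while a list<T> becomes std::vector<T>, which
requires nothing. Switching access_events from set to list therefore
sidesteps the comparison requirement that no_default_operators introduces for
TAccessEvent; that is the likely motivation here, and it is why the Java
change below wraps the existing set in Lists.newArrayList(). A stand-in
sketch of the two container mappings:

#include <string>
#include <vector>

struct AccessEvent {  // stand-in: no operator< defined, as with
  std::string name;   // no_default_operators
};

int main() {
  // Thrift list<AccessEvent> -> std::vector<AccessEvent>: compiles with no
  // ordering requirement on the element type.
  std::vector<AccessEvent> as_list;
  as_list.push_back({"db.table"});

  // Thrift set<AccessEvent> -> std::set<AccessEvent>: inserting would
  // require an operator< for AccessEvent and fail to compile without one.
  // std::set<AccessEvent> as_set; as_set.insert({"db.table"});
  return 0;
}
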
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 8d7c1da..203518e 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -45,7 +45,7 @@ struct TPoolStats {
 }
 
 // Structure serialised in the Impala backend topic. Each Impalad
-// constructs one TBackendDescriptor, and registers it in the backend
+// constructs one TBackendDescriptor, and registers it in the cluster-membership
 // topic. Impalads subscribe to this topic to learn of the location of
 // all other Impalads in the cluster. Impalads can act as coordinators, executors or
 // both.
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 5fac28e..c75175a 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1055,6 +1055,17 @@
     "key": "statestore-subscriber.connected"
   },
   {
+    "description": "Number of times that the Daemon detected a loss of connectivity to the StateStore.",
+    "contexts": [
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "StateStore Connectivity Loss Count.",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "statestore-subscriber.num-connection-failures"
+  },
+  {
     "description": "The time (sec) between Statestore heartbeats.",
     "contexts": [
       "CATALOGSERVER",
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 0c1ff46..7b19eed 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -100,6 +100,7 @@ import org.apache.impala.planner.HdfsScanNode;
 import org.apache.impala.planner.PlanFragment;
 import org.apache.impala.planner.Planner;
 import org.apache.impala.planner.ScanNode;
+import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TCatalogOpRequest;
 import org.apache.impala.thrift.TCatalogOpType;
@@ -1367,7 +1368,7 @@ public class Frontend {
       TQueryCtx queryCtx, AnalysisResult analysisResult) {
     TExecRequest result = new TExecRequest();
     result.setQuery_options(queryCtx.client_request.getQuery_options());
-    result.setAccess_events(analysisResult.getAccessEvents());
+    result.setAccess_events(Lists.newArrayList(analysisResult.getAccessEvents()));
     result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
     result.setUser_has_profile_access(analysisResult.userHasProfileAccess());
     return result;
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 703f779..3729749 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -147,12 +147,12 @@ class TestCoordinators(CustomClusterTestSuite):
     tgt_dir = get_fs_path('/test-warehouse/{0}.db'.format(db_name))
     tgt_path = tgt_dir + "/tmp.jar"
 
-    try:
-      # copy jar with TestUpdateUdf (old) to tmp.jar
-      check_call(["hadoop", "fs", "-mkdir", "-p", tgt_dir])
-      check_call(["hadoop", "fs", "-put", "-f", old_src_path, tgt_path])
+    # copy jar with TestUpdateUdf (old) to tmp.jar
+    check_call(["hadoop", "fs", "-mkdir", "-p", tgt_dir])
+    check_call(["hadoop", "fs", "-put", "-f", old_src_path, tgt_path])
 
-      coordinator = self.cluster.impalads[0]
+    coordinator = self.cluster.impalads[0]
+    try:
       client = coordinator.service.create_beeswax_client()
 
       # create the database
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index ad85671..62fbf93 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -96,9 +96,9 @@ class TestRestart(CustomClusterTestSuite):
   @SkipIfNotHdfsMinicluster.scheduling
   @CustomClusterTestSuite.with_args(
     impalad_args="--statestore_subscriber_timeout_seconds={timeout_s} "
-                 "--failed_backends_query_cancellation_grace_period_ms={grace_period_ms}"
+                 "--statestore_subscriber_recovery_grace_period_ms={recovery_period_ms}"
     .format(timeout_s=SUBSCRIBER_TIMEOUT_S,
-            grace_period_ms=(CANCELLATION_GRACE_PERIOD_S * 1000)),
+            recovery_period_ms=(CANCELLATION_GRACE_PERIOD_S * 1000)),
     catalogd_args="--statestore_subscriber_timeout_seconds={timeout_s}".format(
       timeout_s=SUBSCRIBER_TIMEOUT_S))
   def test_restart_statestore_query_resilience(self):