You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/16 23:04:22 UTC

[4/4] impala git commit: IMPALA-5058: Improve the concurrency of DDL/DML operations

IMPALA-5058: Improve the concurrency of DDL/DML operations

Problem: A long-running table metadata operation (e.g. refresh) could
prevent all other metadata operations from making progress if it
coincided with catalog topic creation. The problem was caused by the
conservative locking scheme used when catalog topics were created: in
order to collect a consistent snapshot of metadata changes, the global
catalog lock was held for the entire duration of that operation.

Solution: To improve the concurrency of catalog operations the following
changes are performed:
* A range of catalog versions determines the catalog changes to be
  included in a catalog update. Any catalog changes that do not fall in
  the specified range are ignored (to be processed in subsequent catalog
  topic updates).
* The catalog allows metadata operations to make progress while
  collecting catalog updates.
* To prevent starvation of catalog updates (i.e. frequently updated
  catalog objects skipping catalog updates indefinitely), we keep track
  of the number of times a catalog object has skipped an update and if
  that number exceeds a threshold it is included in the next catalog
  topic update even if its version is not in the specified topic update
  version range. Hence, the same catalog object may be sent in two
  consecutive catalog topic updates.

This commit also changes the way deletions are handled in the catalog and
disseminated to the impalad nodes through the statestore. In particular:
* Deletions in the catalog are explicitly recorded in a log with
the catalog version in which they occurred. As before, added and deleted
catalog objects are sent to the statestore.
* Topic entries associated with deleted catalog objects have non-empty
values (besides keys) that contain minimal object metadata, including the
catalog version.
* The statestore no longer uses the presence or absence of a topic entry
value to identify deleted topic entries. Deleted topic entries must be
explicitly marked as such by the statestore subscribers that produce
them.
* Statestore subscribers now use the 'deleted' flag to determine if a
topic entry corresponds to a deleted item.
* When updating the local catalog cache from a catalog update, impalads
use each deleted object's own catalog version rather than the update's
maximum catalog version.

Testing:
- No new tests were added as these paths are already exercised by
existing tests.
- Ran all core and exhaustive tests.

Change-Id: If12467a83acaeca6a127491d89291dedba91a35a
Reviewed-on: http://gerrit.cloudera.org:8080/7731
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/8752


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3fc42ded
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3fc42ded
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3fc42ded

Branch: refs/heads/master
Commit: 3fc42ded02bd9314cbc1881cc52692cde35e02fb
Parents: 888a16c
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Mon Aug 14 11:05:20 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 16 23:01:32 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                |  57 +-
 be/src/catalog/catalog-server.h                 |  39 +-
 be/src/catalog/catalog-util.cc                  |  15 -
 be/src/catalog/catalog-util.h                   |   9 -
 be/src/catalog/catalog.cc                       |  19 +-
 be/src/catalog/catalog.h                        |  14 +-
 be/src/exec/catalog-op-executor.cc              |   6 +-
 be/src/exec/catalog-op-executor.h               |   4 +-
 be/src/scheduling/admission-controller.cc       |  18 +-
 be/src/scheduling/admission-controller.h        |   7 +-
 be/src/scheduling/scheduler-test-util.cc        |   5 +-
 be/src/scheduling/scheduler.cc                  |  22 +-
 be/src/service/client-request-state.cc          |   6 +-
 be/src/service/frontend.cc                      |   1 +
 be/src/service/impala-server.cc                 | 245 +++--
 be/src/service/impala-server.h                  |  55 +-
 be/src/statestore/statestore.cc                 |  56 +-
 be/src/statestore/statestore.h                  |  43 +-
 common/thrift/CatalogInternalService.thrift     |  24 +-
 common/thrift/CatalogService.thrift             |  74 +-
 common/thrift/Frontend.thrift                   |  38 +-
 common/thrift/StatestoreService.thrift          |  31 +-
 .../impala/catalog/AuthorizationPolicy.java     |   1 +
 .../java/org/apache/impala/catalog/Catalog.java |  57 +-
 .../apache/impala/catalog/CatalogDeltaLog.java  |  85 +-
 .../apache/impala/catalog/CatalogObject.java    |   5 +-
 .../impala/catalog/CatalogObjectCache.java      |  19 +-
 .../impala/catalog/CatalogObjectImpl.java       |  47 +
 .../catalog/CatalogObjectVersionQueue.java      |  73 ++
 .../impala/catalog/CatalogServiceCatalog.java   | 953 +++++++++++++++----
 .../org/apache/impala/catalog/DataSource.java   |  20 +-
 .../main/java/org/apache/impala/catalog/Db.java |  31 +-
 .../org/apache/impala/catalog/Function.java     |  19 +-
 .../apache/impala/catalog/HdfsCachePool.java    |  11 +-
 .../apache/impala/catalog/ImpaladCatalog.java   |  67 +-
 .../java/org/apache/impala/catalog/Role.java    |  19 +-
 .../apache/impala/catalog/RolePrivilege.java    |  20 +-
 .../java/org/apache/impala/catalog/Table.java   |  33 +-
 .../apache/impala/catalog/TopicUpdateLog.java   | 152 +++
 .../impala/service/CatalogOpExecutor.java       | 158 +--
 .../org/apache/impala/service/Frontend.java     |   6 +-
 .../org/apache/impala/service/JniCatalog.java   |  11 +-
 .../org/apache/impala/util/SentryProxy.java     |  31 +-
 .../testutil/CatalogServiceTestCatalog.java     |   2 +-
 .../impala/testutil/ImpaladTestCatalog.java     |   4 +-
 tests/statestore/test_statestore.py             |   8 +-
 46 files changed, 1768 insertions(+), 852 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 15685d0..b004b22 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -228,13 +228,10 @@ void CatalogServer::UpdateCatalogTopicCallback(
 
   const TTopicDelta& delta = topic->second;
 
-  // If this is not a delta update, clear all catalog objects and request an update
-  // from version 0 from the local catalog. There is an optimization that checks if
-  // pending_topic_updates_ was just reloaded from version 0, if they have then skip this
-  // step and use that data.
-  if (delta.from_version == 0 && delta.to_version == 0 &&
-      catalog_objects_min_version_ != 0) {
-    catalog_topic_entry_keys_.clear();
+  // If not generating a delta update and 'pending_topic_updates_' doesn't already contain
+  // the full catalog (beginning with version 0), then force GatherCatalogUpdatesThread()
+  // to reload the full catalog.
+  if (delta.from_version == 0 && catalog_objects_min_version_ != 0) {
     last_sent_catalog_version_ = 0L;
   } else {
     // Process the pending topic update.
@@ -284,14 +281,17 @@ void CatalogServer::UpdateCatalogTopicCallback(
     } else if (current_catalog_version != last_sent_catalog_version_) {
       // If there has been a change since the last time the catalog was queried,
       // call into the Catalog to find out what has changed.
-      TGetAllCatalogObjectsResponse catalog_objects;
-      status = catalog_->GetAllCatalogObjects(last_sent_catalog_version_,
-          &catalog_objects);
+      TGetCatalogDeltaResponse catalog_objects;
+      status = catalog_->GetCatalogDelta(last_sent_catalog_version_, &catalog_objects);
       if (!status.ok()) {
         LOG(ERROR) << status.GetDetail();
       } else {
-        // Use the catalog objects to build a topic update list.
-        BuildTopicUpdates(catalog_objects.objects);
+        // Use the catalog objects to build a topic update list. These include
+        // objects added to the catalog, 'updated_objects', and objects deleted
+        // from the catalog, 'deleted_objects'. The order in which we process
+        // these two disjoint sets of catalog objects does not matter.
+        BuildTopicUpdates(catalog_objects.updated_objects, false);
+        BuildTopicUpdates(catalog_objects.deleted_objects, true);
         catalog_objects_min_version_ = last_sent_catalog_version_;
         catalog_objects_max_version_ = catalog_objects.max_catalog_version;
       }
@@ -302,31 +302,19 @@ void CatalogServer::UpdateCatalogTopicCallback(
   }
 }
 
-void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects) {
-  unordered_set<string> current_entry_keys;
-  // Add any new/updated catalog objects to the topic.
+void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects,
+    bool topic_deletions) {
   for (const TCatalogObject& catalog_object: catalog_objects) {
+    DCHECK_GT(catalog_object.catalog_version, last_sent_catalog_version_);
     const string& entry_key = TCatalogObjectToEntryKey(catalog_object);
     if (entry_key.empty()) {
       LOG_EVERY_N(WARNING, 60) << "Unable to build topic entry key for TCatalogObject: "
                                << ThriftDebugString(catalog_object);
     }
-
-    current_entry_keys.insert(entry_key);
-    // Remove this entry from catalog_topic_entry_keys_. At the end of this loop, we will
-    // be left with the set of keys that were in the last update, but not in this
-    // update, indicating which objects have been removed/dropped.
-    catalog_topic_entry_keys_.erase(entry_key);
-
-    // This isn't a new or an updated item, skip it.
-    if (catalog_object.catalog_version <= last_sent_catalog_version_) continue;
-
-    VLOG(1) << "Publishing update: " << entry_key << "@"
-            << catalog_object.catalog_version;
-
     pending_topic_updates_.push_back(TTopicItem());
     TTopicItem& item = pending_topic_updates_.back();
     item.key = entry_key;
+    item.deleted = topic_deletions;
     Status status = thrift_serializer_.Serialize(&catalog_object, &item.value);
     if (!status.ok()) {
       LOG(ERROR) << "Error serializing topic value: " << status.GetDetail();
@@ -340,18 +328,9 @@ void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_obje
         pending_topic_updates_.pop_back();
       }
     }
+    VLOG(1) << "Publishing " << (topic_deletions ? "deletion " : "update ")
+        << ": " << entry_key << "@" << catalog_object.catalog_version;
   }
-
-  // Any remaining items in catalog_topic_entry_keys_ indicate the object was removed
-  // since the last update.
-  for (const string& key: catalog_topic_entry_keys_) {
-    pending_topic_updates_.push_back(TTopicItem());
-    TTopicItem& item = pending_topic_updates_.back();
-    item.key = key;
-    VLOG(1) << "Publishing deletion: " << key;
-    // Don't set a value to mark this item as deleted.
-  }
-  catalog_topic_entry_keys_.swap(current_entry_keys);
 }
 
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index bf88e00..78a3f20 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -88,12 +88,6 @@ class CatalogServer {
   /// Thread that polls the catalog for any updates.
   std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
-  /// Tracks the set of catalog objects that exist via their topic entry key.
-  /// During each IMPALA_CATALOG_TOPIC heartbeat, stores the set of known catalog objects
-  /// that exist by their topic entry key. Used to track objects that have been removed
-  /// since the last heartbeat.
-  boost::unordered_set<std::string> catalog_topic_entry_keys_;
-
   /// Protects catalog_update_cv_, pending_topic_updates_,
   /// catalog_objects_to/from_version_, and last_sent_catalog_version.
   boost::mutex catalog_lock_;
@@ -135,14 +129,10 @@ class CatalogServer {
   /// finds all catalog objects that have a catalog version greater than the last update
   /// sent by calling into the JniCatalog. The topic is updated with any catalog objects
   /// that are new or have been modified since the last heartbeat (by comparing the
-  /// catalog version of the object with last_sent_catalog_version_). Also determines any
-  /// deletions of catalog objects by looking at the
-  /// difference of the last set of topic entry keys that were sent and the current set
-  /// of topic entry keys. At the end of execution it notifies the
-  /// catalog_update_gathering_thread_ to fetch the next set of updates from the
-  /// JniCatalog.
-  /// All updates are added to the subscriber_topic_updates list and sent back to the
-  /// Statestore.
+  /// catalog version of the object with last_sent_catalog_version_). At the end of
+  /// execution it notifies the catalog_update_gathering_thread_ to fetch the next set of
+  /// updates from the JniCatalog. All updates are added to the subscriber_topic_updates
+  /// list and sent back to the Statestore.
   void UpdateCatalogTopicCallback(
       const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
@@ -153,20 +143,19 @@ class CatalogServer {
   /// Also, explicitly releases free memory back to the OS after each complete iteration.
   [[noreturn]] void GatherCatalogUpdatesThread();
 
-  /// This function determines what items have been added/removed from the catalog
-  /// since the last heartbeat and builds the next topic update to send. To do this, it
-  /// enumerates the given catalog objects returned looking for the objects that have a
-  /// catalog version that is > the catalog version sent with the last heartbeat. To
-  /// determine items that have been deleted, it saves the set of topic entry keys sent
-  /// with the last update and looks at the difference between it and the current set of
-  /// topic entry keys.
+  /// Builds the next topic update to send based on what items
+  /// have been added/changed/removed from the catalog since the last hearbeat. To do
+  /// this, it enumerates the given catalog objects returned looking for the objects that
+  /// have a catalog version that is > the catalog version sent with the last heartbeat.
+  /// 'topic_deletions' is true if 'catalog_objects' contain deleted catalog
+  /// objects.
+  ///
   /// The key for each entry is a string composed of:
   /// "TCatalogObjectType:<unique object name>". So for table foo.bar, the key would be
   /// "TABLE:foo.bar". Encoding the object type information in the key ensures the keys
-  /// are unique, as well as helps to determine what object type was removed in a state
-  /// store delta update (since the state store only sends key names for deleted items).
-  /// Must hold catalog_lock_ when calling this function.
-  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects);
+  /// are unique. Must hold catalog_lock_ when calling this function.
+  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects,
+      bool topic_deletions);
 
   /// Example output:
   /// "databases": [

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog-util.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index e968742..789723e 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -55,21 +55,6 @@ TCatalogObjectType::type TCatalogObjectTypeFromName(const string& name) {
   return TCatalogObjectType::UNKNOWN;
 }
 
-Status TCatalogObjectFromEntryKey(const string& key,
-    TCatalogObject* catalog_object) {
-  // Reconstruct the object based only on the key.
-  size_t pos = key.find(":");
-  if (pos == string::npos || pos >= key.size() - 1) {
-    stringstream error_msg;
-    error_msg << "Invalid topic entry key format: " << key;
-    return Status(error_msg.str());
-  }
-
-  TCatalogObjectType::type object_type = TCatalogObjectTypeFromName(key.substr(0, pos));
-  string object_name = key.substr(pos + 1);
-  return TCatalogObjectFromObjectName(object_type, object_name, catalog_object);
-}
-
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,
     const string& object_name, TCatalogObject* catalog_object) {
   switch (object_type) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog-util.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.h b/be/src/catalog/catalog-util.h
index ddc2c21..e98cd38 100644
--- a/be/src/catalog/catalog-util.h
+++ b/be/src/catalog/catalog-util.h
@@ -32,15 +32,6 @@ class Status;
 /// TCatalogObjectType::UNKNOWN if no match was found.
 TCatalogObjectType::type TCatalogObjectTypeFromName(const std::string& name);
 
-/// Parses the given IMPALA_CATALOG_TOPIC topic entry key to determine the
-/// TCatalogObjectType and unique object name. Populates catalog_object with the result.
-/// This is used to reconstruct type information when an item is deleted from the
-/// topic. At that time the only context available about the object being deleted is its
-/// its topic entry key which contains only the type and object name. The resulting
-/// TCatalogObject can then be used to removing a matching item from the catalog.
-Status TCatalogObjectFromEntryKey(const std::string& key,
-    TCatalogObject* catalog_object);
-
 /// Populates a TCatalogObject based on the given object type (TABLE, DATABASE, etc) and
 /// object name string.
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index e7e05da..b6dd86a 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -62,7 +62,7 @@ Catalog::Catalog() {
     {"getFunctions", "([B)[B", &get_functions_id_},
     {"checkUserSentryAdmin", "([B)V", &sentry_admin_check_id_},
     {"getCatalogObject", "([B)[B", &get_catalog_object_id_},
-    {"getCatalogObjects", "(J)[B", &get_catalog_objects_id_},
+    {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_}};
 
@@ -97,19 +97,10 @@ Status Catalog::GetCatalogVersion(long* version) {
   return Status::OK();
 }
 
-Status Catalog::GetAllCatalogObjects(long from_version,
-    TGetAllCatalogObjectsResponse* resp) {
-  JNIEnv* jni_env = getJNIEnv();
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(jni_env));
-  jvalue requested_from_version;
-  requested_from_version.j = from_version;
-  jbyteArray result_bytes = static_cast<jbyteArray>(
-      jni_env->CallObjectMethod(catalog_, get_catalog_objects_id_,
-      requested_from_version));
-  RETURN_ERROR_IF_EXC(jni_env);
-  RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, resp));
-  return Status::OK();
+Status Catalog::GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp) {
+  TGetCatalogDeltaRequest request;
+  request.__set_from_version(from_version);
+  return JniUtil::CallJniMethod(catalog_, get_catalog_delta_id_, request, resp);
 }
 
 Status Catalog::ExecDdl(const TDdlExecRequest& req, TDdlExecResponse* resp) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index ab6a2a3..3119d60 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -56,12 +56,10 @@ class Catalog {
   /// Status object with information on the error will be returned.
   Status GetCatalogVersion(long* version);
 
-  /// Gets all Catalog objects and the metadata that is applicable for the given request.
-  /// Always returns all object names that exist in the Catalog, but allows for extended
-  /// metadata for objects that were modified after the specified version.
-  /// Returns OK if the operation was successful, otherwise a Status object with
-  /// information on the error will be returned.
-  Status GetAllCatalogObjects(long from_version, TGetAllCatalogObjectsResponse* resp);
+  /// Retrieves the catalog objects that were added/modified/deleted since version
+  /// 'from_version'. Returns OK if the operation was successful, otherwise a Status
+  /// object with information on the error will be returned.
+  Status GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp);
 
   /// Gets the Thrift representation of a Catalog object. The request is a TCatalogObject
   /// which has the desired TCatalogObjectType and name properly set.
@@ -74,7 +72,7 @@ class Catalog {
   /// match the pattern string. Patterns are "p1|p2|p3" where | denotes choice,
   /// and each pN may contain wildcards denoted by '*' which match all strings.
   /// TODO: GetDbs() and GetTableNames() can probably be scrapped in favor of
-  /// GetAllCatalogObjects(). Consider removing them and moving everything to use
+  /// GetCatalogDelta(). Consider removing them and moving everything to use
   /// that.
   Status GetDbs(const std::string* pattern, TGetDbsResult* dbs);
 
@@ -109,7 +107,7 @@ class Catalog {
   jmethodID exec_ddl_id_;  // JniCatalog.execDdl()
   jmethodID reset_metadata_id_;  // JniCatalog.resetMetdata()
   jmethodID get_catalog_object_id_;  // JniCatalog.getCatalogObject()
-  jmethodID get_catalog_objects_id_;  // JniCatalog.getCatalogObjects()
+  jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
   jmethodID get_dbs_id_; // JniCatalog.getDbs()
   jmethodID get_table_names_id_; // JniCatalog.getTableNames()

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 7490ed1..12398cf 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -89,16 +89,20 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
 }
 
 Status CatalogOpExecutor::ExecComputeStats(
-    const TComputeStatsParams& compute_stats_params,
+    const TCatalogOpRequest& compute_stats_request,
     const TTableSchema& tbl_stats_schema, const TRowSet& tbl_stats_data,
     const TTableSchema& col_stats_schema, const TRowSet& col_stats_data) {
   // Create a new DDL request to alter the table's statistics.
   TCatalogOpRequest catalog_op_req;
   catalog_op_req.__isset.ddl_params = true;
   catalog_op_req.__set_op_type(TCatalogOpType::DDL);
+  catalog_op_req.__set_sync_ddl(compute_stats_request.sync_ddl);
   TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params;
   update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);
+  update_stats_req.__set_sync_ddl(compute_stats_request.sync_ddl);
 
+  const TComputeStatsParams& compute_stats_params =
+      compute_stats_request.ddl_params.compute_stats_params;
   TAlterTableUpdateStatsParams& update_stats_params =
       update_stats_req.alter_table_params.update_stats_params;
   update_stats_req.__isset.alter_table_params = true;

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/exec/catalog-op-executor.h
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h
index 33b949e..375c839 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -47,10 +47,10 @@ class CatalogOpExecutor {
   /// be loaded.
   Status GetCatalogObject(const TCatalogObject& object_desc, TCatalogObject* result);
 
-  /// Translates the given compute stats params and its child-query results into
+  /// Translates the given compute stats request and its child-query results into
   /// a new table alteration request for updating the stats metadata, and executes
   /// the alteration via Exec();
-  Status ExecComputeStats(const TComputeStatsParams& compute_stats_params,
+  Status ExecComputeStats(const TCatalogOpRequest& compute_stats_request,
       const apache::hive::service::cli::thrift::TTableSchema& tbl_stats_schema,
       const apache::hive::service::cli::thrift::TRowSet& tbl_stats_data,
       const apache::hive::service::cli::thrift::TTableSchema& col_stats_schema,

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index ed4e7e2..99f659a 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -645,7 +645,6 @@ void AdmissionController::UpdatePoolStats(
         }
       }
       HandleTopicUpdates(delta.topic_entries);
-      HandleTopicDeletions(delta.topic_deletions);
     }
     UpdateClusterAggregates();
   }
@@ -683,6 +682,10 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
     // The topic entry from this subscriber is handled specially; the stats coming
     // from the statestore are likely already outdated.
     if (topic_backend_id == host_id_) continue;
+    if (item.deleted) {
+      GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
+      continue;
+    }
     TPoolStats remote_update;
     uint32_t len = item.value.size();
     Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -695,18 +698,7 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
   }
 }
 
-void AdmissionController::HandleTopicDeletions(const vector<string>& topic_deletions) {
-  for (const string& topic_key: topic_deletions) {
-    string pool_name;
-    string topic_backend_id;
-    if (!ParsePoolTopicKey(topic_key, &pool_name, &topic_backend_id)) continue;
-    if (topic_backend_id == host_id_) continue;
-    GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
-  }
-}
-
-void AdmissionController::PoolStats::UpdateAggregates(
-    HostMemMap* host_mem_reserved) {
+void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reserved) {
   const string& coord_id = parent_->host_id_;
   int64_t num_running = 0;
   int64_t num_queued = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 71c9fa4..2830bee 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -428,13 +428,10 @@ class AdmissionController {
   void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
 
   /// Updates the remote stats with per-host topic_updates coming from the statestore.
-  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
+  /// Removes remote stats identified by topic deletions coming from the
+  /// statestore. Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
   void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);
 
-  /// Removes remote stats identified by the topic_deletions coming from the statestore.
-  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
-  void HandleTopicDeletions(const std::vector<std::string>& topic_deletions);
-
   /// Re-computes the per-pool aggregate stats and the per-host aggregates in
   /// host_mem_reserved_ using each pool's remote_stats_ and local_stats_.
   /// Called by UpdatePoolStats() after handling updates and deletions.

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 6fb2bba..05cfc42 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -478,7 +478,10 @@ void SchedulerWrapper::RemoveBackend(const Host& host) {
   TTopicDelta delta;
   delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
-  delta.topic_deletions.push_back(host.ip);
+  TTopicItem item;
+  item.__set_deleted(true);
+  item.__set_key(host.ip);
+  delta.topic_entries.push_back(item);
   SendTopicDelta(delta);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 2bd6c96..5cf0f01 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -131,9 +131,7 @@ void Scheduler::UpdateMembership(
 
   // If the delta transmitted by the statestore is empty we can skip processing
   // altogether and avoid making a copy of executors_config_.
-  if (delta.is_delta && delta.topic_entries.empty() && delta.topic_deletions.empty()) {
-    return;
-  }
+  if (delta.is_delta && delta.topic_entries.empty()) return;
 
   // This function needs to handle both delta and non-delta updates. To minimize the
   // time needed to hold locks, all updates are applied to a copy of
@@ -150,10 +148,17 @@ void Scheduler::UpdateMembership(
     new_executors_config = std::make_shared<BackendConfig>(*executors_config_);
   }
 
-  // Process new entries to the topic. Update executors_config_ and
+  // Process new and removed entries to the topic. Update executors_config_ and
   // current_executors_ to match the set of executors given by the
   // subscriber_topic_updates.
   for (const TTopicItem& item : delta.topic_entries) {
+    if (item.deleted) {
+      if (current_executors_.find(item.key) != current_executors_.end()) {
+        new_executors_config->RemoveBackend(current_executors_[item.key]);
+        current_executors_.erase(item.key);
+      }
+      continue;
+    }
     TBackendDescriptor be_desc;
     // Benchmarks have suggested that this method can deserialize
     // ~10m messages per second, so no immediate need to consider optimization.
@@ -188,15 +193,6 @@ void Scheduler::UpdateMembership(
       current_executors_.insert(make_pair(item.key, be_desc));
     }
   }
-
-  // Process deletions from the topic
-  for (const string& backend_id : delta.topic_deletions) {
-    if (current_executors_.find(backend_id) != current_executors_.end()) {
-      new_executors_config->RemoveBackend(current_executors_[backend_id]);
-      current_executors_.erase(backend_id);
-    }
-  }
-
   SetExecutorsConfig(new_executors_config);
 
   if (metrics_ != nullptr) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 87ac8ff..ee35afa 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -183,12 +183,15 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
 
       // Now refresh the table metadata.
       TCatalogOpRequest reset_req;
+      reset_req.__set_sync_ddl(exec_request_.query_options.sync_ddl);
       reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
       reset_req.__set_reset_metadata_params(TResetMetadataRequest());
       reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
       reset_req.reset_metadata_params.__set_is_refresh(true);
       reset_req.reset_metadata_params.__set_table_name(
           exec_request_.load_data_request.table_name);
+      reset_req.reset_metadata_params.__set_sync_ddl(
+          exec_request_.query_options.sync_ddl);
       catalog_op_executor_.reset(
           new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
       RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
@@ -923,6 +926,7 @@ Status ClientRequestState::UpdateCatalog() {
   if (query_exec_request.__isset.finalize_params) {
     const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
     TUpdateCatalogRequest catalog_update;
+    catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl);
     catalog_update.__set_header(TCatalogServiceRequestHeader());
     catalog_update.header.__set_requesting_user(effective_user());
     if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
@@ -1078,7 +1082,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   }
 
   Status status = catalog_op_executor_->ExecComputeStats(
-      exec_request_.catalog_op_request.ddl_params.compute_stats_params,
+      exec_request_.catalog_op_request,
       child_queries[0]->result_schema(),
       child_queries[0]->result_data(),
       col_stats_schema,

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index df39114..50883af 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -17,6 +17,7 @@
 
 #include "service/frontend.h"
 
+#include <jni.h>
 #include <list>
 #include <string>
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 5b895ee..6358145 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1327,10 +1327,10 @@ void ImpalaServer::CatalogUpdateCallback(
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;
 
-
   // Update catalog cache in frontend. An update is split into batches of size
   // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. IMPALA-3499
-  if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0)  {
+  if (delta.topic_entries.size() != 0)  {
+    vector<TCatalogObject> dropped_objects;
     vector<TUpdateCatalogCacheRequest> update_reqs;
     update_reqs.push_back(TUpdateCatalogCacheRequest());
     TUpdateCatalogCacheRequest* incremental_request = &update_reqs.back();
@@ -1340,7 +1340,6 @@ void ImpalaServer::CatalogUpdateCallback(
     int64_t new_catalog_version = catalog_update_info_.catalog_version;
     uint64_t batch_size_bytes = 0;
     for (const TTopicItem& item: delta.topic_entries) {
-      TCatalogObject catalog_object;
       Status status;
       vector<uint8_t> data_buffer;
       const uint8_t* data_buffer_ptr = nullptr;
@@ -1358,82 +1357,70 @@ void ImpalaServer::CatalogUpdateCallback(
         data_buffer_ptr = reinterpret_cast<const uint8_t*>(item.value.data());
         len = item.value.size();
       }
+      if (len > 100 * 1024 * 1024 /* 100MB */) {
+        LOG(INFO) << "Received large catalog object(>100mb): "
+            << item.key << " is "
+            << PrettyPrinter::Print(len, TUnit::BYTES);
+      }
+      TCatalogObject catalog_object;
       status = DeserializeThriftMsg(data_buffer_ptr, &len, FLAGS_compact_catalog_topic,
           &catalog_object);
       if (!status.ok()) {
         LOG(ERROR) << "Error deserializing item " << item.key
-                   << ": " << status.GetDetail();
+            << ": " << status.GetDetail();
         continue;
       }
-      if (len > 100 * 1024 * 1024 /* 100MB */) {
-        LOG(INFO) << "Received large catalog update(>100mb): "
-                     << item.key << " is "
-                     << PrettyPrinter::Print(len, TUnit::BYTES);
-      }
-      if (catalog_object.type == TCatalogObjectType::CATALOG) {
-        incremental_request->__set_catalog_service_id(
-            catalog_object.catalog.catalog_service_id);
-        new_catalog_version = catalog_object.catalog_version;
-      }
-
-      // Refresh the lib cache entries of any added functions and data sources
-      // TODO: if frontend returns the list of functions and data sources, we do not
-      // need to deserialize these in backend.
-      if (catalog_object.type == TCatalogObjectType::FUNCTION) {
-        DCHECK(catalog_object.__isset.fn);
-        LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
-      }
-      if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-        DCHECK(catalog_object.__isset.data_source);
-        LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
-      }
 
       if (batch_size_bytes + len > MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES) {
+        // Initialize a new batch of catalog updates.
         update_reqs.push_back(TUpdateCatalogCacheRequest());
         incremental_request = &update_reqs.back();
         batch_size_bytes = 0;
       }
-      incremental_request->updated_objects.push_back(catalog_object);
-      batch_size_bytes += len;
-    }
-    update_reqs.push_back(TUpdateCatalogCacheRequest());
-    TUpdateCatalogCacheRequest* deletion_request = &update_reqs.back();
-
-    // We need to look up the dropped functions and data sources and remove them
-    // from the library cache. The data sent from the catalog service does not
-    // contain all the function metadata so we'll ask our local frontend for it. We
-    // need to do this before updating the catalog.
-    vector<TCatalogObject> dropped_objects;
 
-    // Process all Catalog deletions (dropped objects). We only know the keys (object
-    // names) so must parse each key to determine the TCatalogObject.
-    for (const string& key: delta.topic_deletions) {
-      LOG(INFO) << "Catalog topic entry deletion: " << key;
-      TCatalogObject catalog_object;
-      Status status = TCatalogObjectFromEntryKey(key, &catalog_object);
-      if (!status.ok()) {
-        LOG(ERROR) << "Error parsing catalog topic entry deletion key: " << key << " "
-                   << "Error: " << status.GetDetail();
-        continue;
+      if (catalog_object.type == TCatalogObjectType::CATALOG) {
+        incremental_request->__set_catalog_service_id(
+            catalog_object.catalog.catalog_service_id);
+        new_catalog_version = catalog_object.catalog_version;
       }
-      deletion_request->removed_objects.push_back(catalog_object);
-      if (catalog_object.type == TCatalogObjectType::FUNCTION ||
-          catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-        TCatalogObject dropped_object;
-        if (exec_env_->frontend()->GetCatalogObject(
-                catalog_object, &dropped_object).ok()) {
-          // This object may have been dropped and re-created. To avoid removing the
-          // re-created object's entry from the cache verify the existing object has a
-          // catalog version <= the catalog version included in this statestore heartbeat.
-          if (dropped_object.catalog_version <= new_catalog_version) {
-            if (catalog_object.type == TCatalogObjectType::FUNCTION ||
-                catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-              dropped_objects.push_back(dropped_object);
+      VLOG(3) << (item.deleted ? "Deleted " : "Added ") << "item: " << item.key
+          << " version: " << catalog_object.catalog_version << " of size: " << len;
+
+      if (!item.deleted) {
+        // Refresh the lib cache entries of any added functions and data sources
+        // TODO: if frontend returns the list of functions and data sources, we do not
+        // need to deserialize these in backend.
+        if (catalog_object.type == TCatalogObjectType::FUNCTION) {
+          DCHECK(catalog_object.__isset.fn);
+          LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
+        }
+        if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+          DCHECK(catalog_object.__isset.data_source);
+          LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
+        }
+        incremental_request->updated_objects.push_back(catalog_object);
+      } else {
+        // We need to look up any dropped functions and data sources and remove
+        // them from the library cache.
+        if (catalog_object.type == TCatalogObjectType::FUNCTION ||
+            catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+          TCatalogObject existing_object;
+          if (exec_env_->frontend()->GetCatalogObject(
+              catalog_object, &existing_object).ok()) {
+            // If the object exists in the catalog it may have been dropped and
+            // re-created. To avoid removing the re-created object's entry from
+            // the cache verify that the existing object's version <= the
+            // version of the dropped object included in this statestore
+            // heartbeat.
+            DCHECK_NE(existing_object.catalog_version, catalog_object.catalog_version);
+            if (existing_object.catalog_version < catalog_object.catalog_version) {
+              dropped_objects.push_back(existing_object);
             }
           }
         }
-        // Nothing to do in error case.
+        incremental_request->removed_objects.push_back(catalog_object);
       }
+      batch_size_bytes += len;
     }
 
     // Call the FE to apply the changes to the Impalad Catalog.
@@ -1456,6 +1443,9 @@ void ImpalaServer::CatalogUpdateCallback(
         catalog_update_info_.catalog_version = new_catalog_version;
         catalog_update_info_.catalog_topic_version = delta.to_version;
         catalog_update_info_.catalog_service_id = resp.catalog_service_id;
+        catalog_update_info_.min_catalog_object_version = resp.min_catalog_object_version;
+        LOG(INFO) << "Catalog topic update applied with version: " << new_catalog_version
+            << " new min catalog object version: " << resp.min_catalog_object_version;
       }
       ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0);
       // TODO: deal with an error status
@@ -1482,13 +1472,89 @@ void ImpalaServer::CatalogUpdateCallback(
   catalog_version_update_cv_.NotifyAll();
 }
 
+void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
+    const TUniqueId& catalog_service_id) {
+  unique_lock<mutex> unique_lock(catalog_version_lock_);
+  // Block until the local cache applies the update or the catalog service ID changes.
+  VLOG_QUERY << "Waiting for catalog version: " << catalog_update_version
+             << " current version: " << catalog_update_info_.catalog_version;
+  while (catalog_update_info_.catalog_version < catalog_update_version &&
+         catalog_update_info_.catalog_service_id == catalog_service_id) {
+    catalog_version_update_cv_.Wait(unique_lock);
+  }
+  // Either the local catalog version caught up or the catalog service ID changed.
+  if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    VLOG_QUERY << "Detected change in catalog service ID";
+  } else {
+    VLOG_QUERY << "Received catalog version: " << catalog_update_version;
+  }
+}
+
+void ImpalaServer::WaitForCatalogUpdateTopicPropagation(
+    const TUniqueId& catalog_service_id) {
+  unique_lock<mutex> unique_lock(catalog_version_lock_);
+  int64_t min_req_subscriber_topic_version =
+      catalog_update_info_.catalog_topic_version;  // Topic version of last update.
+  VLOG_QUERY << "Waiting for min subscriber topic version: "
+      << min_req_subscriber_topic_version << " current version: "
+      << min_subscriber_catalog_topic_version_;
+  while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
+         catalog_update_info_.catalog_service_id == catalog_service_id) {
+    catalog_version_update_cv_.Wait(unique_lock);
+  }
+  // Either every subscriber caught up or the catalog service ID changed.
+  if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    VLOG_QUERY << "Detected change in catalog service ID";
+  } else {
+    VLOG_QUERY << "Received min subscriber topic version: "
+        << min_req_subscriber_topic_version;
+  }
+}
+
+void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_version,
+    const TUniqueId& catalog_service_id) {
+  unique_lock<mutex> unique_lock(catalog_version_lock_);
+  int64_t min_catalog_object_version =
+      catalog_update_info_.min_catalog_object_version;  // Snapshot for logging only.
+  // TODO: Set a timeout to eventually break out of this loop if something goes
+  // wrong? Unlike WaitForCatalogUpdate(), this waits on the minimum object version.
+  VLOG_QUERY << "Waiting for minimum catalog object version: "
+      << min_req_catalog_object_version << " current version: "
+      << min_catalog_object_version;
+  while (catalog_update_info_.min_catalog_object_version < min_req_catalog_object_version
+      && catalog_update_info_.catalog_service_id == catalog_service_id) {
+    catalog_version_update_cv_.Wait(unique_lock);
+  }
+  // Either the minimum version caught up or the catalog service ID changed.
+  if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    VLOG_QUERY << "Detected change in catalog service ID";
+  } else {
+    VLOG_QUERY << "Updated minimum catalog object version: "
+        << min_req_catalog_object_version;
+  }
+}
+
 Status ImpalaServer::ProcessCatalogUpdateResult(
     const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers) {
-  // If this update result contains catalog objects to add or remove, directly apply the
-  // updates to the local impalad's catalog cache. Otherwise, wait for a statestore
-  // heartbeat that contains this update version.
-  if (catalog_update_result.__isset.updated_catalog_objects ||
-      catalog_update_result.__isset.removed_catalog_objects) {
+  const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
+  if (!catalog_update_result.__isset.updated_catalog_objects &&
+      !catalog_update_result.__isset.removed_catalog_objects) {
+    // Operation with no result set. Use the version specified in
+    // 'catalog_update_result' to determine when the effects of this operation
+    // have been applied to the local catalog cache.
+    if (catalog_update_result.is_invalidate) {
+      WaitForMinCatalogUpdate(catalog_update_result.version, catalog_service_id);
+    } else {
+      WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id);
+    }
+    if (wait_for_all_subscribers) {
+      // Now wait for this update to be propagated to all catalog topic subscribers.
+      // If we make it here it implies the first condition was met (the update was
+      // processed locally or the catalog service id has changed).
+      WaitForCatalogUpdateTopicPropagation(catalog_service_id);
+    }
+  } else {
+    // Operation with a result set.
     TUpdateCatalogCacheRequest update_req;
     update_req.__set_is_delta(true);
     update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
@@ -1498,7 +1564,6 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
     if (catalog_update_result.__isset.removed_catalog_objects) {
       update_req.__set_removed_objects(catalog_update_result.removed_catalog_objects);
     }
-
     // Apply the changes to the local catalog cache.
     TUpdateCatalogCacheResponse resp;
     Status status = exec_env_->frontend()->UpdateCatalogCache(
@@ -1506,34 +1571,12 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
     if (!status.ok()) LOG(ERROR) << status.GetDetail();
     RETURN_IF_ERROR(status);
     if (!wait_for_all_subscribers) return Status::OK();
-  }
-
-  unique_lock<mutex> unique_lock(catalog_version_lock_);
-  int64_t min_req_catalog_version = catalog_update_result.version;
-  const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
-
-  // Wait for the update to be processed locally.
-  // TODO: What about query cancellation?
-  VLOG_QUERY << "Waiting for catalog version: " << min_req_catalog_version
-             << " current version: " << catalog_update_info_.catalog_version;
-  while (catalog_update_info_.catalog_version < min_req_catalog_version &&
-         catalog_update_info_.catalog_service_id == catalog_service_id) {
-    catalog_version_update_cv_.Wait(unique_lock);
-  }
-
-  if (!wait_for_all_subscribers) return Status::OK();
-
-  // Now wait for this update to be propagated to all catalog topic subscribers.
-  // If we make it here it implies the first condition was met (the update was processed
-  // locally or the catalog service id has changed).
-  int64_t min_req_subscriber_topic_version = catalog_update_info_.catalog_topic_version;
-
-  VLOG_QUERY << "Waiting for min subscriber topic version: "
-             << min_req_subscriber_topic_version << " current version: "
-             << min_subscriber_catalog_topic_version_;
-  while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
-         catalog_update_info_.catalog_service_id == catalog_service_id) {
-    catalog_version_update_cv_.Wait(unique_lock);
+    // Wait until we receive and process the catalog update that covers the effects
+    // (catalog objects) of this operation.
+    WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id);
+    // Now wait for this update to be propagated to all catalog topic
+    // subscribers.
+    WaitForCatalogUpdateTopicPropagation(catalog_service_id);
   }
   return Status::OK();
 }
@@ -1554,6 +1597,10 @@ void ImpalaServer::MembershipCallback(
 
     // Process membership additions.
     for (const TTopicItem& item: delta.topic_entries) {
+      if (item.deleted) {
+        known_backends_.erase(item.key);
+        continue;
+      }
       uint32_t len = item.value.size();
       TBackendDescriptor backend_descriptor;
       Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -1570,18 +1617,12 @@ void ImpalaServer::MembershipCallback(
     // Only register if all ports have been opened and are ready.
     if (services_started_.load()) AddLocalBackendToStatestore(subscriber_topic_updates);
 
-    // Process membership deletions.
-    for (const string& backend_id: delta.topic_deletions) {
-      known_backends_.erase(backend_id);
-    }
-
     // Create a set of known backend network addresses. Used to test for cluster
     // membership by network address.
     set<TNetworkAddress> current_membership;
     // Also reflect changes to the frontend. Initialized only if any_changes is true.
     TUpdateMembershipRequest update_req;
-    bool any_changes = !delta.topic_entries.empty() || !delta.topic_deletions.empty() ||
-        !delta.is_delta;
+    bool any_changes = !delta.topic_entries.empty() || !delta.is_delta;
     for (const BackendDescriptorMap::value_type& backend: known_backends_) {
       current_membership.insert(backend.second.address);
       if (any_changes) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 868ae54..dad1ebf 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -308,22 +308,44 @@ class ImpalaServer : public ImpalaServiceIf,
   void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
       std::vector<TTopicDelta>* topic_updates);
 
-  /// Processes a CatalogUpdateResult returned from the CatalogServer and ensures
-  /// the update has been applied to the local impalad's catalog cache. If
-  /// wait_for_all_subscribers is true, this function will also wait until all
-  /// catalog topic subscribers have processed the update. Called from ClientRequestState
-  /// after executing any statement that modifies the catalog.
-  /// If wait_for_all_subscribers is false AND if the TCatalogUpdateResult contains
-  /// TCatalogObject(s) to add and/or remove, this function will update the local cache
-  /// by directly calling UpdateCatalog() with the TCatalogObject results.
-  /// Otherwise this function will wait until the local impalad's catalog cache has been
-  /// updated from a statestore heartbeat that includes this catalog update's catalog
-  /// version. If wait_for_all_subscribers is true, this function also wait all other
-  /// catalog topic subscribers to process this update by checking the current
-  /// min_subscriber_topic_version included in each state store heartbeat.
+  /// Processes a TCatalogUpdateResult returned from the CatalogServer and ensures
+  /// the update has been applied to the local impalad's catalog cache. Called from
+  /// ClientRequestState after executing any statement that modifies the catalog.
+  ///
+  /// If TCatalogUpdateResult contains TCatalogObject(s) to add and/or remove, this
+  /// function will update the local cache by directly calling UpdateCatalog() with the
+  /// TCatalogObject results.
+  ///
+  /// If TCatalogUpdateResult does not contain any TCatalogObjects and this is
+  /// the result of an INVALIDATE METADATA operation, it waits until the minimum
+  /// catalog version in the local cache is greater than or equal to the catalog
+  /// version specified in TCatalogUpdateResult. If it is not an INVALIDATE
+  /// METADATA operation, it waits until the local impalad's catalog cache has
+  /// been updated from a statestore heartbeat that includes this catalog
+  /// update's version.
+  ///
+  /// If wait_for_all_subscribers is true, this function also waits for all other
+  /// catalog topic subscribers to process this update by checking the current
+  /// min_subscriber_topic_version included in each statestore heartbeat.
   Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result,
       bool wait_for_all_subscribers) WARN_UNUSED_RESULT;
 
+  /// Wait until the catalog update with version 'catalog_update_version' is
+  /// received and applied in the local catalog cache or until the catalog
+  /// service id has changed.
+  void WaitForCatalogUpdate(const int64_t catalog_update_version,
+      const TUniqueId& catalog_service_id);
+
+  /// Wait until the minimum catalog object version in the local cache is
+  /// greater than or equal to 'min_catalog_update_version' or until the catalog
+  /// service id has changed.
+  void WaitForMinCatalogUpdate(const int64_t min_catalog_update_version,
+      const TUniqueId& catalog_service_id);
+
+  /// Wait until the last applied catalog update has been broadcast to
+  /// all catalog topic subscribers or until the catalog service id has changed.
+  void WaitForCatalogUpdateTopicPropagation(const TUniqueId& catalog_service_id);
+
   /// Returns true if lineage logging is enabled, false otherwise.
   bool IsLineageLoggingEnabled();
 
@@ -961,7 +983,7 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Lock to protect uuid_generator
   boost::mutex uuid_lock_;
 
-  /// Lock for catalog_update_version_info_, min_subscriber_catalog_topic_version_,
+  /// Lock for catalog_update_info_, min_subscriber_catalog_topic_version_,
   /// and catalog_version_update_cv_
   boost::mutex catalog_version_lock_;
 
@@ -972,7 +994,8 @@ class ImpalaServer : public ImpalaServiceIf,
   struct CatalogUpdateVersionInfo {
     CatalogUpdateVersionInfo() :
       catalog_version(0L),
-      catalog_topic_version(0L) {
+      catalog_topic_version(0L),
+      min_catalog_object_version(0L) {
     }
 
     /// The last catalog version returned from UpdateCatalog()
@@ -981,6 +1004,8 @@ class ImpalaServer : public ImpalaServiceIf,
     TUniqueId catalog_service_id;
     /// The statestore catalog topic version this update was received in.
     int64_t catalog_topic_version;
+    /// Minimum catalog object version after a call to UpdateCatalog()
+    int64_t min_catalog_object_version;
   };
 
   /// The version information from the last successfull call to UpdateCatalog().

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 2d93c5f..d0a4851 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -85,8 +85,6 @@ const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-by
 const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 
-const Statestore::TopicEntry::Value Statestore::TopicEntry::NULL_VALUE = "";
-
 // Initial version for each Topic registered by a Subscriber. Generally, the Topic will
 // have a Version that is the MAX() of all entries in the Topic, but this initial
 // value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
@@ -124,13 +122,13 @@ class StatestoreThriftIf : public StatestoreServiceIf {
 
 void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes,
     TopicEntry::Version version) {
-  DCHECK(bytes == Statestore::TopicEntry::NULL_VALUE || bytes.size() > 0);
+  DCHECK_GT(bytes.size(), 0);
   value_ = bytes;
   version_ = version;
 }
 
 Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
-    const Statestore::TopicEntry::Value& bytes) {
+    const Statestore::TopicEntry::Value& bytes, bool is_deleted) {
   TopicEntryMap::iterator entry_it = entries_.find(key);
   int64_t key_size_delta = 0;
   int64_t value_size_delta = 0;
@@ -147,6 +145,7 @@ Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
   value_size_delta += bytes.size();
 
   entry_it->second.SetValue(bytes, ++last_version_);
+  entry_it->second.SetDeleted(is_deleted);
   topic_update_log_.insert(make_pair(entry_it->second.version(), key));
 
   total_key_size_bytes_ += key_size_delta;
@@ -168,12 +167,10 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
     // entry
     topic_update_log_.erase(version);
     topic_update_log_.insert(make_pair(++last_version_, key));
-    total_value_size_bytes_ -= entry_it->second.value().size();
-    DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
-
     value_size_metric_->Increment(entry_it->second.value().size());
     topic_size_metric_->Increment(entry_it->second.value().size());
-    entry_it->second.SetValue(Statestore::TopicEntry::NULL_VALUE, last_version_);
+    entry_it->second.SetDeleted(true);
+    entry_it->second.SetVersion(last_version_);
   }
 }
 
@@ -467,11 +464,9 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
   // At this point the updates are assumed to have been successfully processed by the
   // subscriber. Update the subscriber's max version of each topic.
-  map<TopicEntryKey, TTopicDelta>::const_iterator topic_delta =
-      update_state_request.topic_deltas.begin();
-  for (; topic_delta != update_state_request.topic_deltas.end(); ++topic_delta) {
-    subscriber->SetLastTopicVersionProcessed(topic_delta->first,
-        topic_delta->second.to_version);
+  for (const auto& topic_delta: update_state_request.topic_deltas) {
+    subscriber->SetLastTopicVersionProcessed(topic_delta.first,
+        topic_delta.second.to_version);
   }
 
   // Thirdly: perform any / all updates returned by the subscriber
@@ -500,14 +495,8 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
       Topic* topic = &topic_it->second;
       for (const TTopicItem& item: update.topic_entries) {
-        TopicEntry::Version version = topic->Put(item.key, item.value);
-        subscriber->AddTransientUpdate(update.topic_name, item.key, version);
-      }
-
-      for (const string& key: update.topic_deletions) {
-        TopicEntry::Version version =
-            topic->Put(key, Statestore::TopicEntry::NULL_VALUE);
-        subscriber->AddTransientUpdate(update.topic_name, key, version);
+        subscriber->AddTransientUpdate(update.topic_name, item.key,
+            topic->Put(item.key, item.value, item.deleted));
       }
     }
   }
@@ -541,30 +530,25 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
       TopicUpdateLog::const_iterator next_update =
           topic.topic_update_log().upper_bound(last_processed_version);
 
-      int64_t deleted_key_size_bytes = 0;
+      uint64_t topic_size = 0;
       for (; next_update != topic.topic_update_log().end(); ++next_update) {
         TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second);
         DCHECK(itr != topic.entries().end());
         const TopicEntry& topic_entry = itr->second;
-        if (topic_entry.value() == Statestore::TopicEntry::NULL_VALUE) {
-          if (!topic_delta.is_delta) {
-            deleted_key_size_bytes += itr->first.size();
-            continue;
-          }
-          topic_delta.topic_deletions.push_back(itr->first);
-        } else {
-          topic_delta.topic_entries.push_back(TTopicItem());
-          TTopicItem& topic_item = topic_delta.topic_entries.back();
-          topic_item.key = itr->first;
-          // TODO: Does this do a needless copy?
-          topic_item.value = topic_entry.value();
+        // Don't send deleted entries for non-delta updates.
+        if (!topic_delta.is_delta && topic_entry.is_deleted()) {
+          continue;
         }
+        topic_delta.topic_entries.push_back(TTopicItem());
+        TTopicItem& topic_item = topic_delta.topic_entries.back();
+        topic_item.key = itr->first;
+        topic_item.value = topic_entry.value();
+        topic_item.deleted = topic_entry.is_deleted();
+        topic_size += topic_item.key.size() + topic_item.value.size();
       }
 
       if (!topic_delta.is_delta &&
           topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
-        int64_t topic_size = topic.total_key_size_bytes() - deleted_key_size_bytes
-            + topic.total_value_size_bytes();
         VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
                    << " topic update for " << subscriber.id() << ". Size = "
                    << PrettyPrinter::Print(topic_size, TUnit::BYTES);

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 1488f7e..38b8361 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -131,8 +131,7 @@ class Statestore : public CacheLineAligned {
 
  private:
   /// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
-  /// pair. If the byte string is NULL, the entry has been deleted, but may be retained to
-  /// track changes to send to subscribers.
+  /// pair. Deleted entries are retained and flagged so subscribers learn of them.
   class TopicEntry {
    public:
     /// A Value is a string of bytes, for which std::string is a convenient representation.
@@ -146,30 +145,38 @@ class Statestore : public CacheLineAligned {
     /// The Version value used to initialize a new TopicEntry.
     static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L;
 
-    /// Representation of an empty Value. Must have size() == 0.
-    static const Value NULL_VALUE;
-
-    /// Sets the value of this entry to the byte / length pair. NULL_VALUE implies this
-    /// entry has been deleted.  The caller is responsible for ensuring, if required, that
-    /// the version parameter is larger than the current version() TODO: Consider enforcing
-    /// version monotonicity here.
+    /// Sets the value of this entry to the byte / length pair. The caller is responsible
+    /// for ensuring, if required, that the version parameter is larger than the
+    /// current version() TODO: Consider enforcing version monotonicity here.
     void SetValue(const Value& bytes, Version version);
 
-    TopicEntry() : value_(NULL_VALUE), version_(TOPIC_ENTRY_INITIAL_VERSION) { }
+    /// Sets a new version for this entry.
+    void SetVersion(Version version) { version_ = version; }
+
+    /// Sets the is_deleted_ flag for this entry.
+    void SetDeleted(bool is_deleted) { is_deleted_ = is_deleted; }
+
+    TopicEntry() : version_(TOPIC_ENTRY_INITIAL_VERSION),
+        is_deleted_(false) { }
 
     const Value& value() const { return value_; }
     uint64_t version() const { return version_; }
     uint32_t length() const { return value_.size(); }
+    bool is_deleted() const { return is_deleted_; }
 
    private:
-    /// Byte string value, owned by this TopicEntry. The value is opaque to the statestore,
-    /// and is interpreted only by subscribers.
+    /// Byte string value, owned by this TopicEntry. The value is opaque to the
+    /// statestore, and is interpreted only by subscribers.
     Value value_;
 
     /// The version of this entry. Every update is assigned a monotonically increasing
     /// version number so that only the minimal set of changes can be sent from the
     /// statestore to a subscriber.
     Version version_;
+
+    /// Indicates if the entry has been deleted. If true, the entry will still be
+    /// retained to track changes to send to subscribers.
+    bool is_deleted_;
   };
 
   /// Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
@@ -192,19 +199,21 @@ class Statestore : public CacheLineAligned {
           total_value_size_bytes_(0L), key_size_metric_(key_size_metric),
           value_size_metric_(value_size_metric), topic_size_metric_(topic_size_metric) { }
 
-    /// Adds an entry with the given key. If bytes == NULL_VALUE, the entry is considered
-    /// deleted, and may be garbage collected in the future. The entry is assigned a new
-    /// version number by the Topic, and that version number is returned.
+    /// Adds an entry with the given key and value (bytes). If is_deleted is
+    /// true the entry is considered deleted, and may be garbage collected in the future.
+    /// The entry is assigned a new version number by the Topic, and that version number
+    /// is returned.
     //
     /// Must be called holding the topic lock
-    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes);
+    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes,
+        bool is_deleted);
 
     /// Utility method to support removing transient entries. We track the version numbers
     /// of entries added by subscribers, and remove entries with the same version number
     /// when that subscriber fails (the same entry may exist, but may have been updated by
     /// another subscriber giving it a new version number)
     //
-    /// Deletion means setting the entry's value to NULL and incrementing its version
+    /// Deletion means marking the entry as deleted and incrementing its version
     /// number.
     //
     /// Must be called holding the topic lock

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/common/thrift/CatalogInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogInternalService.thrift b/common/thrift/CatalogInternalService.thrift
index 5b68408..ffccd4b 100644
--- a/common/thrift/CatalogInternalService.thrift
+++ b/common/thrift/CatalogInternalService.thrift
@@ -22,15 +22,25 @@ include "CatalogObjects.thrift"
 
 // Contains structures used internally by the Catalog Server.
 
-// Response from a call to GetAllCatalogObjects. Contains all known Catalog objects
-// (databases, tables/views, and functions) from the CatalogService's cache.
-// What metadata is included for each object is based on the parameters used in
-// the request.
-struct TGetAllCatalogObjectsResponse {
+// Arguments to a GetCatalogDelta call.
+struct TGetCatalogDeltaRequest {
+  // The base catalog version from which the delta is computed.
+  1: required i64 from_version
+}
+
+// Response from a call to GetCatalogDelta. Contains a delta of catalog objects
+// (databases, tables/views, and functions) from the CatalogService's cache relative (>)
+// to the catalog version specified in TGetCatalogDeltaRequest.from_version.
+struct TGetCatalogDeltaResponse {
   // The maximum catalog version of all objects in this response or 0 if the Catalog
   // contained no objects.
   1: required i64 max_catalog_version
 
-  // List of catalog objects (empty list if no objects detected in the Catalog).
-  2: required list<CatalogObjects.TCatalogObject> objects
+  // List of updated (new and modified) catalog objects whose catalog version is
+  // larger than TGetCatalogDeltaRequest.from_version.
+  2: required list<CatalogObjects.TCatalogObject> updated_objects
+
+  // List of deleted catalog objects whose catalog version is larger than
+  // TGetCatalogDeltaRequest.from_version.
+  3: required list<CatalogObjects.TCatalogObject> deleted_objects
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/common/thrift/CatalogService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index ef19d25..8a93998 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -41,7 +41,9 @@ struct TCatalogServiceRequestHeader {
 
 // Returns details on the result of an operation that updates the catalog. Information
 // returned includes the Status of the operations, the catalog version that will contain
-// the update, and the catalog service ID.
+// the update, and the catalog service ID. If SYNC_DDL was set in the query options, it
+// also returns the version of the catalog update that this operation must wait for
+// before returning the response to the client.
 struct TCatalogUpdateResult {
   // The CatalogService service ID this result came from.
   1: required Types.TUniqueId catalog_service_id
@@ -52,11 +54,14 @@ struct TCatalogUpdateResult {
   // The status of the operation, OK if the operation was successful.
   3: required Status.TStatus status
 
+  // True if this is a result of an INVALIDATE METADATA operation.
+  4: required bool is_invalidate
+
   // The resulting TCatalogObjects that were added or modified, if applicable.
-  4: optional list<CatalogObjects.TCatalogObject> updated_catalog_objects
+  5: optional list<CatalogObjects.TCatalogObject> updated_catalog_objects
 
   // The resulting TCatalogObjects that were removed, if applicable.
-  5: optional list<CatalogObjects.TCatalogObject> removed_catalog_objects
+  6: optional list<CatalogObjects.TCatalogObject> removed_catalog_objects
 }
 
 // Request for executing a DDL operation (CREATE, ALTER, DROP).
@@ -64,63 +69,66 @@ struct TDdlExecRequest {
   1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
 
   // Common header included in all CatalogService requests.
-  17: optional TCatalogServiceRequestHeader header
+  2: optional TCatalogServiceRequestHeader header
 
-  2: required JniCatalog.TDdlType ddl_type
+  3: required JniCatalog.TDdlType ddl_type
 
   // Parameters for ALTER TABLE
-  3: optional JniCatalog.TAlterTableParams alter_table_params
+  4: optional JniCatalog.TAlterTableParams alter_table_params
 
   // Parameters for ALTER VIEW
-  4: optional JniCatalog.TCreateOrAlterViewParams alter_view_params
+  5: optional JniCatalog.TCreateOrAlterViewParams alter_view_params
 
   // Parameters for CREATE DATABASE
-  5: optional JniCatalog.TCreateDbParams create_db_params
+  6: optional JniCatalog.TCreateDbParams create_db_params
 
   // Parameters for CREATE TABLE
-  6: optional JniCatalog.TCreateTableParams create_table_params
+  7: optional JniCatalog.TCreateTableParams create_table_params
 
   // Parameters for CREATE TABLE LIKE
-  7: optional JniCatalog.TCreateTableLikeParams create_table_like_params
+  8: optional JniCatalog.TCreateTableLikeParams create_table_like_params
 
   // Parameters for CREATE VIEW
-  8: optional JniCatalog.TCreateOrAlterViewParams create_view_params
+  9: optional JniCatalog.TCreateOrAlterViewParams create_view_params
 
   // Parameters for CREATE FUNCTION
-  9: optional JniCatalog.TCreateFunctionParams create_fn_params
+  10: optional JniCatalog.TCreateFunctionParams create_fn_params
 
   // Parameters for DROP DATABASE
-  10: optional JniCatalog.TDropDbParams drop_db_params
+  11: optional JniCatalog.TDropDbParams drop_db_params
 
   // Parameters for DROP TABLE/VIEW
-  11: optional JniCatalog.TDropTableOrViewParams drop_table_or_view_params
+  12: optional JniCatalog.TDropTableOrViewParams drop_table_or_view_params
 
   // Parameters for TRUNCATE TABLE
-  21: optional JniCatalog.TTruncateParams truncate_params
+  13: optional JniCatalog.TTruncateParams truncate_params
 
   // Parameters for DROP FUNCTION
-  12: optional JniCatalog.TDropFunctionParams drop_fn_params
+  14: optional JniCatalog.TDropFunctionParams drop_fn_params
 
   // Parameters for COMPUTE STATS
-  13: optional JniCatalog.TComputeStatsParams compute_stats_params
+  15: optional JniCatalog.TComputeStatsParams compute_stats_params
 
   // Parameters for CREATE DATA SOURCE
-  14: optional JniCatalog.TCreateDataSourceParams create_data_source_params
+  16: optional JniCatalog.TCreateDataSourceParams create_data_source_params
 
   // Parameters for DROP DATA SOURCE
-  15: optional JniCatalog.TDropDataSourceParams drop_data_source_params
+  17: optional JniCatalog.TDropDataSourceParams drop_data_source_params
 
   // Parameters for DROP STATS
-  16: optional JniCatalog.TDropStatsParams drop_stats_params
+  18: optional JniCatalog.TDropStatsParams drop_stats_params
 
   // Parameters for CREATE/DROP ROLE
-  18: optional JniCatalog.TCreateDropRoleParams create_drop_role_params
+  19: optional JniCatalog.TCreateDropRoleParams create_drop_role_params
 
   // Parameters for GRANT/REVOKE ROLE
-  19: optional JniCatalog.TGrantRevokeRoleParams grant_revoke_role_params
+  20: optional JniCatalog.TGrantRevokeRoleParams grant_revoke_role_params
 
   // Parameters for GRANT/REVOKE privilege
-  20: optional JniCatalog.TGrantRevokePrivParams grant_revoke_priv_params
+  21: optional JniCatalog.TGrantRevokePrivParams grant_revoke_priv_params
+
+  // True if SYNC_DDL is set in query options
+  22: required bool sync_ddl
 }
 
 // Response from executing a TDdlExecRequest
@@ -147,18 +155,21 @@ struct TDdlExecResponse {
 struct TUpdateCatalogRequest {
   1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
 
+  // True if SYNC_DDL is set in query options.
+  2: required bool sync_ddl
+
   // Common header included in all CatalogService requests.
-  2: optional TCatalogServiceRequestHeader header
+  3: optional TCatalogServiceRequestHeader header
 
   // Unqualified name of the table to change
-  3: required string target_table;
+  4: required string target_table;
 
   // Database that the table belongs to
-  4: required string db_name;
+  5: required string db_name;
 
   // List of partitions that are new and need to be created. May
   // include the root partition (represented by the empty string).
-  5: required set<string> created_partitions;
+  6: required set<string> created_partitions;
 }
 
 // Response from a TUpdateCatalogRequest
@@ -171,14 +182,14 @@ struct TResetMetadataRequest {
   1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
 
   // Common header included in all CatalogService requests.
-  4: optional TCatalogServiceRequestHeader header
+  2: optional TCatalogServiceRequestHeader header
 
   // If true, refresh. Otherwise, invalidate metadata
-  2: required bool is_refresh
+  3: required bool is_refresh
 
   // Fully qualified name of the table to refresh or invalidate; not set if invalidating
   // the entire catalog
-  3: optional CatalogObjects.TTableName table_name
+  4: optional CatalogObjects.TTableName table_name
 
   // If set, refreshes the specified partition, otherwise
   // refreshes the whole table
@@ -186,6 +197,9 @@ struct TResetMetadataRequest {
 
   // If set, refreshes functions in the specified database.
   6: optional string db_name
+
+  // True if SYNC_DDL is set in query options
+  7: required bool sync_ddl
 }
 
 // Response from TResetMetadataRequest

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index fa315bd..f856871 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -421,56 +421,59 @@ enum TCatalogOpType {
 struct TCatalogOpRequest {
   1: required TCatalogOpType op_type
 
+  // True if SYNC_DDL is used in the query options
+  2: required bool sync_ddl
+
   // Parameters for USE commands
-  2: optional TUseDbParams use_db_params
+  3: optional TUseDbParams use_db_params
 
   // Parameters for DESCRIBE DATABASE db commands
-  17: optional TDescribeDbParams describe_db_params
+  4: optional TDescribeDbParams describe_db_params
 
   // Parameters for DESCRIBE table commands
-  3: optional TDescribeTableParams describe_table_params
+  5: optional TDescribeTableParams describe_table_params
 
   // Parameters for SHOW DATABASES
-  4: optional TShowDbsParams show_dbs_params
+  6: optional TShowDbsParams show_dbs_params
 
   // Parameters for SHOW TABLES
-  5: optional TShowTablesParams show_tables_params
+  7: optional TShowTablesParams show_tables_params
 
   // Parameters for SHOW FUNCTIONS
-  6: optional TShowFunctionsParams show_fns_params
+  8: optional TShowFunctionsParams show_fns_params
 
   // Parameters for SHOW DATA SOURCES
-  11: optional TShowDataSrcsParams show_data_srcs_params
+  9: optional TShowDataSrcsParams show_data_srcs_params
 
   // Parameters for SHOW ROLES
-  12: optional TShowRolesParams show_roles_params
+  10: optional TShowRolesParams show_roles_params
 
   // Parameters for SHOW GRANT ROLE
-  13: optional TShowGrantRoleParams show_grant_role_params
+  11: optional TShowGrantRoleParams show_grant_role_params
 
   // Parameters for DDL requests executed using the CatalogServer
   // such as CREATE, ALTER, and DROP. See CatalogService.TDdlExecRequest
   // for details.
-  7: optional CatalogService.TDdlExecRequest ddl_params
+  12: optional CatalogService.TDdlExecRequest ddl_params
 
   // Parameters for RESET/INVALIDATE METADATA, executed using the CatalogServer.
   // See CatalogService.TResetMetadataRequest for more details.
-  8: optional CatalogService.TResetMetadataRequest reset_metadata_params
+  13: optional CatalogService.TResetMetadataRequest reset_metadata_params
 
   // Parameters for SHOW TABLE/COLUMN STATS
-  9: optional TShowStatsParams show_stats_params
+  14: optional TShowStatsParams show_stats_params
 
   // Parameters for SHOW CREATE TABLE
-  10: optional CatalogObjects.TTableName show_create_table_params
+  15: optional CatalogObjects.TTableName show_create_table_params
 
   // Parameters for SHOW FILES
-  14: optional TShowFilesParams show_files_params
+  16: optional TShowFilesParams show_files_params
 
   // Column lineage graph
-  15: optional LineageGraph.TLineageGraph lineage_graph
+  17: optional LineageGraph.TLineageGraph lineage_graph
 
   // Parameters for SHOW_CREATE_FUNCTION
-  16: optional TGetFunctionsParams show_create_function_params
+  18: optional TGetFunctionsParams show_create_function_params
 }
 
 // Parameters for the SET query option command
@@ -661,6 +664,9 @@ struct TUpdateCatalogCacheRequest {
 struct TUpdateCatalogCacheResponse {
   // The catalog service id this version is from.
   1: required Types.TUniqueId catalog_service_id
+
+  // The minimum catalog object version after CatalogUpdate() was processed.
+  2: required i64 min_catalog_object_version
 }
 
 // Sent from the impalad BE to FE with the latest cluster membership snapshot resulting

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 60a0d0d..13a87c9 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -79,8 +79,15 @@ struct TTopicItem {
   1: required string key;
 
   // Byte-string value for this topic entry. May not be null-terminated (in that it may
-  // contain null bytes)
+  // contain null bytes). It can be non-empty when deleted is true. This is needed when
+  // subscribers require additional information not captured in the item key to process
+  // the deleted item (e.g., catalog version of deleted catalog object).
   2: required string value;
+
+  // If true, this item was deleted. Deleted items need not be included in non-delta
+  // TTopicDeltas, since those carry the latest version of every still-present topic
+  // item.
+  3: required bool deleted = false;
 }
 
 // Set of changes to a single topic, sent from the statestore to a subscriber as well as
@@ -89,15 +96,14 @@ struct TTopicDelta {
   // Name of the topic this delta applies to
   1: required string topic_name;
 
-  // List of changes to topic entries
+  // When is_delta=true, a list of changes to topic entries, including deletions, within
+  // [from_version, to_version].
+  // When is_delta=false, this is the list of all non-delete topic entries for
+  // [0, to_version], which can be used to reconstruct the topic from scratch.
   2: required list<TTopicItem> topic_entries;
 
-  // List of topic item keys whose entries have been deleted
-  3: required list<string> topic_deletions;
-
-  // True if entries / deletions are to be applied to in-memory state,
-  // otherwise topic_entries contains entire topic state.
-  4: required bool is_delta;
+  // True if entries / deletions are relative to the topic at versions [0, from_version].
+  3: required bool is_delta;
 
   // The Topic version range this delta covers. If there have been changes to the topic,
   // the update will include all changes in the range: [from_version, to_version).
@@ -105,16 +111,17 @@ struct TTopicDelta {
   // to_version. The from_version will always be 0 for non-delta updates.
   // If this is an update being sent from a subscriber to the statestore, the from_version
   // is set only when recovering from an inconsistent state, to the last version of the
-  // topic the subscriber successfully processed.
-  5: optional i64 from_version
-  6: optional i64 to_version
+  // topic the subscriber successfully processed. The value of to_version doesn't depend
+  // on whether the update is delta or not.
+  4: optional i64 from_version
+  5: optional i64 to_version
 
   // The minimum topic version of all subscribers to the topic. This can be used to
   // determine when all subscribers have successfully processed a specific update.
   // This is guaranteed because no subscriber will ever be sent a topic containing
   // keys with a version < min_subscriber_topic_version. Only used when sending an update
   // from the statestore to a subscriber.
-  7: optional i64 min_subscriber_topic_version
+  6: optional i64 min_subscriber_topic_version
 }
 
 // Description of a topic to subscribe to as part of a RegisterSubscriber call

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
index 624caad..bc83522 100644
--- a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
+++ b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
@@ -86,6 +86,7 @@ public class AuthorizationPolicy implements PrivilegeCache {
     if (existingRole != null) {
       // Remove the role. This will also clean up the grantGroup mappings.
       removeRole(existingRole.getName());
+      CatalogObjectVersionQueue.INSTANCE.removeAll(existingRole.getPrivileges());
       if (existingRole.getId() == role.getId()) {
         // Copy the privileges from the existing role.
         for (RolePrivilege p: existingRole.getPrivileges()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 994aa37..9ed8133 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -32,6 +32,7 @@ import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TPartitionKeyValue;
+import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.log4j.Logger;
@@ -141,18 +142,23 @@ public abstract class Catalog {
   }
 
   /**
-   * Returns the Table object for the given dbName/tableName. This will trigger a
-   * metadata load if the table metadata is not yet cached.
+   * Returns the Table object for the given dbName/tableName. If 'throwIfError' is true,
+   * an exception is thrown if the associated database does not exist. Otherwise, null is
+   * returned.
    */
-  public Table getTable(String dbName, String tableName) throws
-      CatalogException {
+  public Table getTable(String dbName, String tableName, boolean throwIfError)
+      throws CatalogException {
     Db db = getDb(dbName);
-    if (db == null) {
+    if (db == null && throwIfError) {
       throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
     }
-    return db.getTable(tableName);
+    return db == null ? null : db.getTable(tableName);
   }
 
+  public Table getTable(String dbName, String tableName) throws CatalogException {
+    return getTable(dbName, tableName, true);
+  }
+
   /**
    * Removes a table from the catalog and returns the table that was removed, or null
    * if the table/database does not exist.
@@ -530,4 +536,45 @@ public abstract class Catalog {
   public static boolean isDefaultDb(String dbName) {
     return DEFAULT_DB.equals(dbName.toLowerCase());
   }
+
+  /**
+   * Returns a unique string key of the form '<TYPE>:<identifier>' for a catalog object.
+   */
+  public static String toCatalogObjectKey(TCatalogObject catalogObject) {
+    Preconditions.checkNotNull(catalogObject);
+    switch (catalogObject.getType()) {
+      case DATABASE:
+        return "DATABASE:" + catalogObject.getDb().getDb_name().toLowerCase();
+      case TABLE:
+      case VIEW:
+        TTable tbl = catalogObject.getTable();
+        return "TABLE:" + tbl.getDb_name().toLowerCase() + "." +
+            tbl.getTbl_name().toLowerCase();
+      case FUNCTION:
+        return "FUNCTION:" + catalogObject.getFn().getName() + "(" +
+            catalogObject.getFn().getSignature() + ")";
+      case ROLE:
+        return "ROLE:" + catalogObject.getRole().getRole_name().toLowerCase();
+      case PRIVILEGE:
+        return "PRIVILEGE:" +
+            catalogObject.getPrivilege().getPrivilege_name().toLowerCase() + "." +
+            Integer.toString(catalogObject.getPrivilege().getRole_id());
+      case HDFS_CACHE_POOL:
+        return "HDFS_CACHE_POOL:" +
+            catalogObject.getCache_pool().getPool_name().toLowerCase();
+      case DATA_SOURCE:
+        return "DATA_SOURCE:" + catalogObject.getData_source().getName().toLowerCase();
+      default:
+        throw new IllegalStateException(
+            "Unsupported catalog object type: " + catalogObject.getType());
+    }
+  }
+
+  /**
+   * Returns true if the two objects have the same object type and key (generated using
+   * toCatalogObjectKey()).
+   */
+  public static boolean keyEquals(TCatalogObject first, TCatalogObject second) {
+    return toCatalogObjectKey(first).equals(toCatalogObjectKey(second));
+  }
 }