You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/16 23:04:19 UTC

[1/4] impala git commit: KUDU-2256: Add GetTransferSize() to RpcContext

Repository: impala
Updated Branches:
  refs/heads/master 91d109d6e -> 3fc42ded0


KUDU-2256: Add GetTransferSize() to RpcContext

This change adds GetTransferSize() to RpcContext to retrieve the
payload size of the inbound call. This makes it easier to track the
memory of incoming RPCs in the handler methods.

To test this I added a CHECK to one of the handler methods in
CalculatorService.

Change-Id: Iab2519bad1815aeccaa119f1605638bfd3604382
Reviewed-on: http://gerrit.cloudera.org:8080/8998
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9019
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/888a16ca
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/888a16ca
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/888a16ca

Branch: refs/heads/master
Commit: 888a16cad5faf381898ee50d25c49f35111f9142
Parents: 91d109d
Author: Lars Volker <lv...@cloudera.com>
Authored: Wed Jan 10 12:28:13 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 16 21:43:46 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/rpc-test-base.h | 1 +
 be/src/kudu/rpc/rpc_context.cc  | 4 ++++
 be/src/kudu/rpc/rpc_context.h   | 6 ++++++
 3 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/888a16ca/be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h
index 204f98d..a30e8dc 100644
--- a/be/src/kudu/rpc/rpc-test-base.h
+++ b/be/src/kudu/rpc/rpc-test-base.h
@@ -247,6 +247,7 @@ class CalculatorService : public CalculatorServiceIf {
   }
 
   void Add(const AddRequestPB *req, AddResponsePB *resp, RpcContext *context) override {
+    CHECK_GT(context->GetTransferSize(), 0);
     resp->set_result(req->x() + req->y());
     context->RespondSuccess();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/888a16ca/be/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_context.cc b/be/src/kudu/rpc/rpc_context.cc
index d6d872e..274966b 100644
--- a/be/src/kudu/rpc/rpc_context.cc
+++ b/be/src/kudu/rpc/rpc_context.cc
@@ -142,6 +142,10 @@ const rpc::RequestIdPB* RpcContext::request_id() const {
   return call_->header().has_request_id() ? &call_->header().request_id() : nullptr;
 }
 
+size_t RpcContext::GetTransferSize() const {
+  return call_->GetTransferSize();
+}
+
 Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
   return call_->AddOutboundSidecar(std::move(car), idx);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/888a16ca/be/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_context.h b/be/src/kudu/rpc/rpc_context.h
index ac895fc..938576c 100644
--- a/be/src/kudu/rpc/rpc_context.h
+++ b/be/src/kudu/rpc/rpc_context.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_RPC_RPC_CONTEXT_H
 #define KUDU_RPC_RPC_CONTEXT_H
 
+#include <stddef.h>
 #include <string>
 
 #include "kudu/gutil/gscoped_ptr.h"
@@ -203,6 +204,11 @@ class RpcContext {
   // Returns this call's request id, if it is set.
   const rpc::RequestIdPB* request_id() const;
 
+  // Returns the size of the transfer buffer that backs 'call_'. If the
+  // transfer buffer no longer exists (e.g. GetTransferSize() is called after
+  // DiscardTransfer()), returns 0.
+  size_t GetTransferSize() const;
+
   // Panic the server. This logs a fatal error with the given message, and
   // also includes the current RPC request, requestor, trace information, etc,
   // to make it easier to debug.


[4/4] impala git commit: IMPALA-5058: Improve the concurrency of DDL/DML operations

Posted by ta...@apache.org.
IMPALA-5058: Improve the concurrency of DDL/DML operations

Problem: A long-running table metadata operation (e.g. refresh) could
prevent any other metadata operation from making progress if it
coincided with the catalog topic creation operations. The problem was due
to the conservative locking scheme used when catalog topics were
created. In particular, in order to collect a consistent snapshot of
metadata changes, the global catalog lock was held for the entire
duration of that operation.

Solution: To improve the concurrency of catalog operations the following
changes are performed:
* A range of catalog versions determines the catalog changes to be
  included in a catalog update. Any catalog changes that do not fall in
  the specified range are ignored (to be processed in subsequent catalog
  topic updates).
* The catalog allows metadata operations to make progress while
  collecting catalog updates.
* To prevent starvation of catalog updates (i.e. frequently updated
  catalog objects skipping catalog updates indefinitely), we keep track
  of the number of times a catalog object has skipped an update and if
  that number exceeds a threshold it is included in the next catalog
  topic update even if its version is not in the specified topic update
  version range. Hence, the same catalog object may be sent in two
  consecutive catalog topic updates.

This commit also changes the way deletions are handled in the catalog and
disseminated to the impalad nodes through the statestore. In particular:
* Deletions in the catalog are explicitly recorded in a log with
the catalog version in which they occurred. As before, added and deleted
catalog objects are sent to the statestore.
* Topic entries associated with deleted catalog objects have non-empty
values (besides keys) that contain minimal object metadata, including the
catalog version.
* The statestore no longer uses the presence or absence of a topic
entry value to identify deleted topic entries. Deleted topic entries
should be explicitly marked as such by the statestore subscribers that
produce them.
* Statestore subscribers now use the 'deleted' flag to determine if a
topic entry corresponds to a deleted item.
* Impalads use the deleted objects' catalog versions when updating the
local catalog cache from a catalog update, rather than the update's maximum
catalog version.

Testing:
- No new tests were added as these paths are already exercised by
existing tests.
- Ran all core and exhaustive tests.

Change-Id: If12467a83acaeca6a127491d89291dedba91a35a
Reviewed-on: http://gerrit.cloudera.org:8080/7731
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/8752


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3fc42ded
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3fc42ded
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3fc42ded

Branch: refs/heads/master
Commit: 3fc42ded02bd9314cbc1881cc52692cde35e02fb
Parents: 888a16c
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Mon Aug 14 11:05:20 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 16 23:01:32 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                |  57 +-
 be/src/catalog/catalog-server.h                 |  39 +-
 be/src/catalog/catalog-util.cc                  |  15 -
 be/src/catalog/catalog-util.h                   |   9 -
 be/src/catalog/catalog.cc                       |  19 +-
 be/src/catalog/catalog.h                        |  14 +-
 be/src/exec/catalog-op-executor.cc              |   6 +-
 be/src/exec/catalog-op-executor.h               |   4 +-
 be/src/scheduling/admission-controller.cc       |  18 +-
 be/src/scheduling/admission-controller.h        |   7 +-
 be/src/scheduling/scheduler-test-util.cc        |   5 +-
 be/src/scheduling/scheduler.cc                  |  22 +-
 be/src/service/client-request-state.cc          |   6 +-
 be/src/service/frontend.cc                      |   1 +
 be/src/service/impala-server.cc                 | 245 +++--
 be/src/service/impala-server.h                  |  55 +-
 be/src/statestore/statestore.cc                 |  56 +-
 be/src/statestore/statestore.h                  |  43 +-
 common/thrift/CatalogInternalService.thrift     |  24 +-
 common/thrift/CatalogService.thrift             |  74 +-
 common/thrift/Frontend.thrift                   |  38 +-
 common/thrift/StatestoreService.thrift          |  31 +-
 .../impala/catalog/AuthorizationPolicy.java     |   1 +
 .../java/org/apache/impala/catalog/Catalog.java |  57 +-
 .../apache/impala/catalog/CatalogDeltaLog.java  |  85 +-
 .../apache/impala/catalog/CatalogObject.java    |   5 +-
 .../impala/catalog/CatalogObjectCache.java      |  19 +-
 .../impala/catalog/CatalogObjectImpl.java       |  47 +
 .../catalog/CatalogObjectVersionQueue.java      |  73 ++
 .../impala/catalog/CatalogServiceCatalog.java   | 953 +++++++++++++++----
 .../org/apache/impala/catalog/DataSource.java   |  20 +-
 .../main/java/org/apache/impala/catalog/Db.java |  31 +-
 .../org/apache/impala/catalog/Function.java     |  19 +-
 .../apache/impala/catalog/HdfsCachePool.java    |  11 +-
 .../apache/impala/catalog/ImpaladCatalog.java   |  67 +-
 .../java/org/apache/impala/catalog/Role.java    |  19 +-
 .../apache/impala/catalog/RolePrivilege.java    |  20 +-
 .../java/org/apache/impala/catalog/Table.java   |  33 +-
 .../apache/impala/catalog/TopicUpdateLog.java   | 152 +++
 .../impala/service/CatalogOpExecutor.java       | 158 +--
 .../org/apache/impala/service/Frontend.java     |   6 +-
 .../org/apache/impala/service/JniCatalog.java   |  11 +-
 .../org/apache/impala/util/SentryProxy.java     |  31 +-
 .../testutil/CatalogServiceTestCatalog.java     |   2 +-
 .../impala/testutil/ImpaladTestCatalog.java     |   4 +-
 tests/statestore/test_statestore.py             |   8 +-
 46 files changed, 1768 insertions(+), 852 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 15685d0..b004b22 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -228,13 +228,10 @@ void CatalogServer::UpdateCatalogTopicCallback(
 
   const TTopicDelta& delta = topic->second;
 
-  // If this is not a delta update, clear all catalog objects and request an update
-  // from version 0 from the local catalog. There is an optimization that checks if
-  // pending_topic_updates_ was just reloaded from version 0, if they have then skip this
-  // step and use that data.
-  if (delta.from_version == 0 && delta.to_version == 0 &&
-      catalog_objects_min_version_ != 0) {
-    catalog_topic_entry_keys_.clear();
+  // If not generating a delta update and 'pending_topic_updates_' doesn't already contain
+  // the full catalog (beginning with version 0), then force GatherCatalogUpdatesThread()
+  // to reload the full catalog.
+  if (delta.from_version == 0 && catalog_objects_min_version_ != 0) {
     last_sent_catalog_version_ = 0L;
   } else {
     // Process the pending topic update.
@@ -284,14 +281,17 @@ void CatalogServer::UpdateCatalogTopicCallback(
     } else if (current_catalog_version != last_sent_catalog_version_) {
       // If there has been a change since the last time the catalog was queried,
       // call into the Catalog to find out what has changed.
-      TGetAllCatalogObjectsResponse catalog_objects;
-      status = catalog_->GetAllCatalogObjects(last_sent_catalog_version_,
-          &catalog_objects);
+      TGetCatalogDeltaResponse catalog_objects;
+      status = catalog_->GetCatalogDelta(last_sent_catalog_version_, &catalog_objects);
       if (!status.ok()) {
         LOG(ERROR) << status.GetDetail();
       } else {
-        // Use the catalog objects to build a topic update list.
-        BuildTopicUpdates(catalog_objects.objects);
+        // Use the catalog objects to build a topic update list. These include
+        // objects added to the catalog, 'updated_objects', and objects deleted
+        // from the catalog, 'deleted_objects'. The order in which we process
+        // these two disjoint sets of catalog objects does not matter.
+        BuildTopicUpdates(catalog_objects.updated_objects, false);
+        BuildTopicUpdates(catalog_objects.deleted_objects, true);
         catalog_objects_min_version_ = last_sent_catalog_version_;
         catalog_objects_max_version_ = catalog_objects.max_catalog_version;
       }
@@ -302,31 +302,19 @@ void CatalogServer::UpdateCatalogTopicCallback(
   }
 }
 
-void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects) {
-  unordered_set<string> current_entry_keys;
-  // Add any new/updated catalog objects to the topic.
+void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects,
+    bool topic_deletions) {
   for (const TCatalogObject& catalog_object: catalog_objects) {
+    DCHECK_GT(catalog_object.catalog_version, last_sent_catalog_version_);
     const string& entry_key = TCatalogObjectToEntryKey(catalog_object);
     if (entry_key.empty()) {
       LOG_EVERY_N(WARNING, 60) << "Unable to build topic entry key for TCatalogObject: "
                                << ThriftDebugString(catalog_object);
     }
-
-    current_entry_keys.insert(entry_key);
-    // Remove this entry from catalog_topic_entry_keys_. At the end of this loop, we will
-    // be left with the set of keys that were in the last update, but not in this
-    // update, indicating which objects have been removed/dropped.
-    catalog_topic_entry_keys_.erase(entry_key);
-
-    // This isn't a new or an updated item, skip it.
-    if (catalog_object.catalog_version <= last_sent_catalog_version_) continue;
-
-    VLOG(1) << "Publishing update: " << entry_key << "@"
-            << catalog_object.catalog_version;
-
     pending_topic_updates_.push_back(TTopicItem());
     TTopicItem& item = pending_topic_updates_.back();
     item.key = entry_key;
+    item.deleted = topic_deletions;
     Status status = thrift_serializer_.Serialize(&catalog_object, &item.value);
     if (!status.ok()) {
       LOG(ERROR) << "Error serializing topic value: " << status.GetDetail();
@@ -340,18 +328,9 @@ void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_obje
         pending_topic_updates_.pop_back();
       }
     }
+    VLOG(1) << "Publishing " << (topic_deletions ? "deletion " : "update ")
+        << ": " << entry_key << "@" << catalog_object.catalog_version;
   }
-
-  // Any remaining items in catalog_topic_entry_keys_ indicate the object was removed
-  // since the last update.
-  for (const string& key: catalog_topic_entry_keys_) {
-    pending_topic_updates_.push_back(TTopicItem());
-    TTopicItem& item = pending_topic_updates_.back();
-    item.key = key;
-    VLOG(1) << "Publishing deletion: " << key;
-    // Don't set a value to mark this item as deleted.
-  }
-  catalog_topic_entry_keys_.swap(current_entry_keys);
 }
 
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index bf88e00..78a3f20 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -88,12 +88,6 @@ class CatalogServer {
   /// Thread that polls the catalog for any updates.
   std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
-  /// Tracks the set of catalog objects that exist via their topic entry key.
-  /// During each IMPALA_CATALOG_TOPIC heartbeat, stores the set of known catalog objects
-  /// that exist by their topic entry key. Used to track objects that have been removed
-  /// since the last heartbeat.
-  boost::unordered_set<std::string> catalog_topic_entry_keys_;
-
   /// Protects catalog_update_cv_, pending_topic_updates_,
   /// catalog_objects_to/from_version_, and last_sent_catalog_version.
   boost::mutex catalog_lock_;
@@ -135,14 +129,10 @@ class CatalogServer {
   /// finds all catalog objects that have a catalog version greater than the last update
   /// sent by calling into the JniCatalog. The topic is updated with any catalog objects
   /// that are new or have been modified since the last heartbeat (by comparing the
-  /// catalog version of the object with last_sent_catalog_version_). Also determines any
-  /// deletions of catalog objects by looking at the
-  /// difference of the last set of topic entry keys that were sent and the current set
-  /// of topic entry keys. At the end of execution it notifies the
-  /// catalog_update_gathering_thread_ to fetch the next set of updates from the
-  /// JniCatalog.
-  /// All updates are added to the subscriber_topic_updates list and sent back to the
-  /// Statestore.
+  /// catalog version of the object with last_sent_catalog_version_). At the end of
+  /// execution it notifies the catalog_update_gathering_thread_ to fetch the next set of
+  /// updates from the JniCatalog. All updates are added to the subscriber_topic_updates
+  /// list and sent back to the Statestore.
   void UpdateCatalogTopicCallback(
       const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
@@ -153,20 +143,19 @@ class CatalogServer {
   /// Also, explicitly releases free memory back to the OS after each complete iteration.
   [[noreturn]] void GatherCatalogUpdatesThread();
 
-  /// This function determines what items have been added/removed from the catalog
-  /// since the last heartbeat and builds the next topic update to send. To do this, it
-  /// enumerates the given catalog objects returned looking for the objects that have a
-  /// catalog version that is > the catalog version sent with the last heartbeat. To
-  /// determine items that have been deleted, it saves the set of topic entry keys sent
-  /// with the last update and looks at the difference between it and the current set of
-  /// topic entry keys.
+  /// Builds the next topic update to send based on what items
+  /// have been added/changed/removed from the catalog since the last hearbeat. To do
+  /// this, it enumerates the given catalog objects returned looking for the objects that
+  /// have a catalog version that is > the catalog version sent with the last heartbeat.
+  /// 'topic_deletions' is true if 'catalog_objects' contain deleted catalog
+  /// objects.
+  ///
   /// The key for each entry is a string composed of:
   /// "TCatalogObjectType:<unique object name>". So for table foo.bar, the key would be
   /// "TABLE:foo.bar". Encoding the object type information in the key ensures the keys
-  /// are unique, as well as helps to determine what object type was removed in a state
-  /// store delta update (since the state store only sends key names for deleted items).
-  /// Must hold catalog_lock_ when calling this function.
-  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects);
+  /// are unique. Must hold catalog_lock_ when calling this function.
+  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects,
+      bool topic_deletions);
 
   /// Example output:
   /// "databases": [

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog-util.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index e968742..789723e 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -55,21 +55,6 @@ TCatalogObjectType::type TCatalogObjectTypeFromName(const string& name) {
   return TCatalogObjectType::UNKNOWN;
 }
 
-Status TCatalogObjectFromEntryKey(const string& key,
-    TCatalogObject* catalog_object) {
-  // Reconstruct the object based only on the key.
-  size_t pos = key.find(":");
-  if (pos == string::npos || pos >= key.size() - 1) {
-    stringstream error_msg;
-    error_msg << "Invalid topic entry key format: " << key;
-    return Status(error_msg.str());
-  }
-
-  TCatalogObjectType::type object_type = TCatalogObjectTypeFromName(key.substr(0, pos));
-  string object_name = key.substr(pos + 1);
-  return TCatalogObjectFromObjectName(object_type, object_name, catalog_object);
-}
-
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,
     const string& object_name, TCatalogObject* catalog_object) {
   switch (object_type) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog-util.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.h b/be/src/catalog/catalog-util.h
index ddc2c21..e98cd38 100644
--- a/be/src/catalog/catalog-util.h
+++ b/be/src/catalog/catalog-util.h
@@ -32,15 +32,6 @@ class Status;
 /// TCatalogObjectType::UNKNOWN if no match was found.
 TCatalogObjectType::type TCatalogObjectTypeFromName(const std::string& name);
 
-/// Parses the given IMPALA_CATALOG_TOPIC topic entry key to determine the
-/// TCatalogObjectType and unique object name. Populates catalog_object with the result.
-/// This is used to reconstruct type information when an item is deleted from the
-/// topic. At that time the only context available about the object being deleted is its
-/// its topic entry key which contains only the type and object name. The resulting
-/// TCatalogObject can then be used to removing a matching item from the catalog.
-Status TCatalogObjectFromEntryKey(const std::string& key,
-    TCatalogObject* catalog_object);
-
 /// Populates a TCatalogObject based on the given object type (TABLE, DATABASE, etc) and
 /// object name string.
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index e7e05da..b6dd86a 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -62,7 +62,7 @@ Catalog::Catalog() {
     {"getFunctions", "([B)[B", &get_functions_id_},
     {"checkUserSentryAdmin", "([B)V", &sentry_admin_check_id_},
     {"getCatalogObject", "([B)[B", &get_catalog_object_id_},
-    {"getCatalogObjects", "(J)[B", &get_catalog_objects_id_},
+    {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_}};
 
@@ -97,19 +97,10 @@ Status Catalog::GetCatalogVersion(long* version) {
   return Status::OK();
 }
 
-Status Catalog::GetAllCatalogObjects(long from_version,
-    TGetAllCatalogObjectsResponse* resp) {
-  JNIEnv* jni_env = getJNIEnv();
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(jni_env));
-  jvalue requested_from_version;
-  requested_from_version.j = from_version;
-  jbyteArray result_bytes = static_cast<jbyteArray>(
-      jni_env->CallObjectMethod(catalog_, get_catalog_objects_id_,
-      requested_from_version));
-  RETURN_ERROR_IF_EXC(jni_env);
-  RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, resp));
-  return Status::OK();
+Status Catalog::GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp) {
+  TGetCatalogDeltaRequest request;
+  request.__set_from_version(from_version);
+  return JniUtil::CallJniMethod(catalog_, get_catalog_delta_id_, request, resp);
 }
 
 Status Catalog::ExecDdl(const TDdlExecRequest& req, TDdlExecResponse* resp) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index ab6a2a3..3119d60 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -56,12 +56,10 @@ class Catalog {
   /// Status object with information on the error will be returned.
   Status GetCatalogVersion(long* version);
 
-  /// Gets all Catalog objects and the metadata that is applicable for the given request.
-  /// Always returns all object names that exist in the Catalog, but allows for extended
-  /// metadata for objects that were modified after the specified version.
-  /// Returns OK if the operation was successful, otherwise a Status object with
-  /// information on the error will be returned.
-  Status GetAllCatalogObjects(long from_version, TGetAllCatalogObjectsResponse* resp);
+  /// Retrieves the catalog objects that were added/modified/deleted since version
+  /// 'from_version'. Returns OK if the operation was successful, otherwise a Status
+  /// object with information on the error will be returned.
+  Status GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp);
 
   /// Gets the Thrift representation of a Catalog object. The request is a TCatalogObject
   /// which has the desired TCatalogObjectType and name properly set.
@@ -74,7 +72,7 @@ class Catalog {
   /// match the pattern string. Patterns are "p1|p2|p3" where | denotes choice,
   /// and each pN may contain wildcards denoted by '*' which match all strings.
   /// TODO: GetDbs() and GetTableNames() can probably be scrapped in favor of
-  /// GetAllCatalogObjects(). Consider removing them and moving everything to use
+  /// GetCatalogDelta(). Consider removing them and moving everything to use
   /// that.
   Status GetDbs(const std::string* pattern, TGetDbsResult* dbs);
 
@@ -109,7 +107,7 @@ class Catalog {
   jmethodID exec_ddl_id_;  // JniCatalog.execDdl()
   jmethodID reset_metadata_id_;  // JniCatalog.resetMetdata()
   jmethodID get_catalog_object_id_;  // JniCatalog.getCatalogObject()
-  jmethodID get_catalog_objects_id_;  // JniCatalog.getCatalogObjects()
+  jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
   jmethodID get_dbs_id_; // JniCatalog.getDbs()
   jmethodID get_table_names_id_; // JniCatalog.getTableNames()

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 7490ed1..12398cf 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -89,16 +89,20 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
 }
 
 Status CatalogOpExecutor::ExecComputeStats(
-    const TComputeStatsParams& compute_stats_params,
+    const TCatalogOpRequest& compute_stats_request,
     const TTableSchema& tbl_stats_schema, const TRowSet& tbl_stats_data,
     const TTableSchema& col_stats_schema, const TRowSet& col_stats_data) {
   // Create a new DDL request to alter the table's statistics.
   TCatalogOpRequest catalog_op_req;
   catalog_op_req.__isset.ddl_params = true;
   catalog_op_req.__set_op_type(TCatalogOpType::DDL);
+  catalog_op_req.__set_sync_ddl(compute_stats_request.sync_ddl);
   TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params;
   update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);
+  update_stats_req.__set_sync_ddl(compute_stats_request.sync_ddl);
 
+  const TComputeStatsParams& compute_stats_params =
+      compute_stats_request.ddl_params.compute_stats_params;
   TAlterTableUpdateStatsParams& update_stats_params =
       update_stats_req.alter_table_params.update_stats_params;
   update_stats_req.__isset.alter_table_params = true;

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/exec/catalog-op-executor.h
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h
index 33b949e..375c839 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -47,10 +47,10 @@ class CatalogOpExecutor {
   /// be loaded.
   Status GetCatalogObject(const TCatalogObject& object_desc, TCatalogObject* result);
 
-  /// Translates the given compute stats params and its child-query results into
+  /// Translates the given compute stats request and its child-query results into
   /// a new table alteration request for updating the stats metadata, and executes
   /// the alteration via Exec();
-  Status ExecComputeStats(const TComputeStatsParams& compute_stats_params,
+  Status ExecComputeStats(const TCatalogOpRequest& compute_stats_request,
       const apache::hive::service::cli::thrift::TTableSchema& tbl_stats_schema,
       const apache::hive::service::cli::thrift::TRowSet& tbl_stats_data,
       const apache::hive::service::cli::thrift::TTableSchema& col_stats_schema,

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index ed4e7e2..99f659a 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -645,7 +645,6 @@ void AdmissionController::UpdatePoolStats(
         }
       }
       HandleTopicUpdates(delta.topic_entries);
-      HandleTopicDeletions(delta.topic_deletions);
     }
     UpdateClusterAggregates();
   }
@@ -683,6 +682,10 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
     // The topic entry from this subscriber is handled specially; the stats coming
     // from the statestore are likely already outdated.
     if (topic_backend_id == host_id_) continue;
+    if (item.deleted) {
+      GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
+      continue;
+    }
     TPoolStats remote_update;
     uint32_t len = item.value.size();
     Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -695,18 +698,7 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
   }
 }
 
-void AdmissionController::HandleTopicDeletions(const vector<string>& topic_deletions) {
-  for (const string& topic_key: topic_deletions) {
-    string pool_name;
-    string topic_backend_id;
-    if (!ParsePoolTopicKey(topic_key, &pool_name, &topic_backend_id)) continue;
-    if (topic_backend_id == host_id_) continue;
-    GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
-  }
-}
-
-void AdmissionController::PoolStats::UpdateAggregates(
-    HostMemMap* host_mem_reserved) {
+void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reserved) {
   const string& coord_id = parent_->host_id_;
   int64_t num_running = 0;
   int64_t num_queued = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 71c9fa4..2830bee 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -428,13 +428,10 @@ class AdmissionController {
   void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
 
   /// Updates the remote stats with per-host topic_updates coming from the statestore.
-  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
+  /// Topic entries marked as deleted remove the corresponding remote stats.
+  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
   void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);
 
-  /// Removes remote stats identified by the topic_deletions coming from the statestore.
-  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
-  void HandleTopicDeletions(const std::vector<std::string>& topic_deletions);
-
   /// Re-computes the per-pool aggregate stats and the per-host aggregates in
   /// host_mem_reserved_ using each pool's remote_stats_ and local_stats_.
   /// Called by UpdatePoolStats() after handling updates and deletions.

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 6fb2bba..05cfc42 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -478,7 +478,10 @@ void SchedulerWrapper::RemoveBackend(const Host& host) {
   TTopicDelta delta;
   delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
-  delta.topic_deletions.push_back(host.ip);
+  TTopicItem item;
+  item.__set_deleted(true);
+  item.__set_key(host.ip);
+  delta.topic_entries.push_back(item);
   SendTopicDelta(delta);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 2bd6c96..5cf0f01 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -131,9 +131,7 @@ void Scheduler::UpdateMembership(
 
   // If the delta transmitted by the statestore is empty we can skip processing
   // altogether and avoid making a copy of executors_config_.
-  if (delta.is_delta && delta.topic_entries.empty() && delta.topic_deletions.empty()) {
-    return;
-  }
+  if (delta.is_delta && delta.topic_entries.empty()) return;
 
   // This function needs to handle both delta and non-delta updates. To minimize the
   // time needed to hold locks, all updates are applied to a copy of
@@ -150,10 +148,17 @@ void Scheduler::UpdateMembership(
     new_executors_config = std::make_shared<BackendConfig>(*executors_config_);
   }
 
-  // Process new entries to the topic. Update executors_config_ and
+  // Process new and removed entries to the topic. Update executors_config_ and
   // current_executors_ to match the set of executors given by the
   // subscriber_topic_updates.
   for (const TTopicItem& item : delta.topic_entries) {
+    if (item.deleted) {
+      if (current_executors_.find(item.key) != current_executors_.end()) {
+        new_executors_config->RemoveBackend(current_executors_[item.key]);
+        current_executors_.erase(item.key);
+      }
+      continue;
+    }
     TBackendDescriptor be_desc;
     // Benchmarks have suggested that this method can deserialize
     // ~10m messages per second, so no immediate need to consider optimization.
@@ -188,15 +193,6 @@ void Scheduler::UpdateMembership(
       current_executors_.insert(make_pair(item.key, be_desc));
     }
   }
-
-  // Process deletions from the topic
-  for (const string& backend_id : delta.topic_deletions) {
-    if (current_executors_.find(backend_id) != current_executors_.end()) {
-      new_executors_config->RemoveBackend(current_executors_[backend_id]);
-      current_executors_.erase(backend_id);
-    }
-  }
-
   SetExecutorsConfig(new_executors_config);
 
   if (metrics_ != nullptr) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 87ac8ff..ee35afa 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -183,12 +183,15 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
 
       // Now refresh the table metadata.
       TCatalogOpRequest reset_req;
+      reset_req.__set_sync_ddl(exec_request_.query_options.sync_ddl);
       reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
       reset_req.__set_reset_metadata_params(TResetMetadataRequest());
       reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
       reset_req.reset_metadata_params.__set_is_refresh(true);
       reset_req.reset_metadata_params.__set_table_name(
           exec_request_.load_data_request.table_name);
+      reset_req.reset_metadata_params.__set_sync_ddl(
+          exec_request_.query_options.sync_ddl);
       catalog_op_executor_.reset(
           new CatalogOpExecutor(exec_env_, frontend_, server_profile_));
       RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
@@ -923,6 +926,7 @@ Status ClientRequestState::UpdateCatalog() {
   if (query_exec_request.__isset.finalize_params) {
     const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
     TUpdateCatalogRequest catalog_update;
+    catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl);
     catalog_update.__set_header(TCatalogServiceRequestHeader());
     catalog_update.header.__set_requesting_user(effective_user());
     if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
@@ -1078,7 +1082,7 @@ Status ClientRequestState::UpdateTableAndColumnStats(
   }
 
   Status status = catalog_op_executor_->ExecComputeStats(
-      exec_request_.catalog_op_request.ddl_params.compute_stats_params,
+      exec_request_.catalog_op_request,
       child_queries[0]->result_schema(),
       child_queries[0]->result_data(),
       col_stats_schema,

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index df39114..50883af 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -17,6 +17,7 @@
 
 #include "service/frontend.h"
 
+#include <jni.h>
 #include <list>
 #include <string>
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 5b895ee..6358145 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1327,10 +1327,10 @@ void ImpalaServer::CatalogUpdateCallback(
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;
 
-
   // Update catalog cache in frontend. An update is split into batches of size
   // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. IMPALA-3499
-  if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0)  {
+  if (delta.topic_entries.size() != 0)  {
+    vector<TCatalogObject> dropped_objects;
     vector<TUpdateCatalogCacheRequest> update_reqs;
     update_reqs.push_back(TUpdateCatalogCacheRequest());
     TUpdateCatalogCacheRequest* incremental_request = &update_reqs.back();
@@ -1340,7 +1340,6 @@ void ImpalaServer::CatalogUpdateCallback(
     int64_t new_catalog_version = catalog_update_info_.catalog_version;
     uint64_t batch_size_bytes = 0;
     for (const TTopicItem& item: delta.topic_entries) {
-      TCatalogObject catalog_object;
       Status status;
       vector<uint8_t> data_buffer;
       const uint8_t* data_buffer_ptr = nullptr;
@@ -1358,82 +1357,70 @@ void ImpalaServer::CatalogUpdateCallback(
         data_buffer_ptr = reinterpret_cast<const uint8_t*>(item.value.data());
         len = item.value.size();
       }
+      if (len > 100 * 1024 * 1024 /* 100MB */) {
+        LOG(INFO) << "Received large catalog object(>100mb): "
+            << item.key << " is "
+            << PrettyPrinter::Print(len, TUnit::BYTES);
+      }
+      TCatalogObject catalog_object;
       status = DeserializeThriftMsg(data_buffer_ptr, &len, FLAGS_compact_catalog_topic,
           &catalog_object);
       if (!status.ok()) {
         LOG(ERROR) << "Error deserializing item " << item.key
-                   << ": " << status.GetDetail();
+            << ": " << status.GetDetail();
         continue;
       }
-      if (len > 100 * 1024 * 1024 /* 100MB */) {
-        LOG(INFO) << "Received large catalog update(>100mb): "
-                     << item.key << " is "
-                     << PrettyPrinter::Print(len, TUnit::BYTES);
-      }
-      if (catalog_object.type == TCatalogObjectType::CATALOG) {
-        incremental_request->__set_catalog_service_id(
-            catalog_object.catalog.catalog_service_id);
-        new_catalog_version = catalog_object.catalog_version;
-      }
-
-      // Refresh the lib cache entries of any added functions and data sources
-      // TODO: if frontend returns the list of functions and data sources, we do not
-      // need to deserialize these in backend.
-      if (catalog_object.type == TCatalogObjectType::FUNCTION) {
-        DCHECK(catalog_object.__isset.fn);
-        LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
-      }
-      if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-        DCHECK(catalog_object.__isset.data_source);
-        LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
-      }
 
       if (batch_size_bytes + len > MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES) {
+        // Initialize a new batch of catalog updates.
         update_reqs.push_back(TUpdateCatalogCacheRequest());
         incremental_request = &update_reqs.back();
         batch_size_bytes = 0;
       }
-      incremental_request->updated_objects.push_back(catalog_object);
-      batch_size_bytes += len;
-    }
-    update_reqs.push_back(TUpdateCatalogCacheRequest());
-    TUpdateCatalogCacheRequest* deletion_request = &update_reqs.back();
-
-    // We need to look up the dropped functions and data sources and remove them
-    // from the library cache. The data sent from the catalog service does not
-    // contain all the function metadata so we'll ask our local frontend for it. We
-    // need to do this before updating the catalog.
-    vector<TCatalogObject> dropped_objects;
 
-    // Process all Catalog deletions (dropped objects). We only know the keys (object
-    // names) so must parse each key to determine the TCatalogObject.
-    for (const string& key: delta.topic_deletions) {
-      LOG(INFO) << "Catalog topic entry deletion: " << key;
-      TCatalogObject catalog_object;
-      Status status = TCatalogObjectFromEntryKey(key, &catalog_object);
-      if (!status.ok()) {
-        LOG(ERROR) << "Error parsing catalog topic entry deletion key: " << key << " "
-                   << "Error: " << status.GetDetail();
-        continue;
+      if (catalog_object.type == TCatalogObjectType::CATALOG) {
+        incremental_request->__set_catalog_service_id(
+            catalog_object.catalog.catalog_service_id);
+        new_catalog_version = catalog_object.catalog_version;
       }
-      deletion_request->removed_objects.push_back(catalog_object);
-      if (catalog_object.type == TCatalogObjectType::FUNCTION ||
-          catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-        TCatalogObject dropped_object;
-        if (exec_env_->frontend()->GetCatalogObject(
-                catalog_object, &dropped_object).ok()) {
-          // This object may have been dropped and re-created. To avoid removing the
-          // re-created object's entry from the cache verify the existing object has a
-          // catalog version <= the catalog version included in this statestore heartbeat.
-          if (dropped_object.catalog_version <= new_catalog_version) {
-            if (catalog_object.type == TCatalogObjectType::FUNCTION ||
-                catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-              dropped_objects.push_back(dropped_object);
+      VLOG(3) << (item.deleted ? "Deleted " : "Added ") << "item: " << item.key
+          << " version: " << catalog_object.catalog_version << " of size: " << len;
+
+      if (!item.deleted) {
+        // Refresh the lib cache entries of any added functions and data sources
+        // TODO: if frontend returns the list of functions and data sources, we do not
+        // need to deserialize these in backend.
+        if (catalog_object.type == TCatalogObjectType::FUNCTION) {
+          DCHECK(catalog_object.__isset.fn);
+          LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
+        }
+        if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+          DCHECK(catalog_object.__isset.data_source);
+          LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
+        }
+        incremental_request->updated_objects.push_back(catalog_object);
+      } else {
+        // We need to look up any dropped functions and data sources and remove
+        // them from the library cache.
+        if (catalog_object.type == TCatalogObjectType::FUNCTION ||
+            catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+          TCatalogObject existing_object;
+          if (exec_env_->frontend()->GetCatalogObject(
+              catalog_object, &existing_object).ok()) {
+            // If the object exists in the catalog it may have been dropped and
+            // re-created. To avoid removing the re-created object's entry from
+            // the cache verify that the existing object's version < the
+            // version of the dropped object included in this statestore
+            // heartbeat.
+            DCHECK_NE(existing_object.catalog_version, catalog_object.catalog_version);
+            if (existing_object.catalog_version < catalog_object.catalog_version) {
+              dropped_objects.push_back(existing_object);
             }
           }
         }
-        // Nothing to do in error case.
+        incremental_request->removed_objects.push_back(catalog_object);
       }
+      batch_size_bytes += len;
     }
 
     // Call the FE to apply the changes to the Impalad Catalog.
@@ -1456,6 +1443,9 @@ void ImpalaServer::CatalogUpdateCallback(
         catalog_update_info_.catalog_version = new_catalog_version;
         catalog_update_info_.catalog_topic_version = delta.to_version;
         catalog_update_info_.catalog_service_id = resp.catalog_service_id;
+        catalog_update_info_.min_catalog_object_version = resp.min_catalog_object_version;
+        LOG(INFO) << "Catalog topic update applied with version: " << new_catalog_version
+            << " new min catalog object version: " << resp.min_catalog_object_version;
       }
       ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0);
       // TODO: deal with an error status
@@ -1482,13 +1472,89 @@ void ImpalaServer::CatalogUpdateCallback(
   catalog_version_update_cv_.NotifyAll();
 }
 
+void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
+    const TUniqueId& catalog_service_id) {
+  unique_lock<mutex> unique_lock(catalog_version_lock_);
+  // Wait for the update to be processed locally.
+  VLOG_QUERY << "Waiting for catalog version: " << catalog_update_version
+             << " current version: " << catalog_update_info_.catalog_version;
+  while (catalog_update_info_.catalog_version < catalog_update_version &&
+         catalog_update_info_.catalog_service_id == catalog_service_id) {
+    catalog_version_update_cv_.Wait(unique_lock);
+  }
+
+  if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    VLOG_QUERY << "Detected change in catalog service ID";
+  } else {
+    VLOG_QUERY << "Received catalog version: " << catalog_update_version;
+  }
+}
+
+void ImpalaServer::WaitForCatalogUpdateTopicPropagation(
+    const TUniqueId& catalog_service_id) {
+  unique_lock<mutex> unique_lock(catalog_version_lock_);
+  int64_t min_req_subscriber_topic_version =
+      catalog_update_info_.catalog_topic_version;
+  VLOG_QUERY << "Waiting for min subscriber topic version: "
+      << min_req_subscriber_topic_version << " current version: "
+      << min_subscriber_catalog_topic_version_;
+  while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
+         catalog_update_info_.catalog_service_id == catalog_service_id) {
+    catalog_version_update_cv_.Wait(unique_lock);
+  }
+
+  if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    VLOG_QUERY << "Detected change in catalog service ID";
+  } else {
+    VLOG_QUERY << "Received min subscriber topic version: "
+        << min_req_subscriber_topic_version;
+  }
+}
+
+void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_version,
+    const TUniqueId& catalog_service_id) {
+  unique_lock<mutex> unique_lock(catalog_version_lock_);
+  int64_t min_catalog_object_version =
+      catalog_update_info_.min_catalog_object_version;
+  // TODO: Set a timeout to eventually break out of this loop if something goes
+  // wrong?
+  VLOG_QUERY << "Waiting for minimum catalog object version: "
+      << min_req_catalog_object_version << " current version: "
+      << min_catalog_object_version;
+  while (catalog_update_info_.min_catalog_object_version < min_req_catalog_object_version
+      && catalog_update_info_.catalog_service_id == catalog_service_id) {
+    catalog_version_update_cv_.Wait(unique_lock);
+  }
+
+  if (catalog_update_info_.catalog_service_id != catalog_service_id) {
+    VLOG_QUERY << "Detected change in catalog service ID";
+  } else {
+    VLOG_QUERY << "Updated minimum catalog object version: "
+        << min_req_catalog_object_version;
+  }
+}
+
 Status ImpalaServer::ProcessCatalogUpdateResult(
     const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers) {
-  // If this update result contains catalog objects to add or remove, directly apply the
-  // updates to the local impalad's catalog cache. Otherwise, wait for a statestore
-  // heartbeat that contains this update version.
-  if (catalog_update_result.__isset.updated_catalog_objects ||
-      catalog_update_result.__isset.removed_catalog_objects) {
+  const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
+  if (!catalog_update_result.__isset.updated_catalog_objects &&
+      !catalog_update_result.__isset.removed_catalog_objects) {
+    // Operation with no result set. Use the version specified in
+    // 'catalog_update_result' to determine when the effects of this operation
+    // have been applied to the local catalog cache.
+    if (catalog_update_result.is_invalidate) {
+      WaitForMinCatalogUpdate(catalog_update_result.version, catalog_service_id);
+    } else {
+      WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id);
+    }
+    if (wait_for_all_subscribers) {
+      // Now wait for this update to be propagated to all catalog topic subscribers.
+      // If we make it here it implies the first condition was met (the update was
+      // processed locally or the catalog service id has changed).
+      WaitForCatalogUpdateTopicPropagation(catalog_service_id);
+    }
+  } else {
+    // Operation with a result set.
     TUpdateCatalogCacheRequest update_req;
     update_req.__set_is_delta(true);
     update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
@@ -1498,7 +1564,6 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
     if (catalog_update_result.__isset.removed_catalog_objects) {
       update_req.__set_removed_objects(catalog_update_result.removed_catalog_objects);
     }
-
     // Apply the changes to the local catalog cache.
     TUpdateCatalogCacheResponse resp;
     Status status = exec_env_->frontend()->UpdateCatalogCache(
@@ -1506,34 +1571,12 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
     if (!status.ok()) LOG(ERROR) << status.GetDetail();
     RETURN_IF_ERROR(status);
     if (!wait_for_all_subscribers) return Status::OK();
-  }
-
-  unique_lock<mutex> unique_lock(catalog_version_lock_);
-  int64_t min_req_catalog_version = catalog_update_result.version;
-  const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
-
-  // Wait for the update to be processed locally.
-  // TODO: What about query cancellation?
-  VLOG_QUERY << "Waiting for catalog version: " << min_req_catalog_version
-             << " current version: " << catalog_update_info_.catalog_version;
-  while (catalog_update_info_.catalog_version < min_req_catalog_version &&
-         catalog_update_info_.catalog_service_id == catalog_service_id) {
-    catalog_version_update_cv_.Wait(unique_lock);
-  }
-
-  if (!wait_for_all_subscribers) return Status::OK();
-
-  // Now wait for this update to be propagated to all catalog topic subscribers.
-  // If we make it here it implies the first condition was met (the update was processed
-  // locally or the catalog service id has changed).
-  int64_t min_req_subscriber_topic_version = catalog_update_info_.catalog_topic_version;
-
-  VLOG_QUERY << "Waiting for min subscriber topic version: "
-             << min_req_subscriber_topic_version << " current version: "
-             << min_subscriber_catalog_topic_version_;
-  while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
-         catalog_update_info_.catalog_service_id == catalog_service_id) {
-    catalog_version_update_cv_.Wait(unique_lock);
+    // Wait until we receive and process the catalog update that covers the effects
+    // (catalog objects) of this operation.
+    WaitForCatalogUpdate(catalog_update_result.version, catalog_service_id);
+    // Now wait for this update to be propagated to all catalog topic
+    // subscribers.
+    WaitForCatalogUpdateTopicPropagation(catalog_service_id);
   }
   return Status::OK();
 }
@@ -1554,6 +1597,10 @@ void ImpalaServer::MembershipCallback(
 
     // Process membership additions.
     for (const TTopicItem& item: delta.topic_entries) {
+      if (item.deleted) {
+        known_backends_.erase(item.key);
+        continue;
+      }
       uint32_t len = item.value.size();
       TBackendDescriptor backend_descriptor;
       Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -1570,18 +1617,12 @@ void ImpalaServer::MembershipCallback(
     // Only register if all ports have been opened and are ready.
     if (services_started_.load()) AddLocalBackendToStatestore(subscriber_topic_updates);
 
-    // Process membership deletions.
-    for (const string& backend_id: delta.topic_deletions) {
-      known_backends_.erase(backend_id);
-    }
-
     // Create a set of known backend network addresses. Used to test for cluster
     // membership by network address.
     set<TNetworkAddress> current_membership;
     // Also reflect changes to the frontend. Initialized only if any_changes is true.
     TUpdateMembershipRequest update_req;
-    bool any_changes = !delta.topic_entries.empty() || !delta.topic_deletions.empty() ||
-        !delta.is_delta;
+    bool any_changes = !delta.topic_entries.empty() || !delta.is_delta;
     for (const BackendDescriptorMap::value_type& backend: known_backends_) {
       current_membership.insert(backend.second.address);
       if (any_changes) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 868ae54..dad1ebf 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -308,22 +308,44 @@ class ImpalaServer : public ImpalaServiceIf,
   void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
       std::vector<TTopicDelta>* topic_updates);
 
-  /// Processes a CatalogUpdateResult returned from the CatalogServer and ensures
-  /// the update has been applied to the local impalad's catalog cache. If
-  /// wait_for_all_subscribers is true, this function will also wait until all
-  /// catalog topic subscribers have processed the update. Called from ClientRequestState
-  /// after executing any statement that modifies the catalog.
-  /// If wait_for_all_subscribers is false AND if the TCatalogUpdateResult contains
-  /// TCatalogObject(s) to add and/or remove, this function will update the local cache
-  /// by directly calling UpdateCatalog() with the TCatalogObject results.
-  /// Otherwise this function will wait until the local impalad's catalog cache has been
-  /// updated from a statestore heartbeat that includes this catalog update's catalog
-  /// version. If wait_for_all_subscribers is true, this function also wait all other
-  /// catalog topic subscribers to process this update by checking the current
-  /// min_subscriber_topic_version included in each state store heartbeat.
+  /// Processes a TCatalogUpdateResult returned from the CatalogServer and ensures
+  /// the update has been applied to the local impalad's catalog cache. Called from
+  /// ClientRequestState after executing any statement that modifies the catalog.
+  ///
+  /// If TCatalogUpdateResult contains TCatalogObject(s) to add and/or remove, this
+  /// function will update the local cache by directly calling the frontend's
+  /// UpdateCatalogCache() with the TCatalogObject results.
+  ///
+  /// If TCatalogUpdateResult does not contain any TCatalogObjects and this is
+  /// the result of an INVALIDATE METADATA operation, it waits until the minimum
+  /// catalog version in the local cache is greater than or equal to the catalog
+  /// version specified in TCatalogUpdateResult. If it is not an INVALIDATE
+  /// METADATA operation, it waits until the local impalad's catalog cache has
+  /// been updated from a statestore heartbeat that includes this catalog
+  /// update's version.
+  ///
+  /// If wait_for_all_subscribers is true, this function also waits for all other
+  /// catalog topic subscribers to process this update by checking the current
+  /// min_subscriber_topic_version included in each statestore heartbeat.
   Status ProcessCatalogUpdateResult(const TCatalogUpdateResult& catalog_update_result,
       bool wait_for_all_subscribers) WARN_UNUSED_RESULT;
 
+  /// Wait until the catalog update with version 'catalog_update_version' is
+  /// received and applied in the local catalog cache or until the catalog
+  /// service id has changed.
+  void WaitForCatalogUpdate(const int64_t catalog_update_version,
+      const TUniqueId& catalog_service_id);
+
+  /// Wait until the minimum catalog object version in the local cache is
+  /// greater than or equal to 'min_catalog_update_version' or until the catalog
+  /// service id has changed.
+  void WaitForMinCatalogUpdate(const int64_t min_catalog_update_version,
+      const TUniqueId& catalog_service_id);
+
+  /// Wait until the last applied catalog update has been processed by all catalog
+  /// topic subscribers or until the catalog service id has changed.
+  void WaitForCatalogUpdateTopicPropagation(const TUniqueId& catalog_service_id);
+
   /// Returns true if lineage logging is enabled, false otherwise.
   bool IsLineageLoggingEnabled();
 
@@ -961,7 +983,7 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Lock to protect uuid_generator
   boost::mutex uuid_lock_;
 
-  /// Lock for catalog_update_version_info_, min_subscriber_catalog_topic_version_,
+  /// Lock for catalog_update_info_, min_subscriber_catalog_topic_version_,
   /// and catalog_version_update_cv_
   boost::mutex catalog_version_lock_;
 
@@ -972,7 +994,8 @@ class ImpalaServer : public ImpalaServiceIf,
   struct CatalogUpdateVersionInfo {
     CatalogUpdateVersionInfo() :
       catalog_version(0L),
-      catalog_topic_version(0L) {
+      catalog_topic_version(0L),
+      min_catalog_object_version(0L) {
     }
 
     /// The last catalog version returned from UpdateCatalog()
@@ -981,6 +1004,8 @@ class ImpalaServer : public ImpalaServiceIf,
     TUniqueId catalog_service_id;
     /// The statestore catalog topic version this update was received in.
     int64_t catalog_topic_version;
+    /// Minimum catalog object version after a call to UpdateCatalog()
+    int64_t min_catalog_object_version;
   };
 
   /// The version information from the last successfull call to UpdateCatalog().

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 2d93c5f..d0a4851 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -85,8 +85,6 @@ const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-by
 const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 
-const Statestore::TopicEntry::Value Statestore::TopicEntry::NULL_VALUE = "";
-
 // Initial version for each Topic registered by a Subscriber. Generally, the Topic will
 // have a Version that is the MAX() of all entries in the Topic, but this initial
 // value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
@@ -124,13 +122,13 @@ class StatestoreThriftIf : public StatestoreServiceIf {
 
 void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes,
     TopicEntry::Version version) {
-  DCHECK(bytes == Statestore::TopicEntry::NULL_VALUE || bytes.size() > 0);
+  DCHECK_GT(bytes.size(), 0);
   value_ = bytes;
   version_ = version;
 }
 
 Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
-    const Statestore::TopicEntry::Value& bytes) {
+    const Statestore::TopicEntry::Value& bytes, bool is_deleted) {
   TopicEntryMap::iterator entry_it = entries_.find(key);
   int64_t key_size_delta = 0;
   int64_t value_size_delta = 0;
@@ -147,6 +145,7 @@ Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
   value_size_delta += bytes.size();
 
   entry_it->second.SetValue(bytes, ++last_version_);
+  entry_it->second.SetDeleted(is_deleted);
   topic_update_log_.insert(make_pair(entry_it->second.version(), key));
 
   total_key_size_bytes_ += key_size_delta;
@@ -168,12 +167,10 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
     // entry
     topic_update_log_.erase(version);
     topic_update_log_.insert(make_pair(++last_version_, key));
-    total_value_size_bytes_ -= entry_it->second.value().size();
-    DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
-
     value_size_metric_->Increment(entry_it->second.value().size());
     topic_size_metric_->Increment(entry_it->second.value().size());
-    entry_it->second.SetValue(Statestore::TopicEntry::NULL_VALUE, last_version_);
+    entry_it->second.SetDeleted(true);
+    entry_it->second.SetVersion(last_version_);
   }
 }
 
@@ -467,11 +464,9 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
   // At this point the updates are assumed to have been successfully processed by the
   // subscriber. Update the subscriber's max version of each topic.
-  map<TopicEntryKey, TTopicDelta>::const_iterator topic_delta =
-      update_state_request.topic_deltas.begin();
-  for (; topic_delta != update_state_request.topic_deltas.end(); ++topic_delta) {
-    subscriber->SetLastTopicVersionProcessed(topic_delta->first,
-        topic_delta->second.to_version);
+  for (const auto& topic_delta: update_state_request.topic_deltas) {
+    subscriber->SetLastTopicVersionProcessed(topic_delta.first,
+        topic_delta.second.to_version);
   }
 
   // Thirdly: perform any / all updates returned by the subscriber
@@ -500,14 +495,8 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
       Topic* topic = &topic_it->second;
       for (const TTopicItem& item: update.topic_entries) {
-        TopicEntry::Version version = topic->Put(item.key, item.value);
-        subscriber->AddTransientUpdate(update.topic_name, item.key, version);
-      }
-
-      for (const string& key: update.topic_deletions) {
-        TopicEntry::Version version =
-            topic->Put(key, Statestore::TopicEntry::NULL_VALUE);
-        subscriber->AddTransientUpdate(update.topic_name, key, version);
+        subscriber->AddTransientUpdate(update.topic_name, item.key,
+            topic->Put(item.key, item.value, item.deleted));
       }
     }
   }
@@ -541,30 +530,25 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
       TopicUpdateLog::const_iterator next_update =
           topic.topic_update_log().upper_bound(last_processed_version);
 
-      int64_t deleted_key_size_bytes = 0;
+      uint64_t topic_size = 0;
       for (; next_update != topic.topic_update_log().end(); ++next_update) {
         TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second);
         DCHECK(itr != topic.entries().end());
         const TopicEntry& topic_entry = itr->second;
-        if (topic_entry.value() == Statestore::TopicEntry::NULL_VALUE) {
-          if (!topic_delta.is_delta) {
-            deleted_key_size_bytes += itr->first.size();
-            continue;
-          }
-          topic_delta.topic_deletions.push_back(itr->first);
-        } else {
-          topic_delta.topic_entries.push_back(TTopicItem());
-          TTopicItem& topic_item = topic_delta.topic_entries.back();
-          topic_item.key = itr->first;
-          // TODO: Does this do a needless copy?
-          topic_item.value = topic_entry.value();
+        // Don't send deleted entries for non-delta updates.
+        if (!topic_delta.is_delta && topic_entry.is_deleted()) {
+          continue;
         }
+        topic_delta.topic_entries.push_back(TTopicItem());
+        TTopicItem& topic_item = topic_delta.topic_entries.back();
+        topic_item.key = itr->first;
+        topic_item.value = topic_entry.value();
+        topic_item.deleted = topic_entry.is_deleted();
+        topic_size += topic_item.key.size() + topic_item.value.size();
       }
 
       if (!topic_delta.is_delta &&
           topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
-        int64_t topic_size = topic.total_key_size_bytes() - deleted_key_size_bytes
-            + topic.total_value_size_bytes();
         VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
                    << " topic update for " << subscriber.id() << ". Size = "
                    << PrettyPrinter::Print(topic_size, TUnit::BYTES);

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 1488f7e..38b8361 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -131,8 +131,7 @@ class Statestore : public CacheLineAligned {
 
  private:
   /// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
-  /// pair. If the byte string is NULL, the entry has been deleted, but may be retained to
-  /// track changes to send to subscribers.
+  /// pair.
   class TopicEntry {
    public:
     /// A Value is a string of bytes, for which std::string is a convenient representation.
@@ -146,30 +145,38 @@ class Statestore : public CacheLineAligned {
     /// The Version value used to initialize a new TopicEntry.
     static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L;
 
-    /// Representation of an empty Value. Must have size() == 0.
-    static const Value NULL_VALUE;
-
-    /// Sets the value of this entry to the byte / length pair. NULL_VALUE implies this
-    /// entry has been deleted.  The caller is responsible for ensuring, if required, that
-    /// the version parameter is larger than the current version() TODO: Consider enforcing
-    /// version monotonicity here.
+    /// Sets the value of this entry to the byte / length pair. The caller is responsible
+    /// for ensuring, if required, that the version parameter is larger than the
+    /// current version() TODO: Consider enforcing version monotonicity here.
     void SetValue(const Value& bytes, Version version);
 
-    TopicEntry() : value_(NULL_VALUE), version_(TOPIC_ENTRY_INITIAL_VERSION) { }
+    /// Sets a new version for this entry.
+    void SetVersion(Version version) { version_ = version; }
+
+    /// Sets the is_deleted_ flag for this entry.
+    void SetDeleted(bool is_deleted) { is_deleted_ = is_deleted; }
+
+    TopicEntry() : version_(TOPIC_ENTRY_INITIAL_VERSION),
+        is_deleted_(false) { }
 
     const Value& value() const { return value_; }
     uint64_t version() const { return version_; }
     uint32_t length() const { return value_.size(); }
+    bool is_deleted() const { return is_deleted_; }
 
    private:
-    /// Byte string value, owned by this TopicEntry. The value is opaque to the statestore,
-    /// and is interpreted only by subscribers.
+    /// Byte string value, owned by this TopicEntry. The value is opaque to the
+    /// statestore, and is interpreted only by subscribers.
     Value value_;
 
     /// The version of this entry. Every update is assigned a monotonically increasing
     /// version number so that only the minimal set of changes can be sent from the
     /// statestore to a subscriber.
     Version version_;
+
+    /// Indicates if the entry has been deleted. If true, the entry will still be
+    /// retained to track changes to send to subscribers.
+    bool is_deleted_;
   };
 
   /// Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
@@ -192,19 +199,21 @@ class Statestore : public CacheLineAligned {
           total_value_size_bytes_(0L), key_size_metric_(key_size_metric),
           value_size_metric_(value_size_metric), topic_size_metric_(topic_size_metric) { }
 
-    /// Adds an entry with the given key. If bytes == NULL_VALUE, the entry is considered
-    /// deleted, and may be garbage collected in the future. The entry is assigned a new
-    /// version number by the Topic, and that version number is returned.
+    /// Adds an entry with the given key and value (bytes). If is_deleted is
+    /// true the entry is considered deleted, and may be garbage collected in the future.
+    /// The entry is assigned a new version number by the Topic, and that version number
+    /// is returned.
     //
     /// Must be called holding the topic lock
-    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes);
+    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes,
+        bool is_deleted);
 
     /// Utility method to support removing transient entries. We track the version numbers
     /// of entries added by subscribers, and remove entries with the same version number
     /// when that subscriber fails (the same entry may exist, but may have been updated by
     /// another subscriber giving it a new version number)
     //
-    /// Deletion means setting the entry's value to NULL and incrementing its version
+    /// Deletion means marking the entry as deleted and incrementing its version
     /// number.
     //
     /// Must be called holding the topic lock

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/common/thrift/CatalogInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogInternalService.thrift b/common/thrift/CatalogInternalService.thrift
index 5b68408..ffccd4b 100644
--- a/common/thrift/CatalogInternalService.thrift
+++ b/common/thrift/CatalogInternalService.thrift
@@ -22,15 +22,25 @@ include "CatalogObjects.thrift"
 
 // Contains structures used internally by the Catalog Server.
 
-// Response from a call to GetAllCatalogObjects. Contains all known Catalog objects
-// (databases, tables/views, and functions) from the CatalogService's cache.
-// What metadata is included for each object is based on the parameters used in
-// the request.
-struct TGetAllCatalogObjectsResponse {
+// Arguments to a GetCatalogDelta call.
+struct TGetCatalogDeltaRequest {
+  // The base catalog version from which the delta is computed.
+  1: required i64 from_version
+}
+
+// Response from a call to GetCatalogDelta. Contains a delta of catalog objects
+// (databases, tables/views, and functions) from the CatalogService's cache relative (>)
+// to the catalog version specified in TGetCatalogDeltaRequest.from_version.
+struct TGetCatalogDeltaResponse {
   // The maximum catalog version of all objects in this response or 0 if the Catalog
   // contained no objects.
   1: required i64 max_catalog_version
 
-  // List of catalog objects (empty list if no objects detected in the Catalog).
-  2: required list<CatalogObjects.TCatalogObject> objects
+  // List of updated (new and modified) catalog objects whose catalog version is
+  // larger than TGetCatalogDeltaRequest.from_version.
+  2: required list<CatalogObjects.TCatalogObject> updated_objects
+
+  // List of deleted catalog objects whose catalog version is larger than
+  // TGetCatalogDeltaRequest.from_version.
+  3: required list<CatalogObjects.TCatalogObject> deleted_objects
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/common/thrift/CatalogService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index ef19d25..8a93998 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -41,7 +41,9 @@ struct TCatalogServiceRequestHeader {
 
 // Returns details on the result of an operation that updates the catalog. Information
 // returned includes the Status of the operations, the catalog version that will contain
-// the update, and the catalog service ID.
+// the update, and the catalog service ID. If SYNC_DDL was set in the query options, it
+// also returns the version of the catalog update that this operation must wait for
+// before returning the response to the client.
 struct TCatalogUpdateResult {
   // The CatalogService service ID this result came from.
   1: required Types.TUniqueId catalog_service_id
@@ -52,11 +54,14 @@ struct TCatalogUpdateResult {
   // The status of the operation, OK if the operation was successful.
   3: required Status.TStatus status
 
+  // True if this is a result of an INVALIDATE METADATA operation.
+  4: required bool is_invalidate
+
   // The resulting TCatalogObjects that were added or modified, if applicable.
-  4: optional list<CatalogObjects.TCatalogObject> updated_catalog_objects
+  5: optional list<CatalogObjects.TCatalogObject> updated_catalog_objects
 
   // The resulting TCatalogObjects that were removed, if applicable.
-  5: optional list<CatalogObjects.TCatalogObject> removed_catalog_objects
+  6: optional list<CatalogObjects.TCatalogObject> removed_catalog_objects
 }
 
 // Request for executing a DDL operation (CREATE, ALTER, DROP).
@@ -64,63 +69,66 @@ struct TDdlExecRequest {
   1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
 
   // Common header included in all CatalogService requests.
-  17: optional TCatalogServiceRequestHeader header
+  2: optional TCatalogServiceRequestHeader header
 
-  2: required JniCatalog.TDdlType ddl_type
+  3: required JniCatalog.TDdlType ddl_type
 
   // Parameters for ALTER TABLE
-  3: optional JniCatalog.TAlterTableParams alter_table_params
+  4: optional JniCatalog.TAlterTableParams alter_table_params
 
   // Parameters for ALTER VIEW
-  4: optional JniCatalog.TCreateOrAlterViewParams alter_view_params
+  5: optional JniCatalog.TCreateOrAlterViewParams alter_view_params
 
   // Parameters for CREATE DATABASE
-  5: optional JniCatalog.TCreateDbParams create_db_params
+  6: optional JniCatalog.TCreateDbParams create_db_params
 
   // Parameters for CREATE TABLE
-  6: optional JniCatalog.TCreateTableParams create_table_params
+  7: optional JniCatalog.TCreateTableParams create_table_params
 
   // Parameters for CREATE TABLE LIKE
-  7: optional JniCatalog.TCreateTableLikeParams create_table_like_params
+  8: optional JniCatalog.TCreateTableLikeParams create_table_like_params
 
   // Parameters for CREATE VIEW
-  8: optional JniCatalog.TCreateOrAlterViewParams create_view_params
+  9: optional JniCatalog.TCreateOrAlterViewParams create_view_params
 
   // Parameters for CREATE FUNCTION
-  9: optional JniCatalog.TCreateFunctionParams create_fn_params
+  10: optional JniCatalog.TCreateFunctionParams create_fn_params
 
   // Parameters for DROP DATABASE
-  10: optional JniCatalog.TDropDbParams drop_db_params
+  11: optional JniCatalog.TDropDbParams drop_db_params
 
   // Parameters for DROP TABLE/VIEW
-  11: optional JniCatalog.TDropTableOrViewParams drop_table_or_view_params
+  12: optional JniCatalog.TDropTableOrViewParams drop_table_or_view_params
 
   // Parameters for TRUNCATE TABLE
-  21: optional JniCatalog.TTruncateParams truncate_params
+  13: optional JniCatalog.TTruncateParams truncate_params
 
   // Parameters for DROP FUNCTION
-  12: optional JniCatalog.TDropFunctionParams drop_fn_params
+  14: optional JniCatalog.TDropFunctionParams drop_fn_params
 
   // Parameters for COMPUTE STATS
-  13: optional JniCatalog.TComputeStatsParams compute_stats_params
+  15: optional JniCatalog.TComputeStatsParams compute_stats_params
 
   // Parameters for CREATE DATA SOURCE
-  14: optional JniCatalog.TCreateDataSourceParams create_data_source_params
+  16: optional JniCatalog.TCreateDataSourceParams create_data_source_params
 
   // Parameters for DROP DATA SOURCE
-  15: optional JniCatalog.TDropDataSourceParams drop_data_source_params
+  17: optional JniCatalog.TDropDataSourceParams drop_data_source_params
 
   // Parameters for DROP STATS
-  16: optional JniCatalog.TDropStatsParams drop_stats_params
+  18: optional JniCatalog.TDropStatsParams drop_stats_params
 
   // Parameters for CREATE/DROP ROLE
-  18: optional JniCatalog.TCreateDropRoleParams create_drop_role_params
+  19: optional JniCatalog.TCreateDropRoleParams create_drop_role_params
 
   // Parameters for GRANT/REVOKE ROLE
-  19: optional JniCatalog.TGrantRevokeRoleParams grant_revoke_role_params
+  20: optional JniCatalog.TGrantRevokeRoleParams grant_revoke_role_params
 
   // Parameters for GRANT/REVOKE privilege
-  20: optional JniCatalog.TGrantRevokePrivParams grant_revoke_priv_params
+  21: optional JniCatalog.TGrantRevokePrivParams grant_revoke_priv_params
+
+  // True if SYNC_DDL is set in query options
+  22: required bool sync_ddl
 }
 
 // Response from executing a TDdlExecRequest
@@ -147,18 +155,21 @@ struct TDdlExecResponse {
 struct TUpdateCatalogRequest {
   1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
 
+  // True if SYNC_DDL is set in query options.
+  2: required bool sync_ddl
+
   // Common header included in all CatalogService requests.
-  2: optional TCatalogServiceRequestHeader header
+  3: optional TCatalogServiceRequestHeader header
 
   // Unqualified name of the table to change
-  3: required string target_table;
+  4: required string target_table;
 
   // Database that the table belongs to
-  4: required string db_name;
+  5: required string db_name;
 
   // List of partitions that are new and need to be created. May
   // include the root partition (represented by the empty string).
-  5: required set<string> created_partitions;
+  6: required set<string> created_partitions;
 }
 
 // Response from a TUpdateCatalogRequest
@@ -171,14 +182,14 @@ struct TResetMetadataRequest {
   1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
 
   // Common header included in all CatalogService requests.
-  4: optional TCatalogServiceRequestHeader header
+  2: optional TCatalogServiceRequestHeader header
 
   // If true, refresh. Otherwise, invalidate metadata
-  2: required bool is_refresh
+  3: required bool is_refresh
 
   // Fully qualified name of the table to refresh or invalidate; not set if invalidating
   // the entire catalog
-  3: optional CatalogObjects.TTableName table_name
+  4: optional CatalogObjects.TTableName table_name
 
   // If set, refreshes the specified partition, otherwise
   // refreshes the whole table
@@ -186,6 +197,9 @@ struct TResetMetadataRequest {
 
   // If set, refreshes functions in the specified database.
   6: optional string db_name
+
+  // True if SYNC_DDL is set in query options
+  7: required bool sync_ddl
 }
 
 // Response from TResetMetadataRequest

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index fa315bd..f856871 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -421,56 +421,59 @@ enum TCatalogOpType {
 struct TCatalogOpRequest {
   1: required TCatalogOpType op_type
 
+  // True if SYNC_DDL is used in the query options
+  2: required bool sync_ddl
+
   // Parameters for USE commands
-  2: optional TUseDbParams use_db_params
+  3: optional TUseDbParams use_db_params
 
   // Parameters for DESCRIBE DATABASE db commands
-  17: optional TDescribeDbParams describe_db_params
+  4: optional TDescribeDbParams describe_db_params
 
   // Parameters for DESCRIBE table commands
-  3: optional TDescribeTableParams describe_table_params
+  5: optional TDescribeTableParams describe_table_params
 
   // Parameters for SHOW DATABASES
-  4: optional TShowDbsParams show_dbs_params
+  6: optional TShowDbsParams show_dbs_params
 
   // Parameters for SHOW TABLES
-  5: optional TShowTablesParams show_tables_params
+  7: optional TShowTablesParams show_tables_params
 
   // Parameters for SHOW FUNCTIONS
-  6: optional TShowFunctionsParams show_fns_params
+  8: optional TShowFunctionsParams show_fns_params
 
   // Parameters for SHOW DATA SOURCES
-  11: optional TShowDataSrcsParams show_data_srcs_params
+  9: optional TShowDataSrcsParams show_data_srcs_params
 
   // Parameters for SHOW ROLES
-  12: optional TShowRolesParams show_roles_params
+  10: optional TShowRolesParams show_roles_params
 
   // Parameters for SHOW GRANT ROLE
-  13: optional TShowGrantRoleParams show_grant_role_params
+  11: optional TShowGrantRoleParams show_grant_role_params
 
   // Parameters for DDL requests executed using the CatalogServer
   // such as CREATE, ALTER, and DROP. See CatalogService.TDdlExecRequest
   // for details.
-  7: optional CatalogService.TDdlExecRequest ddl_params
+  12: optional CatalogService.TDdlExecRequest ddl_params
 
   // Parameters for RESET/INVALIDATE METADATA, executed using the CatalogServer.
   // See CatalogService.TResetMetadataRequest for more details.
-  8: optional CatalogService.TResetMetadataRequest reset_metadata_params
+  13: optional CatalogService.TResetMetadataRequest reset_metadata_params
 
   // Parameters for SHOW TABLE/COLUMN STATS
-  9: optional TShowStatsParams show_stats_params
+  14: optional TShowStatsParams show_stats_params
 
   // Parameters for SHOW CREATE TABLE
-  10: optional CatalogObjects.TTableName show_create_table_params
+  15: optional CatalogObjects.TTableName show_create_table_params
 
   // Parameters for SHOW FILES
-  14: optional TShowFilesParams show_files_params
+  16: optional TShowFilesParams show_files_params
 
   // Column lineage graph
-  15: optional LineageGraph.TLineageGraph lineage_graph
+  17: optional LineageGraph.TLineageGraph lineage_graph
 
   // Parameters for SHOW_CREATE_FUNCTION
-  16: optional TGetFunctionsParams show_create_function_params
+  18: optional TGetFunctionsParams show_create_function_params
 }
 
 // Parameters for the SET query option command
@@ -661,6 +664,9 @@ struct TUpdateCatalogCacheRequest {
 struct TUpdateCatalogCacheResponse {
   // The catalog service id this version is from.
   1: required Types.TUniqueId catalog_service_id
+
+  // The minimum catalog object version after CatalogUpdate() was processed.
+  2: required i64 min_catalog_object_version
 }
 
 // Sent from the impalad BE to FE with the latest cluster membership snapshot resulting

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 60a0d0d..13a87c9 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -79,8 +79,15 @@ struct TTopicItem {
   1: required string key;
 
   // Byte-string value for this topic entry. May not be null-terminated (in that it may
-  // contain null bytes)
+  // contain null bytes). It can be non-empty when deleted is true. This is needed when
+  // subscribers require additional information not captured in the item key to process
+  // the deleted item (e.g., catalog version of deleted catalog object).
   2: required string value;
+
+  // If true, this item was deleted. Deleted items need not be included in non-delta
+  // TTopicDeltas, since a non-delta update already includes the latest version of
+  // every still-present topic item.
+  3: required bool deleted = false;
 }
 
 // Set of changes to a single topic, sent from the statestore to a subscriber as well as
@@ -89,15 +96,14 @@ struct TTopicDelta {
   // Name of the topic this delta applies to
   1: required string topic_name;
 
-  // List of changes to topic entries
+  // When is_delta=true, a list of changes to topic entries, including deletions, within
+  // [from_version, to_version].
+  // When is_delta=false, this is the list of all non-delete topic entries for
+  // [0, to_version], which can be used to reconstruct the topic from scratch.
   2: required list<TTopicItem> topic_entries;
 
-  // List of topic item keys whose entries have been deleted
-  3: required list<string> topic_deletions;
-
-  // True if entries / deletions are to be applied to in-memory state,
-  // otherwise topic_entries contains entire topic state.
-  4: required bool is_delta;
+  // True if entries / deletions are relative to the topic at versions [0, from_version].
+  3: required bool is_delta;
 
   // The Topic version range this delta covers. If there have been changes to the topic,
   // the update will include all changes in the range: [from_version, to_version).
@@ -105,16 +111,17 @@ struct TTopicDelta {
   // to_version. The from_version will always be 0 for non-delta updates.
   // If this is an update being sent from a subscriber to the statestore, the from_version
   // is set only when recovering from an inconsistent state, to the last version of the
-  // topic the subscriber successfully processed.
-  5: optional i64 from_version
-  6: optional i64 to_version
+  // topic the subscriber successfully processed. The value of to_version doesn't depend
+  // on whether the update is delta or not.
+  4: optional i64 from_version
+  5: optional i64 to_version
 
   // The minimum topic version of all subscribers to the topic. This can be used to
   // determine when all subscribers have successfully processed a specific update.
   // This is guaranteed because no subscriber will ever be sent a topic containing
   // keys with a version < min_subscriber_topic_version. Only used when sending an update
   // from the statestore to a subscriber.
-  7: optional i64 min_subscriber_topic_version
+  6: optional i64 min_subscriber_topic_version
 }
 
 // Description of a topic to subscribe to as part of a RegisterSubscriber call

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
index 624caad..bc83522 100644
--- a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
+++ b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
@@ -86,6 +86,7 @@ public class AuthorizationPolicy implements PrivilegeCache {
     if (existingRole != null) {
       // Remove the role. This will also clean up the grantGroup mappings.
       removeRole(existingRole.getName());
+      CatalogObjectVersionQueue.INSTANCE.removeAll(existingRole.getPrivileges());
       if (existingRole.getId() == role.getId()) {
         // Copy the privileges from the existing role.
         for (RolePrivilege p: existingRole.getPrivileges()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 994aa37..9ed8133 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -32,6 +32,7 @@ import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TPartitionKeyValue;
+import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.log4j.Logger;
@@ -141,18 +142,23 @@ public abstract class Catalog {
   }
 
   /**
-   * Returns the Table object for the given dbName/tableName. This will trigger a
-   * metadata load if the table metadata is not yet cached.
+   * Returns the Table object for the given dbName/tableName. If 'throwIfError' is true,
+   * an exception is thrown if the associated database does not exist. Otherwise, null is
+   * returned.
    */
-  public Table getTable(String dbName, String tableName) throws
-      CatalogException {
+  public Table getTable(String dbName, String tableName, boolean throwIfError)
+      throws CatalogException {
     Db db = getDb(dbName);
-    if (db == null) {
+    if (db == null && throwIfError) {
       throw new DatabaseNotFoundException("Database '" + dbName + "' not found");
     }
     return db.getTable(tableName);
   }
 
+  public Table getTable(String dbName, String tableName) throws CatalogException {
+    return getTable(dbName, tableName, true);
+  }
+
   /**
    * Removes a table from the catalog and returns the table that was removed, or null
    * if the table/database does not exist.
@@ -530,4 +536,45 @@ public abstract class Catalog {
   public static boolean isDefaultDb(String dbName) {
     return DEFAULT_DB.equals(dbName.toLowerCase());
   }
+
+  /**
+   * Returns a unique string key of a catalog object.
+   */
+  public static String toCatalogObjectKey(TCatalogObject catalogObject) {
+    Preconditions.checkNotNull(catalogObject);
+    switch (catalogObject.getType()) {
+      case DATABASE:
+        return "DATABASE:" + catalogObject.getDb().getDb_name().toLowerCase();
+      case TABLE:
+      case VIEW:
+        TTable tbl = catalogObject.getTable();
+        return "TABLE:" + tbl.getDb_name().toLowerCase() + "." +
+            tbl.getTbl_name().toLowerCase();
+      case FUNCTION:
+        return "FUNCTION:" + catalogObject.getFn().getName() + "(" +
+            catalogObject.getFn().getSignature() + ")";
+      case ROLE:
+        return "ROLE:" + catalogObject.getRole().getRole_name().toLowerCase();
+      case PRIVILEGE:
+        return "PRIVILEGE:" +
+            catalogObject.getPrivilege().getPrivilege_name().toLowerCase() + "." +
+            Integer.toString(catalogObject.getPrivilege().getRole_id());
+      case HDFS_CACHE_POOL:
+        return "HDFS_CACHE_POOL:" +
+            catalogObject.getCache_pool().getPool_name().toLowerCase();
+      case DATA_SOURCE:
+        return "DATA_SOURCE:" + catalogObject.getData_source().getName().toLowerCase();
+      default:
+        throw new IllegalStateException(
+            "Unsupported catalog object type: " + catalogObject.getType());
+    }
+  }
+
+  /**
+   * Returns true if the two objects have the same object type and key (generated using
+   * toCatalogObjectKey()).
+   */
+  public static boolean keyEquals(TCatalogObject first, TCatalogObject second) {
+    return toCatalogObjectKey(first).equals(toCatalogObjectKey(second));
+  }
 }


[2/4] impala git commit: IMPALA-5058: Improve the concurrency of DDL/DML operations

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 4c959b2..0e2e8b9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import com.google.common.base.Preconditions;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -29,6 +30,7 @@ import org.apache.thrift.TException;
 
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDataSource;
@@ -173,23 +175,31 @@ public class ImpaladCatalog extends Catalog {
       }
     }
 
-    // Now remove all objects from the catalog. Removing a database before removing
-    // its child tables/functions is fine. If that happens, the removal of the child
-    // object will be a no-op.
+    // Now remove all objects from the catalog. First remove low-level objects (tables,
+    // functions and privileges) and then the top-level objects (databases and roles).
     for (TCatalogObject catalogObject: req.getRemoved_objects()) {
-      removeCatalogObject(catalogObject, newCatalogVersion);
+      if (!isTopLevelCatalogObject(catalogObject)) {
+        removeCatalogObject(catalogObject);
+      }
     }
+    for (TCatalogObject catalogObject: req.getRemoved_objects()) {
+      if (isTopLevelCatalogObject(catalogObject)) {
+        removeCatalogObject(catalogObject);
+      }
+    }
+
+
     lastSyncedCatalogVersion_ = newCatalogVersion;
     // Cleanup old entries in the log.
     catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_);
     isReady_.set(true);
-
     // Notify all the threads waiting on a catalog update.
     synchronized (catalogUpdateEventNotifier_) {
       catalogUpdateEventNotifier_.notifyAll();
     }
 
-    return new TUpdateCatalogCacheResponse(catalogServiceId_);
+    return new TUpdateCatalogCacheResponse(catalogServiceId_,
+        CatalogObjectVersionQueue.INSTANCE.getMinimumVersion());
   }
 
   /**
@@ -319,24 +329,10 @@ public class ImpaladCatalog extends Catalog {
   /**
    *  Removes the matching TCatalogObject from the catalog, if one exists and its
    *  catalog version is < the catalog version of this drop operation.
-   *  Note that drop operations that come from statestore heartbeats always have a
-   *  version of 0. To determine the drop version for statestore updates,
-   *  the catalog version from the current update is used. This is okay because there
-   *  can never be a catalog update from the statestore that contains a drop
-   *  and an addition of the same object. For more details on how drop
-   *  versioning works, see CatalogServerCatalog.java
    */
-  private void removeCatalogObject(TCatalogObject catalogObject,
-      long currentCatalogUpdateVersion) {
-    // The TCatalogObject associated with a drop operation from a state store
-    // heartbeat will always have a version of zero. Because no update from
-    // the state store can contain both a drop and an addition of the same object,
-    // we can assume the drop version is the current catalog version of this update.
-    // If the TCatalogObject contains a version that != 0, it indicates the drop
-    // came from a direct update.
-    long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
-        currentCatalogUpdateVersion : catalogObject.getCatalog_version();
-
+  private void removeCatalogObject(TCatalogObject catalogObject) {
+    Preconditions.checkState(catalogObject.getCatalog_version() != 0);
+    long dropCatalogVersion = catalogObject.getCatalog_version();
     switch(catalogObject.getType()) {
       case DATABASE:
         removeDb(catalogObject.getDb(), dropCatalogVersion);
@@ -360,7 +356,7 @@ public class ImpaladCatalog extends Catalog {
       case HDFS_CACHE_POOL:
         HdfsCachePool existingItem =
             hdfsCachePools_.get(catalogObject.getCache_pool().getPool_name());
-        if (existingItem.getCatalogVersion() > catalogObject.getCatalog_version()) {
+        if (existingItem.getCatalogVersion() <= catalogObject.getCatalog_version()) {
           hdfsCachePools_.remove(catalogObject.getCache_pool().getPool_name());
         }
         break;
@@ -381,6 +377,15 @@ public class ImpaladCatalog extends Catalog {
       Db newDb = Db.fromTDatabase(thriftDb, this);
       newDb.setCatalogVersion(catalogVersion);
       addDb(newDb);
+      if (existingDb != null) {
+        CatalogObjectVersionQueue.INSTANCE.updateVersions(
+            existingDb.getCatalogVersion(), catalogVersion);
+        CatalogObjectVersionQueue.INSTANCE.removeAll(existingDb.getTables());
+        CatalogObjectVersionQueue.INSTANCE.removeAll(
+            existingDb.getFunctions(null, new PatternMatcher()));
+      } else {
+        CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion);
+      }
     }
   }
 
@@ -414,6 +419,12 @@ public class ImpaladCatalog extends Catalog {
     if (existingFn == null ||
         existingFn.getCatalogVersion() < catalogVersion) {
       db.addFunction(function);
+      if (existingFn != null) {
+        CatalogObjectVersionQueue.INSTANCE.updateVersions(
+            existingFn.getCatalogVersion(), catalogVersion);
+      } else {
+        CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion);
+      }
     }
   }
 
@@ -431,6 +442,11 @@ public class ImpaladCatalog extends Catalog {
     Db db = getDb(thriftDb.getDb_name());
     if (db != null && db.getCatalogVersion() < dropCatalogVersion) {
       removeDb(db.getName());
+      CatalogObjectVersionQueue.INSTANCE.removeVersion(
+          db.getCatalogVersion());
+      CatalogObjectVersionQueue.INSTANCE.removeAll(db.getTables());
+      CatalogObjectVersionQueue.INSTANCE.removeAll(
+          db.getFunctions(null, new PatternMatcher()));
     }
   }
 
@@ -455,6 +471,8 @@ public class ImpaladCatalog extends Catalog {
     Function fn = db.getFunction(thriftFn.getSignature());
     if (fn != null && fn.getCatalogVersion() < dropCatalogVersion) {
       db.removeFunction(thriftFn.getSignature());
+      CatalogObjectVersionQueue.INSTANCE.removeVersion(
+          fn.getCatalogVersion());
     }
   }
 
@@ -463,6 +481,7 @@ public class ImpaladCatalog extends Catalog {
     // version of the drop, remove the function.
     if (existingRole != null && existingRole.getCatalogVersion() < dropCatalogVersion) {
       authPolicy_.removeRole(thriftRole.getRole_name());
+      CatalogObjectVersionQueue.INSTANCE.removeAll(existingRole.getPrivileges());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Role.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Role.java b/fe/src/main/java/org/apache/impala/catalog/Role.java
index 0b89866..b45ff22 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Role.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Role.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TRole;
 import com.google.common.base.Preconditions;
@@ -30,11 +31,10 @@ import com.google.common.collect.Sets;
 /**
  * Represents a role in an authorization policy. This class is thread safe.
  */
-public class Role implements CatalogObject {
+public class Role extends CatalogObjectImpl {
   private final TRole role_;
   // The last role ID assigned, starts at 0.
   private static AtomicInteger roleId_ = new AtomicInteger(0);
-  private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
 
   private final CatalogObjectCache<RolePrivilege> rolePrivileges_ =
       new CatalogObjectCache<RolePrivilege>();
@@ -134,11 +134,12 @@ public class Role implements CatalogObject {
   public String getName() { return role_.getRole_name(); }
   public int getId() { return role_.getRole_id(); }
   @Override
-  public synchronized long getCatalogVersion() { return catalogVersion_; }
-  @Override
-  public synchronized void setCatalogVersion(long newVersion) {
-    catalogVersion_ = newVersion;
+  public String getUniqueName() { return "ROLE:" + getName().toLowerCase(); }
+
+  public TCatalogObject toTCatalogObject() {
+    TCatalogObject catalogObject =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObject.setRole(toThrift());
+    return catalogObject;
   }
-  @Override
-  public boolean isLoaded() { return true; }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
index 87277af..ef3717c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
+++ b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.log4j.Logger;
 
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TPrivilegeLevel;
@@ -33,16 +34,14 @@ import com.google.common.collect.Lists;
  * Represents a privilege that has been granted to a role in an authorization policy.
  * This class is thread safe.
  */
-public class RolePrivilege implements CatalogObject {
+public class RolePrivilege extends CatalogObjectImpl {
   private static final Logger LOG = Logger.getLogger(AuthorizationPolicy.class);
   // These Joiners are used to build role names. For simplicity, the role name we
   // use can also be sent to the Sentry library to perform authorization checks
   // so we build them in the same format.
   private static final Joiner AUTHORIZABLE_JOINER = Joiner.on("->");
   private static final Joiner KV_JOINER = Joiner.on("=");
-
   private final TPrivilege privilege_;
-  private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
 
   private RolePrivilege(TPrivilege privilege) {
     privilege_ = privilege;
@@ -132,13 +131,16 @@ public class RolePrivilege implements CatalogObject {
   public String getName() { return privilege_.getPrivilege_name(); }
   public int getRoleId() { return privilege_.getRole_id(); }
   @Override
-  public synchronized long getCatalogVersion() { return catalogVersion_; }
-  @Override
-  public synchronized void setCatalogVersion(long newVersion) {
-    catalogVersion_ = newVersion;
+  public String getUniqueName() {
+    return "PRIVILEGE:" + getName().toLowerCase() + "." + Integer.toString(getRoleId());
+  }
+
+  public TCatalogObject toTCatalogObject() {
+    TCatalogObject catalogObject =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObject.setPrivilege(toThrift());
+    return catalogObject;
   }
-  @Override
-  public boolean isLoaded() { return true; }
 
   // The time this role was created. Used to quickly check if the same privilege
   // was dropped and re-created. Assumes a role will not be created + dropped + created

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 23fa7a4..50fe953 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -56,13 +56,9 @@ import com.google.common.collect.Maps;
  * is more general than Hive's CLUSTER BY ... INTO BUCKETS clause (which partitions
  * a key range into a fixed number of buckets).
  */
-public abstract class Table implements CatalogObject {
+public abstract class Table extends CatalogObjectImpl {
   private static final Logger LOG = Logger.getLogger(Table.class);
-
-  // Catalog version assigned to this table
-  private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
   protected org.apache.hadoop.hive.metastore.api.Table msTable_;
-
   protected final Db db_;
   protected final String name_;
   protected final String owner_;
@@ -358,13 +354,21 @@ public abstract class Table implements CatalogObject {
   }
 
   public TCatalogObject toTCatalogObject() {
-    TCatalogObject catalogObject = new TCatalogObject();
-    catalogObject.setType(getCatalogObjectType());
-    catalogObject.setCatalog_version(getCatalogVersion());
+    TCatalogObject catalogObject =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
     catalogObject.setTable(toThrift());
     return catalogObject;
   }
 
+  public TCatalogObject toMinimalTCatalogObject() {
+    TCatalogObject catalogObject =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObject.setTable(new TTable());
+    catalogObject.getTable().setDb_name(getDb().getName());
+    catalogObject.getTable().setTbl_name(getName());
+    return catalogObject;
+  }
+
   /**
    * Gets the ColumnType from the given FieldSchema by using Impala's SqlParser.
    * Throws a TableLoadingException if the FieldSchema could not be parsed.
@@ -396,6 +400,8 @@ public abstract class Table implements CatalogObject {
   public TableName getTableName() {
     return new TableName(db_ != null ? db_.getName() : null, name_);
   }
+  @Override
+  public String getUniqueName() { return "TABLE:" + getFullName(); }
 
   public ArrayList<Column> getColumns() { return colsByPos_; }
 
@@ -490,17 +496,6 @@ public abstract class Table implements CatalogObject {
   public TTableStats getTTableStats() { return tableStats_; }
   public ArrayType getType() { return type_; }
 
-  @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-
-  @Override
-  public void setCatalogVersion(long catalogVersion) {
-    catalogVersion_ = catalogVersion;
-  }
-
-  @Override
-  public boolean isLoaded() { return true; }
-
   public static boolean isExternalTable(
       org.apache.hadoop.hive.metastore.api.Table msTbl) {
     return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
new file mode 100644
index 0000000..9d23c4f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.log4j.Logger;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+// A log of topic update information for each catalog object. An entry is added to
+// the log when a catalog object is processed (added/removed/skipped) in a topic
+// update and it is replaced every time the catalog object is processed in a
+// topic update.
+//
+// To prevent the log from growing indefinitely, the oldest entries
+// (in terms of last topic update that processed the associated catalog objects) are
+// garbage collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates. That will cause
+// entries of deleted catalog objects or entries of objects that haven't been processed
+// by the catalog for at least TOPIC_UPDATE_LOG_GC_FREQUENCY updates to be removed from
+// the log. Entries of objects that were never added to a topic update (last sent
+// version of -1) are not garbage collected.
+public class TopicUpdateLog {
+  private static final Logger LOG = Logger.getLogger(TopicUpdateLog.class);
+  // Frequency at which the entries of the topic update log are garbage collected.
+  // An entry may survive for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.
+  private final static int TOPIC_UPDATE_LOG_GC_FREQUENCY = 1000;
+
+  // Number of topic updates left to trigger a gc of topic update log entries.
+  private int numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+
+  // In the next gc cycle of topic update log entries, all the entries that were last
+  // added to a topic update with version less than or equal to
+  // 'oldestTopicUpdateToGc_' are removed from the update log.
+  private long oldestTopicUpdateToGc_ = -1;
+
+  // Represents an entry in the topic update log. A topic update log entry is
+  // associated with a catalog object and stores information about the last topic update
+  // that processed that object. Entries are immutable.
+  public static class Entry {
+    // Number of times the entry has skipped a topic update.
+    private final int numSkippedUpdates_;
+    // Last version of the corresponding catalog object that was added to a topic
+    // update. -1 if the object was never added to a topic update.
+    private final long lastSentVersion_;
+    // Version of the last topic update to include the corresponding catalog object.
+    // -1 if the object was never added to a topic update.
+    private final long lastSentTopicUpdate_;
+
+    Entry() {
+      numSkippedUpdates_ = 0;
+      lastSentVersion_ = -1;
+      lastSentTopicUpdate_ = -1;
+    }
+
+    Entry(int numSkippedUpdates, long lastSentVersion, long lastSentCatalogUpdate) {
+      numSkippedUpdates_ = numSkippedUpdates;
+      lastSentVersion_ = lastSentVersion;
+      lastSentTopicUpdate_ = lastSentCatalogUpdate;
+    }
+
+    public int getNumSkippedTopicUpdates() { return numSkippedUpdates_; }
+    public long getLastSentVersion() { return lastSentVersion_; }
+    public long getLastSentCatalogUpdate() { return lastSentTopicUpdate_; }
+
+    // Two entries are equal if all of their fields are equal. Null-safe, as required
+    // by the Object.equals() contract.
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) return true;
+      if (other == null || this.getClass() != other.getClass()) return false;
+      Entry entry = (Entry) other;
+      return numSkippedUpdates_ == entry.getNumSkippedTopicUpdates()
+          && lastSentVersion_ == entry.getLastSentVersion()
+          && lastSentTopicUpdate_ == entry.getLastSentCatalogUpdate();
+    }
+
+    // Must stay consistent with equals(): equal entries produce equal hash codes.
+    @Override
+    public int hashCode() {
+      return Objects.hash(numSkippedUpdates_, lastSentVersion_, lastSentTopicUpdate_);
+    }
+  }
+
+  // Entries in the topic update log stored as a map of catalog object keys to
+  // Entry objects.
+  private final ConcurrentHashMap<String, Entry> topicLogEntries_ =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Garbage-collects topic update log entries. These are entries that haven't been
+   * added to any of the last TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates.
+   * Each call advances the countdown to the next gc cycle by one; when a cycle runs,
+   * 'lastTopicUpdateVersion' becomes the threshold used by the following cycle.
+   */
+  public void garbageCollectUpdateLogEntries(long lastTopicUpdateVersion) {
+    if (oldestTopicUpdateToGc_ == -1) {
+      // Threshold not initialized yet: record it and wait for the next gc cycle.
+      oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
+      return;
+    }
+    if (numTopicUpdatesToGc_ == 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Topic update log GC started.");
+      }
+      Preconditions.checkState(oldestTopicUpdateToGc_ > 0);
+      int numEntriesRemoved = 0;
+      for (Map.Entry<String, Entry> entry:
+           topicLogEntries_.entrySet()) {
+        // Entries of objects that were never added to a topic update are kept.
+        if (entry.getValue().getLastSentVersion() == -1) continue;
+        if (entry.getValue().getLastSentCatalogUpdate() <= oldestTopicUpdateToGc_) {
+          // Conditional remove: leaves the entry alone if its value changed since read.
+          if (topicLogEntries_.remove(entry.getKey(), entry.getValue())) {
+            ++numEntriesRemoved;
+          }
+        }
+      }
+      numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+      oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Topic update log GC finished. Removed " + numEntriesRemoved +
+            " entries.");
+      }
+    } else {
+      --numTopicUpdatesToGc_;
+    }
+  }
+
+  // Adds or replaces the log entry of the catalog object with key 'catalogObjectKey'.
+  public void add(String catalogObjectKey, Entry logEntry) {
+    Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey));
+    Preconditions.checkNotNull(logEntry);
+    topicLogEntries_.put(catalogObjectKey, logEntry);
+  }
+
+  // Returns the log entry of the catalog object with key 'catalogObjectKey', or null
+  // if there is none.
+  public Entry get(String catalogObjectKey) {
+    Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey));
+    return topicLogEntries_.get(catalogObjectKey);
+  }
+
+  // Returns the topic update log entry for the catalog object with key
+  // 'catalogObjectKey'. If the key does not exist, a newly constructed log entry is
+  // returned without being added to the log.
+  public Entry getOrCreateLogEntry(String catalogObjectKey) {
+    Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey));
+    Entry entry = topicLogEntries_.get(catalogObjectKey);
+    if (entry == null) entry = new Entry();
+    return entry;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index b0ed45f..295956c 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -179,7 +179,7 @@ import com.google.common.math.LongMath;
  * update operations and requires the use of fair table locks to prevent starvation.
  *
  *   DO {
- *     Acquire the catalog lock (see CatalogServiceCatalog.catalogLock_)
+ *     Acquire the catalog lock (see CatalogServiceCatalog.versionLock_)
  *     Try to acquire a table lock
  *     IF the table lock acquisition fails {
  *       Release the catalog lock
@@ -326,6 +326,15 @@ public class CatalogOpExecutor {
           ddlRequest.ddl_type);
     }
 
+    // If SYNC_DDL is set, set the result version to that of the catalog update that
+    // contains the results of this DDL operation. The version of this catalog update is
+    // returned to the requesting impalad, which will wait until this catalog update has
+    // been broadcast to all the coordinators.
+    if (ddlRequest.isSync_ddl()) {
+      response.getResult().setVersion(
+          catalog_.waitForSyncDdlVersion(response.getResult()));
+    }
+
     // At this point, the operation is considered successful. If any errors occurred
     // during execution, this function will throw an exception and the CatalogServer
     // will handle setting a bad status code.
@@ -909,12 +918,15 @@ public class CatalogOpExecutor {
     String dbName = params.getDb();
     Preconditions.checkState(dbName != null && !dbName.isEmpty(),
         "Null or empty database name passed as argument to Catalog.createDatabase");
-    if (params.if_not_exists && catalog_.getDb(dbName) != null) {
+    Db existingDb = catalog_.getDb(dbName);
+    if (params.if_not_exists && existingDb != null) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Skipping database creation because " + dbName + " already exists "
             + "and IF NOT EXISTS was specified.");
       }
-      resp.getResult().setVersion(catalog_.getCatalogVersion());
+      Preconditions.checkNotNull(existingDb);
+      resp.getResult().addToUpdated_catalog_objects(existingDb.toTCatalogObject());
+      resp.getResult().setVersion(existingDb.getCatalogVersion());
       return;
     }
     org.apache.hadoop.hive.metastore.api.Database db =
@@ -960,11 +972,7 @@ public class CatalogOpExecutor {
       }
 
       Preconditions.checkNotNull(newDb);
-      TCatalogObject thriftDb = new TCatalogObject(
-          TCatalogObjectType.DATABASE, Catalog.INITIAL_CATALOG_VERSION);
-      thriftDb.setDb(newDb.toThrift());
-      thriftDb.setCatalog_version(newDb.getCatalogVersion());
-      resp.result.addToUpdated_catalog_objects(thriftDb);
+      resp.result.addToUpdated_catalog_objects(newDb.toTCatalogObject());
     }
     resp.result.setVersion(newDb.getCatalogVersion());
   }
@@ -1038,22 +1046,18 @@ public class CatalogOpExecutor {
       throws ImpalaException {
     if (LOG.isTraceEnabled()) { LOG.trace("Adding DATA SOURCE: " + params.toString()); }
     DataSource dataSource = DataSource.fromThrift(params.getData_source());
-    if (catalog_.getDataSource(dataSource.getName()) != null) {
+    DataSource existingDataSource = catalog_.getDataSource(dataSource.getName());
+    if (existingDataSource != null) {
       if (!params.if_not_exists) {
         throw new ImpalaRuntimeException("Data source " + dataSource.getName() +
             " already exists.");
       }
-      // The user specified IF NOT EXISTS and the data source exists, just
-      // return the current catalog version.
-      resp.result.setVersion(catalog_.getCatalogVersion());
+      resp.result.addToUpdated_catalog_objects(existingDataSource.toTCatalogObject());
+      resp.result.setVersion(existingDataSource.getCatalogVersion());
       return;
     }
     catalog_.addDataSource(dataSource);
-    TCatalogObject addedObject = new TCatalogObject();
-    addedObject.setType(TCatalogObjectType.DATA_SOURCE);
-    addedObject.setData_source(dataSource.toThrift());
-    addedObject.setCatalog_version(dataSource.getCatalogVersion());
-    resp.result.addToUpdated_catalog_objects(addedObject);
+    resp.result.addToUpdated_catalog_objects(dataSource.toTCatalogObject());
     resp.result.setVersion(dataSource.getCatalogVersion());
   }
 
@@ -1070,11 +1074,7 @@ public class CatalogOpExecutor {
       resp.result.setVersion(catalog_.getCatalogVersion());
       return;
     }
-    TCatalogObject removedObject = new TCatalogObject();
-    removedObject.setType(TCatalogObjectType.DATA_SOURCE);
-    removedObject.setData_source(dataSource.toThrift());
-    removedObject.setCatalog_version(dataSource.getCatalogVersion());
-    resp.result.addToRemoved_catalog_objects(removedObject);
+    resp.result.addToRemoved_catalog_objects(dataSource.toTCatalogObject());
     resp.result.setVersion(dataSource.getCatalogVersion());
   }
 
@@ -1229,7 +1229,7 @@ public class CatalogOpExecutor {
       throw new CatalogException("Database " + db.getName() + " is not empty");
     }
 
-    TCatalogObject removedObject = new TCatalogObject();
+    TCatalogObject removedObject = null;
     synchronized (metastoreDdlLock_) {
       // Remove all the Kudu tables of 'db' from the Kudu storage engine.
       if (db != null && params.cascade) dropTablesFromKudu(db);
@@ -1251,11 +1251,9 @@ public class CatalogOpExecutor {
       for (String tableName: removedDb.getAllTableNames()) {
         uncacheTable(removedDb.getTable(tableName));
       }
-      removedObject.setCatalog_version(removedDb.getCatalogVersion());
+      removedObject = removedDb.toTCatalogObject();
     }
-    removedObject.setType(TCatalogObjectType.DATABASE);
-    removedObject.setDb(new TDatabase());
-    removedObject.getDb().setDb_name(params.getDb());
+    Preconditions.checkNotNull(removedObject);
     resp.result.setVersion(removedObject.getCatalog_version());
     resp.result.addToRemoved_catalog_objects(removedObject);
   }
@@ -1525,12 +1523,17 @@ public class CatalogOpExecutor {
     Preconditions.checkState(params.getColumns() != null,
         "Null column list given as argument to Catalog.createTable");
 
-    if (params.if_not_exists &&
-        catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
+    Table existingTbl = catalog_.getTable(tableName.getDb(), tableName.getTbl(), false);
+    if (params.if_not_exists && existingTbl != null) {
       LOG.trace(String.format("Skipping table creation because %s already exists and " +
           "IF NOT EXISTS was specified.", tableName));
-      response.getResult().setVersion(catalog_.getCatalogVersion());
-      return false;
+      existingTbl.getLock().lock();
+      try {
+        addTableToCatalogUpdate(existingTbl, response.getResult());
+        return false;
+      } finally {
+        existingTbl.getLock().unlock();
+      }
     }
     org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
     LOG.trace(String.format("Creating table %s", tableName));
@@ -1736,12 +1739,17 @@ public class CatalogOpExecutor {
     Preconditions.checkState(tblName != null && tblName.isFullyQualified());
     Preconditions.checkState(srcTblName != null && srcTblName.isFullyQualified());
 
-    if (params.if_not_exists &&
-        catalog_.containsTable(tblName.getDb(), tblName.getTbl())) {
+    Table existingTbl = catalog_.getTable(tblName.getDb(), tblName.getTbl(), false);
+    if (params.if_not_exists && existingTbl != null) {
       LOG.trace(String.format("Skipping table creation because %s already exists and " +
           "IF NOT EXISTS was specified.", tblName));
-      response.getResult().setVersion(catalog_.getCatalogVersion());
-      return;
+      existingTbl.getLock().lock();
+      try {
+        addTableToCatalogUpdate(existingTbl, response.getResult());
+        return;
+      } finally {
+        existingTbl.getLock().unlock();
+      }
     }
     Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
     org.apache.hadoop.hive.metastore.api.Table tbl =
@@ -2185,8 +2193,9 @@ public class CatalogOpExecutor {
     }
     // Rename the table in the Catalog and get the resulting catalog object.
     // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
-    Table newTable = catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
-    if (newTable == null) {
+    Pair<Table, Table> result =
+        catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
+    if (result.first == null || result.second == null) {
       // The rename succeeded in the HMS but failed in the catalog cache. The cache is in
       // an inconsistent state, but can likely be fixed by running "invalidate metadata".
       throw new ImpalaRuntimeException(String.format(
@@ -2196,14 +2205,9 @@ public class CatalogOpExecutor {
           newTableName.toString()));
     }
 
-    TCatalogObject addedObject = newTable.toTCatalogObject();
-    TCatalogObject removedObject = new TCatalogObject();
-    removedObject.setType(TCatalogObjectType.TABLE);
-    removedObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
-    removedObject.setCatalog_version(addedObject.getCatalog_version());
-    response.result.addToRemoved_catalog_objects(removedObject);
-    response.result.addToUpdated_catalog_objects(addedObject);
-    response.result.setVersion(addedObject.getCatalog_version());
+    response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
+    response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
+    response.result.setVersion(result.second.getCatalogVersion());
   }
 
   /**
@@ -2851,28 +2855,18 @@ public class CatalogOpExecutor {
     Preconditions.checkNotNull(rolePrivileges);
     List<TCatalogObject> updatedPrivs = Lists.newArrayList();
     for (RolePrivilege rolePriv: rolePrivileges) {
-      TCatalogObject catalogObject = new TCatalogObject();
-      catalogObject.setType(rolePriv.getCatalogObjectType());
-      catalogObject.setPrivilege(rolePriv.toThrift());
-      catalogObject.setCatalog_version(rolePriv.getCatalogVersion());
-      updatedPrivs.add(catalogObject);
-    }
-
-    // TODO: Currently we only support sending back 1 catalog object in a "direct DDL"
-    // response. If multiple privileges have been updated, just send back the
-    // catalog version so subscribers can wait for the statestore heartbeat that
-    // contains all updates (see IMPALA-5571).
-    if (updatedPrivs.size() == 1) {
+      updatedPrivs.add(rolePriv.toTCatalogObject());
+    }
+
+    if (!updatedPrivs.isEmpty()) {
       // If this is a REVOKE statement with hasGrantOpt, only the GRANT OPTION is revoked
-      // from the privilege.
+      // from the privileges. Otherwise the privileges are removed from the catalog.
       if (grantRevokePrivParams.isIs_grant() ||
           privileges.get(0).isHas_grant_opt()) {
         resp.result.setUpdated_catalog_objects(updatedPrivs);
       } else {
         resp.result.setRemoved_catalog_objects(updatedPrivs);
       }
-      resp.result.setVersion(updatedPrivs.get(0).getCatalog_version());
-    } else if (updatedPrivs.size() > 1) {
       resp.result.setVersion(
           updatedPrivs.get(updatedPrivs.size() - 1).getCatalog_version());
     }
@@ -3027,6 +3021,9 @@ public class CatalogOpExecutor {
           resp.result.setUpdated_catalog_objects(addedFuncs);
           resp.result.setRemoved_catalog_objects(removedFuncs);
           resp.result.setVersion(catalog_.getCatalogVersion());
+          for (TCatalogObject removedFn: removedFuncs) {
+            catalog_.getDeleteLog().addRemovedObject(removedFn);
+          }
         }
       }
     } else if (req.isSetTable_name()) {
@@ -3059,26 +3056,32 @@ public class CatalogOpExecutor {
             req.getTable_name().getTable_name());
       }
 
-      if (!dbWasAdded.getRef()) {
-        // Return the TCatalogObject in the result to indicate this request can be
-        // processed as a direct DDL operation.
-        if (tblWasRemoved.getRef()) {
-          resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
-        } else {
-          resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
-        }
+      // Return the TCatalogObject in the result to indicate this request can be
+      // processed as a direct DDL operation.
+      if (tblWasRemoved.getRef()) {
+        resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
       } else {
-        // Since multiple catalog objects were modified (db and table), don't treat this
-        // as a direct DDL operation. Set the overall catalog version and the impalad
-        // will wait for a statestore heartbeat that contains the update.
-        Preconditions.checkState(!req.isIs_refresh());
+        resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
+      }
+
+      if (dbWasAdded.getRef()) {
+        Db addedDb = catalog_.getDb(updatedThriftTable.getTable().getDb_name());
+        if (addedDb == null) {
+          throw new CatalogException("Database " +
+              updatedThriftTable.getTable().getDb_name() + " was removed by a " +
+              "concurrent operation. Try invalidating the table again.");
+        }
+        resp.getResult().addToUpdated_catalog_objects(addedDb.toTCatalogObject());
       }
       resp.getResult().setVersion(updatedThriftTable.getCatalog_version());
     } else {
       // Invalidate the entire catalog if no table name is provided.
       Preconditions.checkArgument(!req.isIs_refresh());
-      catalog_.reset();
-      resp.result.setVersion(catalog_.getCatalogVersion());
+      resp.getResult().setVersion(catalog_.reset());
+      resp.getResult().setIs_invalidate(true);
+    }
+    if (req.isSync_ddl()) {
+      resp.getResult().setVersion(catalog_.waitForSyncDdlVersion(resp.getResult()));
     }
     resp.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>()));
     return resp;
@@ -3261,11 +3264,16 @@ public class CatalogOpExecutor {
 
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
       addTableToCatalogUpdate(table, response.result);
-      return response;
     } finally {
       Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
       table.getLock().unlock();
     }
+
+    if (update.isSync_ddl()) {
+      response.getResult().setVersion(
+          catalog_.waitForSyncDdlVersion(response.getResult()));
+    }
+    return response;
   }
 
   private List<String> getPartValsFromName(org.apache.hadoop.hive.metastore.api.Table

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index c62fc31..d0936d5 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -520,13 +520,17 @@ public class Frontend {
     } else {
       throw new IllegalStateException("Unexpected CatalogOp statement type.");
     }
-
     result.setResult_set_metadata(metadata);
+    ddl.setSync_ddl(result.getQuery_options().isSync_ddl());
     result.setCatalog_op_request(ddl);
     if (ddl.getOp_type() == TCatalogOpType.DDL) {
       TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader();
       header.setRequesting_user(analysis.getAnalyzer().getUser().getName());
       ddl.getDdl_params().setHeader(header);
+      ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl());
+    }
+    if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
+      ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index b56527b..e945a3b 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -38,7 +38,8 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaRequest;
 import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
@@ -119,9 +120,11 @@ public class JniCatalog {
   /**
    * Gets all catalog objects
    */
-  public byte[] getCatalogObjects(long from_version) throws ImpalaException, TException {
-    TGetAllCatalogObjectsResponse resp =
-        catalog_.getCatalogObjects(from_version);
+  public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq)
+      throws ImpalaException, TException {
+    TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
+    JniUtil.deserializeThrift(protocolFactory_, params, thriftGetCatalogDeltaReq);
+    TGetCatalogDeltaResponse resp = catalog_.getCatalogDelta(params.getFrom_version());
     TSerializer serializer = new TSerializer(protocolFactory_);
     return serializer.serialize(resp);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/util/SentryProxy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/SentryProxy.java b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
index 23534d2..f2df66b 100644
--- a/fe/src/main/java/org/apache/impala/util/SentryProxy.java
+++ b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
@@ -87,7 +87,7 @@ public class SentryProxy {
     }
     sentryPolicyService_ = new SentryPolicyService(sentryConfig);
 
-    policyReader_.scheduleAtFixedRate(new PolicyReader(), 0,
+    policyReader_.scheduleAtFixedRate(new PolicyReader(false), 0,
         BackendConfig.INSTANCE.getSentryCatalogPollingFrequency(),
         TimeUnit.SECONDS);
   }
@@ -107,6 +107,12 @@ public class SentryProxy {
    * atomically.
    */
   private class PolicyReader implements Runnable {
+    private boolean resetVersions_;
+
+    public PolicyReader(boolean resetVersions) {
+      resetVersions_ = resetVersions;
+    }
+
     public void run() {
       synchronized (SentryProxy.this) {
         // Assume all roles should be removed. Then query the Policy Service and remove
@@ -131,6 +137,9 @@ public class SentryProxy {
             if (existingRole != null &&
                 existingRole.getGrantGroups().equals(grantGroups)) {
               role = existingRole;
+              if (resetVersions_) {
+                role.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
+              }
             } else {
               role = catalog_.addRole(sentryRole.getRoleName(), grantGroups);
             }
@@ -160,6 +169,10 @@ public class SentryProxy {
               // We already know about this privilege (privileges cannot be modified).
               if (existingPriv != null &&
                   existingPriv.getCreateTimeMs() == sentryPriv.getCreateTime()) {
+                if (resetVersions_) {
+                  existingPriv.setCatalogVersion(
+                      catalog_.incrementAndGetCatalogVersion());
+                }
                 continue;
               }
               catalog_.addRolePrivilege(role.getName(), thriftPriv);
@@ -302,10 +315,7 @@ public class SentryProxy {
       // Update the catalog
       for (TPrivilege privilege: privileges) {
         RolePrivilege rolePriv = catalog_.removeRolePrivilege(roleName, privilege);
-        if (rolePriv == null) {
-          rolePriv = RolePrivilege.fromThrift(privilege);
-          rolePriv.setCatalogVersion(catalog_.getCatalogVersion());
-        }
+        if (rolePriv == null) continue;
         rolePrivileges.add(rolePriv);
       }
     } else {
@@ -317,12 +327,7 @@ public class SentryProxy {
       List<TPrivilege> updatedPrivileges = Lists.newArrayList();
       for (TPrivilege privilege: privileges) {
         RolePrivilege existingPriv = catalog_.getRolePrivilege(roleName, privilege);
-        if (existingPriv == null) {
-          RolePrivilege rolePriv = RolePrivilege.fromThrift(privilege);
-          rolePriv.setCatalogVersion(catalog_.getCatalogVersion());
-          rolePrivileges.add(rolePriv);
-          continue;
-        }
+        if (existingPriv == null) continue;
         TPrivilege updatedPriv = existingPriv.toThrift();
         updatedPriv.setHas_grant_opt(false);
         updatedPrivileges.add(updatedPriv);
@@ -342,9 +347,9 @@ public class SentryProxy {
    * the Catalog with any changes. Throws an ImpalaRuntimeException if there are any
    * errors executing the refresh job.
    */
-  public void refresh() throws ImpalaRuntimeException {
+  public void refresh(boolean resetVersions) throws ImpalaRuntimeException {
     try {
-      policyReader_.submit(new PolicyReader()).get();
+      policyReader_.submit(new PolicyReader(resetVersions)).get();
     } catch (Exception e) {
       // We shouldn't make it here. It means an exception leaked from the
       // AuthorizationPolicyReader.

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 93e4af0..df2ba0d 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -38,7 +38,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     // Cache pools are typically loaded asynchronously, but as there is no fixed execution
     // order for tests, the cache pools are loaded synchronously before the tests are
     // executed.
-    CachePoolReader rd = new CachePoolReader();
+    CachePoolReader rd = new CachePoolReader(false);
     rd.run();
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index fdc64e6..7e8ff46 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -65,9 +65,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
   /**
    * Reloads all metadata from the source catalog.
    */
-  public void reset() throws CatalogException {
-    srcCatalog_.reset();
-  }
+  public void reset() throws CatalogException { srcCatalog_.reset(); }
 
   /**
    * Overrides ImpaladCatalog.getTable to load the table metadata if it is missing.

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index e2b1715..1003dc7 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -311,14 +311,12 @@ class StatestoreSubscriber(object):
 
 class TestStatestore():
   def make_topic_update(self, topic_name, key_template="foo", value_template="bar",
-                        num_updates=1, deletions=None):
+                        num_updates=1):
     topic_entries = [
       Subscriber.TTopicItem(key=key_template + str(x), value=value_template + str(x))
       for x in xrange(num_updates)]
-    if deletions is None: deletions = []
     return Subscriber.TTopicDelta(topic_name=topic_name,
                                   topic_entries=topic_entries,
-                                  topic_deletions=deletions,
                                   is_delta=False)
 
   def test_registration_ids_different(self):
@@ -349,11 +347,9 @@ class TestStatestore():
         assert len(args.topic_deltas) == 1
         assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
         assert args.topic_deltas[topic_name].topic_name == delta.topic_name
-        assert args.topic_deltas[topic_name].topic_deletions == delta.topic_deletions
       elif sub.update_count == 3:
         # After the content-bearing update was processed, the next delta should be empty
         assert len(args.topic_deltas[topic_name].topic_entries) == 0
-        assert len(args.topic_deltas[topic_name].topic_deletions) == 0
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
@@ -461,7 +457,7 @@ class TestStatestore():
         assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
         # Statestore should not send deletions when the update is not a delta, see
         # IMPALA-1891
-        assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0
+        assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),


[3/4] impala git commit: IMPALA-5058: Improve the concurrency of DDL/DML operations

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
index 27839b3..1a5e2ec 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog;
 
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -24,24 +25,35 @@ import java.util.TreeMap;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TTable;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 /**
- * The impalad catalog cache can be modified by either a state store update or by a
- * direct ("fast") update that applies the result of a catalog operation to the cache
- * out-of-band of a state store update. This thread safe log tracks the divergence
- * (due to direct updates to the cache) of this impalad's cache from the last state
- * store update. This log is needed to ensure work is never undone. For example,
- * consider the following sequence of events:
- * t1: [Direct Update] - Add item A - (Catalog Version 9)
- * t2: [Direct Update] - Drop item A - (Catalog Version 10)
- * t3: [StateStore Update] - (From Catalog Version 9)
- * This log is used to ensure the state store update in t3 does not undo the drop in t2.
+ * Represents a log of deleted catalog objects.
  *
- * Currently this only tracks objects that were dropped, since the catalog cache can be
- * queried to check if an object was added. TODO: Also track object additions from async
- * operations. This could be used to to "replay" the log in the case of a catalog reset
- * ("invalidate metadata"). Currently, the catalog may briefly go back in time if
- * "invalidate metadata" is run concurrently with async catalog operations.
+ * There are currently two use cases for this log:
+ *
+ * a) Processing catalog updates in the impalads
+ *   The impalad catalog cache can be modified by either a state store update or by a
+ *   direct update that applies the result of a catalog operation to the cache
+ *   out-of-band of a state store update. This thread safe log tracks the divergence
+ *   (due to direct updates to the cache) of this impalad's cache from the last state
+ *   store update. This log is needed to ensure work is never undone. For example,
+ *   consider the following sequence of events:
+ *   t1: [Direct Update] - Add item A - (Catalog Version 9)
+ *   t2: [Direct Update] - Drop item A - (Catalog Version 10)
+ *   t3: [StateStore Update] - (From Catalog Version 9)
+ *   This log is used to ensure the state store update in t3 does not undo the drop in t2.
+ *   Currently this only tracks objects that were dropped, since the catalog cache can be
+ *   queried to check if an object was added. TODO: Also track object additions from async
+ *   operations. This could be used to "replay" the log in the case of a catalog reset
+ *   ("invalidate metadata"). Currently, the catalog may briefly go back in time if
+ *   "invalidate metadata" is run concurrently with async catalog operations.
+ *
+ * b) Building catalog topic updates in the catalogd
+ *   The catalogd uses this log to identify catalog objects that have been deleted
+ *   since the last catalog topic update. Once the catalog topic update is constructed,
+ *   the old entries in the log are garbage collected to prevent the log from growing
+ *   indefinitely.
  */
 public class CatalogDeltaLog {
   // Map of the catalog version an object was removed from the catalog
@@ -58,6 +70,17 @@ public class CatalogDeltaLog {
   }
 
   /**
+   * Retrieve all the removed catalog objects with versions in range
+   * (fromVersion, toVersion].
+   */
+  public synchronized List<TCatalogObject> retrieveObjects(long fromVersion,
+      long toVersion) {
+    SortedMap<Long, TCatalogObject> objects =
+        removedCatalogObjects_.subMap(fromVersion + 1, toVersion + 1);
+    return ImmutableList.<TCatalogObject>copyOf(objects.values());
+  }
+
+  /**
    * Given the current catalog version, removes all items with catalogVersion <
    * currectCatalogVersion. Such objects do not need to be tracked in the delta
    * log anymore because they are consistent with the state store's view of the
@@ -85,36 +108,8 @@ public class CatalogDeltaLog {
     SortedMap<Long, TCatalogObject> candidateObjects =
         removedCatalogObjects_.tailMap(catalogObject.getCatalog_version());
     for (Map.Entry<Long, TCatalogObject> entry: candidateObjects.entrySet()) {
-      if (objectNamesMatch(catalogObject, entry.getValue())) return true;
+      if (Catalog.keyEquals(catalogObject, entry.getValue())) return true;
     }
     return false;
   }
-
-  /**
-   * Returns true if the two objects have the same object type and name.
-   * TODO: Use global object IDs everywhere instead of tracking catalog objects by name.
-   */
-  private boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) {
-    if (first.getType() != second.getType()) return false;
-    switch (first.getType()) {
-      case DATABASE:
-        return first.getDb().getDb_name().equalsIgnoreCase(second.getDb().getDb_name());
-      case TABLE:
-      case VIEW:
-        TTable firstTbl = first.getTable();
-        return firstTbl.getDb_name().equalsIgnoreCase(second.getTable().getDb_name()) &&
-            firstTbl.getTbl_name().equalsIgnoreCase(second.getTable().getTbl_name());
-      case FUNCTION:
-        return first.getFn().getSignature().equals(second.getFn().getSignature()) &&
-            first.getFn().getName().equals(second.getFn().getName());
-      case ROLE:
-        return first.getRole().getRole_name().equalsIgnoreCase(
-            second.getRole().getRole_name());
-      case PRIVILEGE:
-        return first.getPrivilege().getPrivilege_name().equalsIgnoreCase(
-            second.getPrivilege().getPrivilege_name()) &&
-            first.getPrivilege().getRole_id() == second.getPrivilege().getRole_id();
-      default: return false;
-    }
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
index a2d8ca9..cc4c495 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
@@ -29,6 +29,9 @@ public interface CatalogObject {
   // Returns the unqualified object name.
   public String getName();
 
+  // Returns the unique name of this catalog object.
+  public String getUniqueName();
+
   // Returns the version of this catalog object.
   public long getCatalogVersion();
 
@@ -37,4 +40,4 @@ public interface CatalogObject {
 
   // Returns true if this CatalogObject has had its metadata loaded, false otherwise.
   public boolean isLoaded();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
index c578e41..d882cdb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
@@ -30,6 +30,8 @@ import com.google.common.collect.Lists;
 /**
  * Thread safe cache for storing CatalogObjects. Enforces that updates to existing
  * entries only get applied if the new/updated object has a larger catalog version.
+ * add() and remove() functions also update the entries of the global instance of
+ * CatalogObjectVersionQueue, which keeps track of the catalog object versions.
  */
 public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T> {
   private final boolean caseInsensitiveKeys_;
@@ -71,13 +73,19 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
     String key = catalogObject.getName();
     if (caseInsensitiveKeys_) key = key.toLowerCase();
     T existingItem = metadataCache_.putIfAbsent(key, catalogObject);
-    if (existingItem == null) return true;
+    if (existingItem == null) {
+      CatalogObjectVersionQueue.INSTANCE.addVersion(
+          catalogObject.getCatalogVersion());
+      return true;
+    }
 
     if (existingItem.getCatalogVersion() < catalogObject.getCatalogVersion()) {
       // When existingItem != null it indicates there was already an existing entry
       // associated with the key. Add the updated object iff it has a catalog
       // version greater than the existing entry.
       metadataCache_.put(key, catalogObject);
+      CatalogObjectVersionQueue.INSTANCE.updateVersions(
+          existingItem.getCatalogVersion(), catalogObject.getCatalogVersion());
       return true;
     }
     return false;
@@ -89,7 +97,12 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
    */
   public synchronized T remove(String name) {
     if (caseInsensitiveKeys_) name = name.toLowerCase();
-    return metadataCache_.remove(name);
+    T removedObject = metadataCache_.remove(name);
+    if (removedObject != null) {
+      CatalogObjectVersionQueue.INSTANCE.removeVersion(
+          removedObject.getCatalogVersion());
+    }
+    return removedObject;
   }
 
   /**
@@ -144,4 +157,4 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
   public Iterator<T> iterator() {
     return metadataCache_.values().iterator();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java
new file mode 100644
index 0000000..321355c
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Base implementation of CatalogObject that stores the object's catalog version.
+ * The default implementations return an empty name and unique name and report the
+ * object as loaded; subclasses override these as needed.
+ */
+abstract public class CatalogObjectImpl implements CatalogObject {
+  // Current catalog version of this object. Initialized to
+  // Catalog.INITIAL_CATALOG_VERSION. Held in an AtomicLong so that individual reads
+  // and writes of the version are atomic without external locking.
+  private final AtomicLong catalogVersion_ =
+      new AtomicLong(Catalog.INITIAL_CATALOG_VERSION);
+
+  protected CatalogObjectImpl() {}
+
+  @Override
+  public long getCatalogVersion() { return catalogVersion_.get(); }
+
+  @Override
+  public void setCatalogVersion(long newVersion) { catalogVersion_.set(newVersion); }
+
+  @Override
+  public boolean isLoaded() { return true; }
+
+  @Override
+  public String getName() { return ""; }
+
+  @Override
+  public String getUniqueName() { return ""; }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
new file mode 100644
index 0000000..5fcd398
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Singleton class used to maintain the versions of all the catalog objects stored in a
+ * local catalog cache. Versions are kept in a sorted multiset (a TreeMap from a version
+ * to the number of cached objects carrying it), so adding a version, removing a version
+ * and retrieving the minimum version currently stored in the cache are all O(log n).
+ * Provides a simple API to add, remove and update catalog object versions.
+ *
+ * The single INSTANCE is shared by every CatalogObjectCache in the process, and each
+ * cache only synchronizes on itself. All public methods therefore synchronize on this
+ * object so that concurrent updates coming from different caches cannot corrupt the
+ * underlying map.
+ *
+ * The primary use case of this class is to allow an Impalad catalog cache to determine
+ * when the result set of an INVALIDATE METADATA operation has been applied locally by
+ * keeping track of the minimum catalog object version.
+ */
+public class CatalogObjectVersionQueue {
+  // Sorted multiset of object versions: version -> number of cached objects that
+  // currently carry that version. Counts are always >= 1; entries are dropped at 0.
+  private final TreeMap<Long, Integer> objectVersions_ = new TreeMap<>();
+
+  public static final CatalogObjectVersionQueue INSTANCE =
+      new CatalogObjectVersionQueue();
+
+  private CatalogObjectVersionQueue() {}
+
+  /**
+   * Replaces one occurrence of 'oldVersion' with 'newVersion'.
+   */
+  public synchronized void updateVersions(long oldVersion, long newVersion) {
+    removeVersion(oldVersion);
+    addVersion(newVersion);
+  }
+
+  /**
+   * Removes one occurrence of 'oldVersion'. No-op if the version is not tracked.
+   */
+  public synchronized void removeVersion(long oldVersion) {
+    Integer count = objectVersions_.get(oldVersion);
+    if (count == null) return;
+    Preconditions.checkState(count > 0);
+    if (count == 1) {
+      objectVersions_.remove(oldVersion);
+    } else {
+      objectVersions_.put(oldVersion, count - 1);
+    }
+  }
+
+  /**
+   * Adds one occurrence of 'newVersion'.
+   */
+  public synchronized void addVersion(long newVersion) {
+    Integer count = objectVersions_.get(newVersion);
+    objectVersions_.put(newVersion, count == null ? 1 : count + 1);
+  }
+
+  /**
+   * Returns the smallest tracked version, or 0 if no versions are tracked.
+   */
+  public synchronized long getMinimumVersion() {
+    Map.Entry<Long, Integer> minEntry = objectVersions_.firstEntry();
+    return minEntry != null ? minEntry.getKey() : 0;
+  }
+
+  /**
+   * Adds the version of every object in 'catalogObjects'.
+   */
+  public synchronized void addAll(List<? extends CatalogObject> catalogObjects) {
+    for (CatalogObject catalogObject: catalogObjects) {
+      addVersion(catalogObject.getCatalogVersion());
+    }
+  }
+
+  /**
+   * Removes one occurrence of the version of every object in 'catalogObjects'.
+   */
+  public synchronized void removeAll(List<? extends CatalogObject> catalogObjects) {
+    for (CatalogObject catalogObject: catalogObjects) {
+      removeVersion(catalogObject.getCatalogVersion());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index d2a0a82..f75b0a8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +33,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.Path;
@@ -45,9 +47,9 @@ import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.TopicUpdateLog.Entry;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -58,8 +60,9 @@ import org.apache.impala.hive.executor.UdfExecutor;
 import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TCatalogUpdateResult;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TTable;
@@ -73,6 +76,8 @@ import org.apache.thrift.protocol.TCompactProtocol;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -80,14 +85,62 @@ import com.google.common.collect.Sets;
 /**
  * Specialized Catalog that implements the CatalogService specific Catalog
  * APIs. The CatalogServiceCatalog manages loading of all the catalog metadata
- * and processing of DDL requests. For each DDL request, the CatalogServiceCatalog
- * will return the catalog version that the update will show up in. The client
- * can then wait until the statestore sends an update that contains that catalog
- * version.
- * The CatalogServiceCatalog also manages a global "catalog version". The version
- * is incremented and assigned to a CatalogObject whenever it is
- * added/modified/removed from the catalog. This means each CatalogObject will have a
- * unique version and assigned versions are strictly increasing.
+ * and processing of DDL requests. The CatalogServiceCatalog maintains a global
+ * "catalog version". The version is incremented and assigned to a CatalogObject whenever
+ * it is added/modified/removed from the catalog. This means each CatalogObject will have
+ * a unique version and assigned versions are strictly increasing.
+ *
+ * Periodically, the CatalogServiceCatalog collects a delta of catalog updates (based on a
+ * specified catalog version) and constructs a topic update to be sent to the statestore.
+ * Each catalog topic update is defined by a range of catalog versions (from, to] and the
+ * CatalogServiceCatalog guarantees that every catalog object that has a version in the
+ * specified range is included in the catalog topic update. Concurrent DDL requests are
+ * allowed while a topic update is in progress. Hence, there is a non-zero probability
+ * that frequently modified catalog objects keep skipping topic updates. This can
+ * happen if, by the time the topic update thread tries to collect an object update, the
+ * object is being modified by another metadata operation, causing its version to surpass
+ * the 'to' version of the topic update. To ensure that all catalog updates
+ * are eventually included in a catalog topic update, we keep track of the number of times
+ * each catalog object has skipped a topic update and if that number reaches a specified
+ * threshold, we add the catalog object to the next topic update even if its version is
+ * higher than the 'to' version of the topic update. As a result, the same version of an
+ * object might be sent in two subsequent topic updates.
+ *
+ * The CatalogServiceCatalog maintains two logs:
+ * - Delete log. Since deleted objects are removed from the cache, the cache itself is
+ *   not useful for tracking deletions. This log is used for populating the list of
+ *   deleted objects during a topic update by recording the catalog objects that
+ *   have been removed from the catalog. An entry with a new version is added to this log
+ *   every time an object is removed (e.g. dropTable). Incrementing an object's version
+ *   and adding it to the delete log should be performed atomically. An entry is removed
+ *   from this log by the topic update thread when the associated deletion entry is
+ *   added to a topic update.
+ * - Topic update log. This log records information about the catalog objects that have
+ *   been included in a catalog topic update. Only the thread that is processing the
+ *   topic update is responsible for adding, updating, and removing entries from the log.
+ *   All other operations (e.g. addTable) only read topic update log entries but never
+ *   modify them. Each entry includes the number of times a catalog object has
+ *   skipped a topic update, which version of the object was last sent in a topic update
+ *   and what was the version of that topic update. Entries of the topic update log are
+ *   garbage-collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates by the topic
+ *   update processing thread to prevent the log from growing indefinitely. Metadata
+ *   operations using SYNC_DDL inspect this log to identify the catalog topic
+ *   version that the issuing impalad must wait for in order to ensure that the effects
+ *   of this operation have been broadcast to all the coordinators.
+ *
+ * Known anomalies with SYNC_DDL:
+ *   The periodic garbage collection of topic update log entries may cause metadata
+ *   operations that use SYNC_DDL to hang while waiting for specific topic update log
+ *   entries. That could happen if the thread processing the metadata operation stalls
+ *   for a long period of time (longer than the time to process
+ *   TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates) between the time the operation was
+ *   applied in the catalog cache and the time the SYNC_DDL version was checked. To reduce
+ *   the probability of such an event, we set the value of the
+ *   TOPIC_UPDATE_LOG_GC_FREQUENCY to a large value. Also, to prevent metadata operations
+ *   from hanging in that path due to unknown issues (e.g. bugs), operations using
+ *   SYNC_DDL are not allowed to wait indefinitely for specific topic log entries and an
+ *   exception is thrown if the specified max wait time is exceeded. See
+ *   waitForSyncDdlVersion() for more details.
  *
  * Table metadata for IncompleteTables (not fully loaded tables) are loaded in the
  * background by the TableLoadingMgr; tables can be prioritized for loading by calling
@@ -100,7 +153,7 @@ import com.google.common.collect.Sets;
  * out-of-band of the table loading thread pool.
  *
  * See the class comments in CatalogOpExecutor for a description of the locking protocol
- * that should be employed if both the catalog lock and table locks need to be held at
+ * that should be employed if both the version lock and table locks need to be held at
  * the same time.
  *
  * TODO: Consider removing on-demand loading and have everything go through the table
@@ -110,6 +163,7 @@ public class CatalogServiceCatalog extends Catalog {
   private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
 
   private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
+  private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
   private final TUniqueId catalogServiceId_;
 
   // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
@@ -123,11 +177,11 @@ public class CatalogServiceCatalog extends Catalog {
   //   from the metastore.
   // * During renameTable(), because a table must be removed and added to the catalog
   //   atomically (potentially in a different database).
-  private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true);
+  private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true);
 
   // Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
   // with each update to the Catalog. Continued across the lifetime of the object.
-  // Protected by catalogLock_.
+  // Protected by versionLock_.
   // TODO: Handle overflow of catalogVersion_ and nextTableId_.
   // TODO: The name of this variable is misleading and can be interpreted as a property
   // of the catalog server. Rename into something that indicates its role as a global
@@ -150,6 +204,19 @@ public class CatalogServiceCatalog extends Catalog {
   // Local temporary directory to copy UDF Jars.
   private static String localLibraryPath_;
 
+  // Log of deleted catalog objects; read and garbage collected by getCatalogDelta().
+  private final CatalogDeltaLog deleteLog_;
+
+  // Version of the last topic update returned to the statestore.
+  // The version of a topic update is the catalog version of the CATALOG object
+  // that is added to it; -1 until getCatalogDelta() has produced an update.
+  private final AtomicLong lastSentTopicUpdate_ = new AtomicLong(-1);
+
+  // Wait time, in milliseconds, for a topic update.
+  private static final long TOPIC_UPDATE_WAIT_TIMEOUT_MS = 10000;
+
+  private final TopicUpdateLog topicUpdateLog_ = new TopicUpdateLog();
+
   /**
    * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
    * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
@@ -169,7 +236,7 @@ public class CatalogServiceCatalog extends Catalog {
       // local, etc.)
       if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
         cachePoolReader_.scheduleAtFixedRate(
-            new CachePoolReader(), 0, 1, TimeUnit.MINUTES);
+            new CachePoolReader(false), 0, 1, TimeUnit.MINUTES);
       }
     } catch (IOException e) {
       LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled.");
@@ -180,6 +247,7 @@ public class CatalogServiceCatalog extends Catalog {
       sentryProxy_ = null;
     }
     localLibraryPath_ = new String("file://" + localLibraryPath);
+    deleteLog_ = new CatalogDeltaLog();
   }
 
   // Timeout for acquiring a table lock
@@ -189,7 +257,7 @@ public class CatalogServiceCatalog extends Catalog {
   private static final int TBL_LOCK_RETRY_MS = 10;
 
   /**
-   * Tries to acquire catalogLock_ and the lock of 'tbl' in that order. Returns true if it
+   * Tries to acquire versionLock_ and the lock of 'tbl' in that order. Returns true if it
    * successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
    * when the function returns. Returns false otherwise and no lock is held in this case.
    */
@@ -197,7 +265,7 @@ public class CatalogServiceCatalog extends Catalog {
     long begin = System.currentTimeMillis();
     long end;
     do {
-      catalogLock_.writeLock().lock();
+      versionLock_.writeLock().lock();
       if (tbl.getLock().tryLock()) {
         if (LOG.isTraceEnabled()) {
           end = System.currentTimeMillis();
@@ -206,7 +274,7 @@ public class CatalogServiceCatalog extends Catalog {
         }
         return true;
       }
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
       try {
         // Sleep to avoid spinning and allow other operations to make progress.
         Thread.sleep(TBL_LOCK_RETRY_MS);
@@ -223,12 +291,17 @@ public class CatalogServiceCatalog extends Catalog {
    * Called periodically by the cachePoolReader_.
    */
   protected class CachePoolReader implements Runnable {
-
+    // If true, existing cache pools will get a new catalog version and, consequently,
+    // they will be added to the next topic update, triggering an update in each
+    // coordinator's local catalog cache. This is needed for the case of INVALIDATE
+    // METADATA where a new catalog version needs to be assigned to every catalog object.
+    private final boolean incrementVersions_;
     /**
      * This constructor is needed to create a non-threaded execution of the class.
      */
-    public CachePoolReader() {
+    public CachePoolReader(boolean incrementVersions) {
       super();
+      incrementVersions_ = incrementVersions;
     }
 
     public void run() {
@@ -249,28 +322,45 @@ public class CatalogServiceCatalog extends Catalog {
         return;
       }
 
-      catalogLock_.writeLock().lock();
+      versionLock_.writeLock().lock();
       try {
         // Determine what has changed relative to what we have cached.
         Set<String> droppedCachePoolNames = Sets.difference(
             hdfsCachePools_.keySet(), currentCachePools.keySet());
         Set<String> createdCachePoolNames = Sets.difference(
             currentCachePools.keySet(), hdfsCachePools_.keySet());
+        Set<String> survivingCachePoolNames = Sets.difference(
+            hdfsCachePools_.keySet(), droppedCachePoolNames);
         // Add all new cache pools.
         for (String createdCachePool: createdCachePoolNames) {
           HdfsCachePool cachePool = new HdfsCachePool(
               currentCachePools.get(createdCachePool));
-          cachePool.setCatalogVersion(
-              CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
+          cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
           hdfsCachePools_.add(cachePool);
         }
         // Remove dropped cache pools.
         for (String cachePoolName: droppedCachePoolNames) {
-          hdfsCachePools_.remove(cachePoolName);
-          CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
+          HdfsCachePool cachePool = hdfsCachePools_.remove(cachePoolName);
+          if (cachePool != null) {
+            cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
+            TCatalogObject removedObject =
+                new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
+                    cachePool.getCatalogVersion());
+            removedObject.setCache_pool(cachePool.toThrift());
+            deleteLog_.addRemovedObject(removedObject);
+          }
+        }
+        if (incrementVersions_) {
+          // Increment the version of existing pools in order to be added to the next
+          // topic update.
+          for (String survivingCachePoolName: survivingCachePoolNames) {
+            HdfsCachePool cachePool = hdfsCachePools_.get(survivingCachePoolName);
+            Preconditions.checkNotNull(cachePool);
+            cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
+          }
         }
       } finally {
-        catalogLock_.writeLock().unlock();
+        versionLock_.writeLock().unlock();
       }
     }
   }
@@ -297,120 +387,347 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Returns all known objects in the Catalog (Tables, Views, Databases, and
-   * Functions). Some metadata may be skipped for objects that have a catalog
-   * version < the specified "fromVersion". Takes a lock on the catalog to ensure this
-   * update contains a consistent snapshot of all items in the catalog. While holding the
-   * catalog lock, it locks each accessed table to protect against concurrent
-   * modifications.
+   * Identifies and returns the catalog objects that were added/modified/deleted in the
+   * catalog with versions in ('fromVersion', 'toVersion'], where 'toVersion' is the
+   * catalog version read at the start of this call (a table that has already skipped
+   * MAX_NUM_SKIPPED_TOPIC_UPDATES updates is included even if its version is larger).
+   * The delta is computed on snapshots of the catalog collections, each copied under a
+   * brief read lock, so concurrent metadata operations can make progress meanwhile. A
+   * topic update log entry is added for every object in the delta; SYNC_DDL operations
+   * examine this log to find the topic update that covers their changes. Finally, the
+   * delete log and the topic update log are garbage collected using 'toVersion'.
+   */
+  public TGetCatalogDeltaResponse getCatalogDelta(long fromVersion) {
+    // Upper bound (inclusive) of the delta, except for force-included tables.
+    long toVersion = getCatalogVersion();
+    TGetCatalogDeltaResponse resp = new TGetCatalogDeltaResponse();
+    resp.setUpdated_objects(new ArrayList<TCatalogObject>());
+    resp.setDeleted_objects(new ArrayList<TCatalogObject>());
+    resp.setMax_catalog_version(toVersion);
+
+    for (Db db: getAllDbs()) {
+      addDatabaseToCatalogDelta(db, fromVersion, toVersion, resp);
+    }
+    for (DataSource dataSource: getAllDataSources()) {
+      addDataSourceToCatalogDelta(dataSource, fromVersion, toVersion, resp);
+    }
+    for (HdfsCachePool cachePool: getAllHdfsCachePools()) {
+      addHdfsCachePoolToCatalogDelta(cachePool, fromVersion, toVersion, resp);
+    }
+    for (Role role: getAllRoles()) {
+      addRoleToCatalogDelta(role, fromVersion, toVersion, resp);
+    }
+    Set<String> updatedCatalogObjects = Sets.newHashSet();
+    for (TCatalogObject catalogObj: resp.updated_objects) {
+      String key = Catalog.toCatalogObjectKey(catalogObj);
+      topicUpdateLog_.add(key,
+          new TopicUpdateLog.Entry(0, catalogObj.getCatalog_version(), toVersion));
+      updatedCatalogObjects.add(key);
+    }
+
+    // Identify the catalog objects that were removed from the catalog and whose
+    // versions are in range ('fromVersion', 'toVersion']. Skip "deleted" objects that
+    // were re-added to the catalog, i.e. that are already among the updated objects.
+    for (TCatalogObject removedObject: getDeletedObjects(fromVersion, toVersion)) {
+      String key = Catalog.toCatalogObjectKey(removedObject);
+      if (!updatedCatalogObjects.contains(key)) {
+        topicUpdateLog_.add(key,
+            new TopicUpdateLog.Entry(0, removedObject.getCatalog_version(),
+                toVersion));
+        resp.addToDeleted_objects(removedObject);
+      }
+    }
+    // Each topic update should contain a single "TCatalog" object which is used to
+    // pass overall state on the catalog, such as the current version and the
+    // catalog service id. By setting the catalog version to the latest catalog
+    // version at this point, it ensures impalads will always bump their versions,
+    // even in the case where an object has been dropped.
+    TCatalogObject catalog =
+        new TCatalogObject(TCatalogObjectType.CATALOG, toVersion);
+    catalog.setCatalog(new TCatalog(catalogServiceId_));
+    resp.addToUpdated_objects(catalog);
+    // Garbage collect the delete and topic update logs.
+    deleteLog_.garbageCollect(toVersion);
+    topicUpdateLog_.garbageCollectUpdateLogEntries(toVersion);
+    lastSentTopicUpdate_.set(toVersion);
+    // Notify any operation that is waiting on the next topic update.
+    synchronized (topicUpdateLog_) {
+      topicUpdateLog_.notifyAll();
+    }
+    return resp;
+  }
+
+  /**
+   * Returns the delete log entries with versions in ('fromVersion', 'toVersion'],
+   * retrieved while holding the version read lock.
    */
-  public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
-    TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
-    resp.setObjects(new ArrayList<TCatalogObject>());
-    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
-    catalogLock_.readLock().lock();
+  private List<TCatalogObject> getDeletedObjects(long fromVersion, long toVersion) {
+    versionLock_.readLock().lock();
     try {
-      for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
-        TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
-            db.getCatalogVersion());
-        catalogDb.setDb(db.toThrift());
-        resp.addToObjects(catalogDb);
-
-        for (String tblName: db.getAllTableNames()) {
-          TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
-              Catalog.INITIAL_CATALOG_VERSION);
-
-          Table tbl = db.getTable(tblName);
-          if (tbl == null) {
-            LOG.error("Table: " + tblName + " was expected to be in the catalog " +
-                "cache. Skipping table for this update.");
-            continue;
-          }
+      return deleteLog_.retrieveObjects(fromVersion, toVersion);
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
 
-          // Protect the table from concurrent modifications.
-          tbl.getLock().lock();
-          try {
-            // Only add the extended metadata if this table's version is >=
-            // the fromVersion.
-            if (tbl.getCatalogVersion() >= fromVersion) {
-              try {
-                catalogTbl.setTable(tbl.toThrift());
-              } catch (Exception e) {
-                if (LOG.isTraceEnabled()) {
-                  LOG.trace(String.format("Error calling toThrift() on table %s.%s: %s",
-                      db.getName(), tblName, e.getMessage()), e);
-                }
-                continue;
-              }
-              catalogTbl.setCatalog_version(tbl.getCatalogVersion());
-            } else {
-              catalogTbl.setTable(new TTable(db.getName(), tblName));
-            }
-          } finally {
-            tbl.getLock().unlock();
-          }
-          resp.addToObjects(catalogTbl);
-        }
+  /**
+   * Returns an immutable copy of all databases, taken under the read lock.
+   */
+  private List<Db> getAllDbs() {
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(dbCache_.get().values());
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
 
-        for (Function fn: db.getFunctions(null, new PatternMatcher())) {
-          TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
-              fn.getCatalogVersion());
-          function.setFn(fn.toThrift());
-          resp.addToObjects(function);
-        }
+  /**
+   * Returns an immutable copy of all data sources, taken under the read lock.
+   */
+  private List<DataSource> getAllDataSources() {
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(getDataSources());
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns an immutable copy of all HDFS cache pools, taken under the read lock.
+   */
+  private List<HdfsCachePool> getAllHdfsCachePools() {
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(hdfsCachePools_);
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns an immutable copy of all roles, taken under the read lock.
+   */
+  private List<Role> getAllRoles() {
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(authPolicy_.getAllRoles());
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Adds database 'db' to 'resp' if its version is in ('fromVersion', 'toVersion'].
+   * Its tables and functions are examined regardless of whether 'db' itself was added,
+   * since each of them carries its own catalog version.
+   */
+  private void addDatabaseToCatalogDelta(Db db, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    long dbVersion = db.getCatalogVersion();
+    if (dbVersion > fromVersion && dbVersion <= toVersion) {
+      TCatalogObject catalogDb =
+          new TCatalogObject(TCatalogObjectType.DATABASE, dbVersion);
+      catalogDb.setDb(db.toThrift());
+      resp.addToUpdated_objects(catalogDb);
+    }
+    for (Table tbl: getAllTables(db)) {
+      addTableToCatalogDelta(tbl, fromVersion, toVersion, resp);
+    }
+    for (Function fn: getAllFunctions(db)) {
+      addFunctionToCatalogDelta(fn, fromVersion, toVersion, resp);
+    }
+  }
+
+  /**
+   * Returns an immutable copy of the tables of 'db', taken under the read lock.
+   */
+  private List<Table> getAllTables(Db db) {
+    Preconditions.checkNotNull(db);
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(db.getTables());
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns an immutable copy of the functions in 'db', taken under the read lock.
+   */
+  private List<Function> getAllFunctions(Db db) {
+    Preconditions.checkNotNull(db);
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(db.getFunctions(null, new PatternMatcher()));
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Adds 'tbl' to 'resp' if its version is in ('fromVersion', 'toVersion']. A table
+   * whose version is larger than 'toVersion' is still included once it has skipped at
+   * least MAX_NUM_SKIPPED_TOPIC_UPDATES topic updates, so frequently updated tables
+   * cannot skip topic updates indefinitely (which would violate SYNC_DDL semantics).
+   * Otherwise the skip is recorded in the topic update log without taking the table
+   * lock (presumably to avoid blocking on a table that is being modified concurrently).
+   */
+  private void addTableToCatalogDelta(Table tbl, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    if (tbl.getCatalogVersion() <= toVersion) {  // Read without the table lock.
+      addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp);
+    } else {
+      TopicUpdateLog.Entry topicUpdateEntry =
+          topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
+      Preconditions.checkNotNull(topicUpdateEntry);
+      if (topicUpdateEntry.getNumSkippedTopicUpdates() >= MAX_NUM_SKIPPED_TOPIC_UPDATES) {
+        addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp);
+      } else {
+        LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
+            toVersion);
+        topicUpdateLog_.add(tbl.getUniqueName(),
+            new TopicUpdateLog.Entry(
+                topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
+                topicUpdateEntry.getLastSentVersion(),
+                topicUpdateEntry.getLastSentCatalogUpdate()));
       }
+    }
+  }
 
-      for (DataSource dataSource: getDataSources()) {
-        TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
-            dataSource.getCatalogVersion());
-        catalogObj.setData_source(dataSource.toThrift());
-        resp.addToObjects(catalogObj);
+  /**
+   * Adds 'tbl' to 'resp' while holding the table lock, unless its version is <=
+   * 'fromVersion', or it is > 'toVersion' and the table has skipped fewer than
+   * MAX_NUM_SKIPPED_TOPIC_UPDATES updates (that skip is then recorded in the log).
+   */
+  private void addTableToCatalogDeltaHelper(Table tbl, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    TCatalogObject catalogTbl =
+        new TCatalogObject(TCatalogObjectType.TABLE, Catalog.INITIAL_CATALOG_VERSION);
+    tbl.getLock().lock();
+    try {
+      long tblVersion = tbl.getCatalogVersion();
+      if (tblVersion <= fromVersion) return;  // Not part of this delta.
+      TopicUpdateLog.Entry topicUpdateEntry =
+          topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
+      if (tblVersion > toVersion &&
+          topicUpdateEntry.getNumSkippedTopicUpdates() < MAX_NUM_SKIPPED_TOPIC_UPDATES) {
+        LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
+            toVersion);
+        topicUpdateLog_.add(tbl.getUniqueName(),
+            new TopicUpdateLog.Entry(
+                topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
+                topicUpdateEntry.getLastSentVersion(),
+                topicUpdateEntry.getLastSentCatalogUpdate()));
+        return;
       }
-      for (HdfsCachePool cachePool: hdfsCachePools_) {
-        TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
-            cachePool.getCatalogVersion());
-        pool.setCache_pool(cachePool.toThrift());
-        resp.addToObjects(pool);
+      try {
+        catalogTbl.setTable(tbl.toThrift());
+      } catch (Exception e) {
+        LOG.error(String.format("Error calling toThrift() on table %s: %s",
+            tbl.getFullName(), e.getMessage()), e);
+        return;  // The table is omitted from this topic update.
       }
+      catalogTbl.setCatalog_version(tbl.getCatalogVersion());  // May be > 'toVersion'.
+      resp.addToUpdated_objects(catalogTbl);
+    } finally {
+      tbl.getLock().unlock();
+    }
+  }
 
-      // Get all roles
-      for (Role role: authPolicy_.getAllRoles()) {
-        TCatalogObject thriftRole = new TCatalogObject();
-        thriftRole.setRole(role.toThrift());
-        thriftRole.setCatalog_version(role.getCatalogVersion());
-        thriftRole.setType(role.getCatalogObjectType());
-        resp.addToObjects(thriftRole);
-
-        for (RolePrivilege p: role.getPrivileges()) {
-          TCatalogObject privilege = new TCatalogObject();
-          privilege.setPrivilege(p.toThrift());
-          privilege.setCatalog_version(p.getCatalogVersion());
-          privilege.setType(p.getCatalogObjectType());
-          resp.addToObjects(privilege);
-        }
-      }
+  /**
+   * Adds function 'fn' to 'resp' if its version is in ('fromVersion', 'toVersion'].
+   * Unlike tables, functions outside that range are never force-included here.
+   */
+  private void addFunctionToCatalogDelta(Function fn, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    long fnVersion = fn.getCatalogVersion();
+    if (fnVersion <= fromVersion || fnVersion > toVersion) return;
+    TCatalogObject function =
+        new TCatalogObject(TCatalogObjectType.FUNCTION, fnVersion);
+    function.setFn(fn.toThrift());
+    resp.addToUpdated_objects(function);
+  }
 
-      // Each update should contain a single "TCatalog" object which is used to
-      // pass overall state on the catalog, such as the current version and the
-      // catalog service id.
-      TCatalogObject catalog = new TCatalogObject();
-      catalog.setType(TCatalogObjectType.CATALOG);
-      // By setting the catalog version to the latest catalog version at this point,
-      // it ensure impalads will always bump their versions, even in the case where
-      // an object has been dropped.
-      catalog.setCatalog_version(getCatalogVersion());
-      catalog.setCatalog(new TCatalog(catalogServiceId_));
-      resp.addToObjects(catalog);
-
-      // The max version is the max catalog version of all items in the update.
-      resp.setMax_catalog_version(getCatalogVersion());
-      return resp;
+  /**
+   * Adds 'dataSource' to 'resp' if its version is in ('fromVersion', 'toVersion'];
+   * data sources have no skipped-topic-update tracking.
+   */
+  private void addDataSourceToCatalogDelta(DataSource dataSource, long fromVersion,
+      long toVersion, TGetCatalogDeltaResponse resp) {
+    long dsVersion = dataSource.getCatalogVersion();
+    if (dsVersion <= fromVersion || dsVersion > toVersion) return;
+    TCatalogObject catalogObj =
+        new TCatalogObject(TCatalogObjectType.DATA_SOURCE, dsVersion);
+    catalogObj.setData_source(dataSource.toThrift());
+    resp.addToUpdated_objects(catalogObj);
+  }
+
+  /**
+   * Adds an HDFS cache pool to the topic update if its version is in the range
+   * ('fromVersion', 'toVersion'].
+   */
+  private void addHdfsCachePoolToCatalogDelta(HdfsCachePool cachePool, long fromVersion,
+      long toVersion, TGetCatalogDeltaResponse resp) {
+    long cpVersion = cachePool.getCatalogVersion();
+    // Pools outside the range wait for a later topic update; unlike tables, they are
+    // never force-included after skipping topic updates.
+    if (cpVersion <= fromVersion || cpVersion > toVersion) return;
+    TCatalogObject pool =
+        new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL, cpVersion);
+    pool.setCache_pool(cachePool.toThrift());
+    resp.addToUpdated_objects(pool);
+  }
+
+
+  /**
+   * Adds 'role' to 'resp' if its version is in ('fromVersion', 'toVersion']. Its
+   * privileges are examined regardless, since each carries its own catalog version,
+   * and are added through addRolePrivilegeToCatalogDelta().
+   */
+  private void addRoleToCatalogDelta(Role role, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    long roleVersion = role.getCatalogVersion();
+    if (roleVersion > fromVersion && roleVersion <= toVersion) {
+      TCatalogObject thriftRole =
+          new TCatalogObject(TCatalogObjectType.ROLE, roleVersion);
+      thriftRole.setRole(role.toThrift());
+      resp.addToUpdated_objects(thriftRole);
+    }
+    for (RolePrivilege p: getAllPrivileges(role)) {
+      addRolePrivilegeToCatalogDelta(p, fromVersion, toVersion, resp);
+    }
+  }
+
+  /**
+   * Returns an immutable copy of the privileges of 'role', taken under the read lock.
+   */
+  private List<RolePrivilege> getAllPrivileges(Role role) {
+    Preconditions.checkNotNull(role);
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(role.getPrivileges());
     } finally {
-      catalogLock_.readLock().unlock();
+      versionLock_.readLock().unlock();
     }
   }
 
   /**
+   * Adds role privilege 'priv' to 'resp' if its version is in ('fromVersion',
+   * 'toVersion']; privileges are never force-included after skipping updates.
+   */
+  private void addRolePrivilegeToCatalogDelta(RolePrivilege priv, long fromVersion,
+      long toVersion, TGetCatalogDeltaResponse resp) {
+    long privVersion = priv.getCatalogVersion();
+    if (privVersion <= fromVersion || privVersion > toVersion) return;
+    TCatalogObject privilege =
+        new TCatalogObject(TCatalogObjectType.PRIVILEGE, privVersion);
+    privilege.setPrivilege(priv.toThrift());
+    resp.addToUpdated_objects(privilege);
+  }
+
+  /**
    * Returns all user defined functions (aggregate and scalar) in the specified database.
    * Functions are not returned in a defined order.
    */
@@ -710,6 +1027,31 @@ public class CatalogServiceCatalog extends Catalog {
           tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
         }
       }
+
+      if (existingDb != null) {
+        // Identify any removed functions and add them to the delta log.
+        for (Map.Entry<String, List<Function>> e:
+             existingDb.getAllFunctions().entrySet()) {
+          for (Function fn: e.getValue()) {
+            if (newDb.getFunction(fn,
+                Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
+              fn.setCatalogVersion(incrementAndGetCatalogVersion());
+              deleteLog_.addRemovedObject(fn.toTCatalogObject());
+            }
+          }
+        }
+
+        // Identify any deleted tables and add them to the delta log
+        Set<String> oldTableNames = Sets.newHashSet(existingDb.getAllTableNames());
+        Set<String> newTableNames = Sets.newHashSet(newDb.getAllTableNames());
+        oldTableNames.removeAll(newTableNames);
+        for (String removedTableName: oldTableNames) {
+          Table removedTable = IncompleteTable.createUninitializedTable(existingDb,
+              removedTableName);
+          removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+          deleteLog_.addRemovedObject(removedTable.toTCatalogObject());
+        }
+      }
       return Pair.create(newDb, tblsToBackgroundLoad);
     } catch (Exception e) {
       LOG.warn("Encountered an exception while invalidating database: " + dbName +
@@ -720,22 +1062,35 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Resets this catalog instance by clearing all cached table and database metadata.
+   * Returns the current catalog version before reset has taken any effect. The
+   * requesting impalad will use that version to determine when the
+   * effects of reset have been applied to its local catalog cache.
    */
-  public void reset() throws CatalogException {
-    LOG.info("Invalidating all metadata.");
-
+  public long reset() throws CatalogException {
+    long currentCatalogVersion = getCatalogVersion();
+    LOG.info("Invalidating all metadata. Version: " + currentCatalogVersion);
     // First update the policy metadata.
     if (sentryProxy_ != null) {
       // Sentry Service is enabled.
       try {
         // Update the authorization policy, waiting for the result to complete.
-        sentryProxy_.refresh();
+        sentryProxy_.refresh(true);
       } catch (Exception e) {
         throw new CatalogException("Error updating authorization policy: ", e);
       }
     }
 
-    catalogLock_.writeLock().lock();
+    // Update the HDFS cache pools
+    CachePoolReader reader = new CachePoolReader(true);
+    reader.run();
+
+    versionLock_.writeLock().lock();
+    // Assign new versions to all the loaded data sources.
+    for (DataSource dataSource: getDataSources()) {
+      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+    }
+
+    // Update db and table metadata
     try {
       // Not all Java UDFs are persisted to the metastore. The ones which aren't
       // should be restored once the catalog has been invalidated.
@@ -757,6 +1112,16 @@ public class CatalogServiceCatalog extends Catalog {
         }
       }
       dbCache_.set(newDbCache);
+
+      // Identify any deleted databases and add them to the delta log.
+      Set<String> oldDbNames = oldDbCache.keySet();
+      Set<String> newDbNames = newDbCache.keySet();
+      oldDbNames.removeAll(newDbNames);
+      for (String dbName: oldDbNames) {
+        Db removedDb = oldDbCache.get(dbName);
+        updateDeleteLog(removedDb);
+      }
+
       // Submit tables for background loading.
       for (TTableName tblName: tblsToBackgroundLoad) {
         tableLoadingMgr_.backgroundLoad(tblName);
@@ -765,21 +1130,26 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.error(e);
       throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
     LOG.info("Invalidated all metadata.");
+    return currentCatalogVersion;
   }
 
   /**
    * Adds a database name to the metadata cache and returns the database's
    * new Db object. Used by CREATE DATABASE statements.
    */
-  public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb)
-      throws ImpalaException {
+  public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
     Db newDb = new Db(dbName, this, msDb);
-    newDb.setCatalogVersion(incrementAndGetCatalogVersion());
-    addDb(newDb);
-    return newDb;
+    versionLock_.writeLock().lock();  // bump version and add db under one lock
+    try {
+      newDb.setCatalogVersion(incrementAndGetCatalogVersion());
+      addDb(newDb);
+      return newDb;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
   }
 
   /**
@@ -789,11 +1159,36 @@ public class CatalogServiceCatalog extends Catalog {
    */
   @Override
   public Db removeDb(String dbName) {
-    Db removedDb = super.removeDb(dbName);
-    if (removedDb != null) {
-      removedDb.setCatalogVersion(incrementAndGetCatalogVersion());
+    versionLock_.writeLock().lock();  // updateDeleteLog() requires this lock
+    try {
+      Db removedDb = super.removeDb(dbName);
+      if (removedDb != null) updateDeleteLog(removedDb);
+      return removedDb;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Records the removal of 'db' in the delete log. Unless 'db' is a system db, its
+   * tables and functions are logged first, each with a new catalog version; 'db' is
+   * logged last. The caller must hold the write lock of versionLock_.
+   */
+  private void updateDeleteLog(Db db) {
+    Preconditions.checkNotNull(db);
+    Preconditions.checkState(versionLock_.isWriteLockedByCurrentThread());
+    if (!db.isSystemDb()) {
+      for (Table tbl: db.getTables()) {
+        tbl.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(tbl.toMinimalTCatalogObject());
+      }
+      for (Function fn: db.getFunctions(null, new PatternMatcher())) {
+        fn.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(fn.toTCatalogObject());
+      }
     }
-    return removedDb;
+    db.setCatalogVersion(incrementAndGetCatalogVersion());
+    deleteLog_.addRemovedObject(db.toTCatalogObject());
   }
 
   /**
@@ -804,8 +1199,13 @@ public class CatalogServiceCatalog extends Catalog {
     Db db = getDb(dbName);
     if (db == null) return null;
     Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
-    incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    db.addTable(incompleteTable);
+    versionLock_.writeLock().lock();
+    try {
+      incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
+      db.addTable(incompleteTable);
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
     return db.getTable(tblName);
   }
 
@@ -825,14 +1225,14 @@ public class CatalogServiceCatalog extends Catalog {
 
     long previousCatalogVersion;
     // Return the table if it is already loaded or submit a new load request.
-    catalogLock_.readLock().lock();
+    versionLock_.readLock().lock();
     try {
       Table tbl = getTable(dbName, tblName);
       if (tbl == null || tbl.isLoaded()) return tbl;
       previousCatalogVersion = tbl.getCatalogVersion();
       loadReq = tableLoadingMgr_.loadAsync(tableName);
     } finally {
-      catalogLock_.readLock().unlock();
+      versionLock_.readLock().unlock();
     }
     Preconditions.checkNotNull(loadReq);
     try {
@@ -850,7 +1250,7 @@ public class CatalogServiceCatalog extends Catalog {
    */
   private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
       throws DatabaseNotFoundException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Db db = getDb(updatedTbl.getDb().getName());
       if (db == null) {
@@ -868,7 +1268,7 @@ public class CatalogServiceCatalog extends Catalog {
       db.addTable(updatedTbl);
       return updatedTbl;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -879,12 +1279,17 @@ public class CatalogServiceCatalog extends Catalog {
   public Table removeTable(String dbName, String tblName) {
     Db parentDb = getDb(dbName);
     if (parentDb == null) return null;
-
-    Table removedTable = parentDb.removeTable(tblName);
-    if (removedTable != null) {
-      removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+    versionLock_.writeLock().lock();  // removal + delete-log entry under one lock
+    try {
+      Table removedTable = parentDb.removeTable(tblName);
+      if (removedTable != null) {
+        removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(removedTable.toMinimalTCatalogObject());
+      }
+      return removedTable;
+    } finally {
+      versionLock_.writeLock().unlock();
     }
-    return removedTable;
   }
 
   /**
@@ -894,11 +1299,17 @@ public class CatalogServiceCatalog extends Catalog {
    */
   @Override
   public Function removeFunction(Function desc) {
-    Function removedFn = super.removeFunction(desc);
-    if (removedFn != null) {
-      removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
+    versionLock_.writeLock().lock();  // removal + delete-log entry under one lock
+    try {
+      Function removedFn = super.removeFunction(desc);
+      if (removedFn != null) {
+        removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(removedFn.toTCatalogObject());
+      }
+      return removedFn;
+    } finally {
+      versionLock_.writeLock().unlock();
     }
-    return removedFn;
   }
 
   /**
@@ -909,9 +1320,14 @@ public class CatalogServiceCatalog extends Catalog {
   public boolean addFunction(Function fn) {
     Db db = getDb(fn.getFunctionName().getDb());
     if (db == null) return false;
-    if (db.addFunction(fn)) {
-      fn.setCatalogVersion(incrementAndGetCatalogVersion());
-      return true;
+    versionLock_.writeLock().lock();  // add + version bump under one lock
+    try {
+      if (db.addFunction(fn)) {
+        fn.setCatalogVersion(incrementAndGetCatalogVersion());
+        return true;
+      }
+    } finally {
+      versionLock_.writeLock().unlock();
     }
     return false;
   }
@@ -922,20 +1338,31 @@ public class CatalogServiceCatalog extends Catalog {
    */
   @Override
   public boolean addDataSource(DataSource dataSource) {
-    if (dataSources_.add(dataSource)) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
-      return true;
+    versionLock_.writeLock().lock();  // add + version bump under one lock
+    try {
+      if (dataSources_.add(dataSource)) {
+        dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+        return true;
+      }
+    } finally {
+      versionLock_.writeLock().unlock();
     }
     return false;
   }
 
   @Override
   public DataSource removeDataSource(String dataSourceName) {
-    DataSource dataSource = dataSources_.remove(dataSourceName);
-    if (dataSource != null) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+    versionLock_.writeLock().lock();  // removal + delete-log entry under one lock
+    try {
+      DataSource dataSource = dataSources_.remove(dataSourceName);
+      if (dataSource != null) {
+        dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(dataSource.toTCatalogObject());
+      }
+      return dataSource;
+    } finally {
+      versionLock_.writeLock().unlock();
     }
-    return dataSource;
   }
 
   /**
@@ -969,20 +1396,30 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Renames a table. Equivalent to an atomic drop + add of the table. Returns
-   * the new Table object with an incremented catalog version or null if the
-   * drop or add were unsuccessful. If null is returned, then the catalog cache
-   * is in one of the following two states:
-   * 1. Old table was not removed, and new table was not added
-   * 2. Old table was removed, but new table was not added
+   * a pair of tables containing the removed table (or null if the table drop was not
+   * successful) and the new table (or null if either the drop of the old one or the
+   * add of the new table was not successful). Depending on the return value, the catalog
+   * cache is in one of the following states:
+   * 1. null, null: Old table was not removed and new table was not added.
+   * 2. null, T_new: Invalid configuration; never returned.
+   * 3. T_old, null: Old table was removed but new table was not added.
+   * 4. T_old, T_new: Old table was removed and new table was added.
     */
-  public Table renameTable(TTableName oldTableName, TTableName newTableName)
+  public Pair<Table, Table> renameTable(TTableName oldTableName, TTableName newTableName)
       throws CatalogException {
     // Remove the old table name from the cache and add the new table.
     Db db = getDb(oldTableName.getDb_name());
-    if (db == null) return null;
-    Table oldTable = db.removeTable(oldTableName.getTable_name());
-    if (oldTable == null) return null;
-    return addTable(newTableName.getDb_name(), newTableName.getTable_name());
+    if (db == null) return Pair.create(null, null);
+    versionLock_.writeLock().lock();
+    try {
+      Table oldTable =
+          removeTable(oldTableName.getDb_name(), oldTableName.getTable_name());
+      if (oldTable == null) return Pair.create(null, null);
+      return Pair.create(oldTable,
+          addTable(newTableName.getDb_name(), newTableName.getTable_name()));
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
   }
 
   /**
@@ -1004,7 +1441,7 @@ public class CatalogServiceCatalog extends Catalog {
     }
     try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
       try (MetaStoreClient msClient = getMetaStoreClient()) {
         org.apache.hadoop.hive.metastore.api.Table msTbl = null;
         try {
@@ -1019,7 +1456,7 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
       return tbl.toTCatalogObject();
     } finally {
-      Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+      Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
     }
   }
@@ -1123,9 +1560,7 @@ public class CatalogServiceCatalog extends Catalog {
         try {
           msDb = msClient.getHiveClient().getDatabase(dbName);
           Preconditions.checkNotNull(msDb);
-          db = new Db(dbName, this, msDb);
-          db.setCatalogVersion(incrementAndGetCatalogVersion());
-          addDb(db);
+          addDb(dbName, msDb);
           dbWasAdded.setRef(true);
         } catch (TException e) {
           // The Metastore database cannot be get. Log the error and return.
@@ -1138,9 +1573,8 @@ public class CatalogServiceCatalog extends Catalog {
     // Add a new uninitialized table to the table cache, effectively invalidating
     // any existing entry. The metadata for the table will be loaded lazily, on the
     // on the next access to the table.
-    Table newTable = IncompleteTable.createUninitializedTable(db, tblName);
-    newTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    db.addTable(newTable);
+    Table newTable = addTable(dbName, tblName);
+    Preconditions.checkNotNull(newTable);
     if (loadInBackground_) {
       tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
           tblName.toLowerCase()));
@@ -1148,7 +1582,10 @@ public class CatalogServiceCatalog extends Catalog {
     if (dbWasAdded.getRef()) {
       // The database should always have a lower catalog version than the table because
       // it needs to be created before the table can be added.
-      Preconditions.checkState(db.getCatalogVersion() < newTable.getCatalogVersion());
+      Db addedDb = newTable.getDb();
+      Preconditions.checkNotNull(addedDb);
+      Preconditions.checkState(
+          addedDb.getCatalogVersion() < newTable.getCatalogVersion());
     }
     return newTable.toTCatalogObject();
   }
@@ -1158,14 +1595,14 @@ public class CatalogServiceCatalog extends Catalog {
    * If a role with the same name already exists it will be overwritten.
    */
   public Role addRole(String roleName, Set<String> grantGroups) {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();  // version bump + insert under one lock
     try {
       Role role = new Role(roleName, grantGroups);
       role.setCatalogVersion(incrementAndGetCatalogVersion());
       authPolicy_.addRole(role);
       return role;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1175,14 +1612,19 @@ public class CatalogServiceCatalog extends Catalog {
    * exists.
    */
   public Role removeRole(String roleName) {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Role role = authPolicy_.removeRole(roleName);
       if (role == null) return null;
+      for (RolePrivilege priv: role.getPrivileges()) {  // log privileges before the role
+        priv.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(priv.toTCatalogObject());
+      }
       role.setCatalogVersion(incrementAndGetCatalogVersion());
+      deleteLog_.addRemovedObject(role.toTCatalogObject());
       return role;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1192,14 +1634,14 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public Role addRoleGrantGroup(String roleName, String groupName)
       throws CatalogException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();  // group change + version bump under one lock
     try {
       Role role = authPolicy_.addGrantGroup(roleName, groupName);
       Preconditions.checkNotNull(role);
       role.setCatalogVersion(incrementAndGetCatalogVersion());
       return role;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1209,14 +1651,14 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public Role removeRoleGrantGroup(String roleName, String groupName)
       throws CatalogException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();  // group change + version bump under one lock
     try {
       Role role = authPolicy_.removeGrantGroup(roleName, groupName);
       Preconditions.checkNotNull(role);
       role.setCatalogVersion(incrementAndGetCatalogVersion());
       return role;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1227,7 +1669,7 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public RolePrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
       throws CatalogException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Role role = authPolicy_.getRole(roleName);
       if (role == null) throw new CatalogException("Role does not exist: " + roleName);
@@ -1236,7 +1678,7 @@ public class CatalogServiceCatalog extends Catalog {
       authPolicy_.addPrivilege(priv);
       return priv;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1247,7 +1689,7 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public RolePrivilege removeRolePrivilege(String roleName, TPrivilege thriftPriv)
       throws CatalogException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Role role = authPolicy_.getRole(roleName);
       if (role == null) throw new CatalogException("Role does not exist: " + roleName);
@@ -1255,9 +1697,10 @@ public class CatalogServiceCatalog extends Catalog {
           role.removePrivilege(thriftPriv.getPrivilege_name());
       if (rolePrivilege == null) return null;
       rolePrivilege.setCatalogVersion(incrementAndGetCatalogVersion());
+      deleteLog_.addRemovedObject(rolePrivilege.toTCatalogObject());
       return rolePrivilege;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1268,13 +1711,13 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public RolePrivilege getRolePrivilege(String roleName, TPrivilege privSpec)
       throws CatalogException {
-    catalogLock_.readLock().lock();
+    versionLock_.readLock().lock();  // read-only lookup; shared with other readers
     try {
       Role role = authPolicy_.getRole(roleName);
       if (role == null) throw new CatalogException("Role does not exist: " + roleName);
       return role.getPrivilege(privSpec.getPrivilege_name());
     } finally {
-      catalogLock_.readLock().unlock();
+      versionLock_.readLock().unlock();
     }
   }
 
@@ -1282,11 +1725,11 @@ public class CatalogServiceCatalog extends Catalog {
    * Increments the current Catalog version and returns the new value.
    */
   public long incrementAndGetCatalogVersion() {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();  // reentrant: callers may already hold it
     try {
       return ++catalogVersion_;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1294,16 +1737,15 @@ public class CatalogServiceCatalog extends Catalog {
    * Returns the current Catalog version.
    */
   public long getCatalogVersion() {
-    catalogLock_.readLock().lock();
+    versionLock_.readLock().lock();  // shared lock; writers bump the version
     try {
       return catalogVersion_;
     } finally {
-      catalogLock_.readLock().unlock();
+      versionLock_.readLock().unlock();
     }
   }
 
-  public ReentrantReadWriteLock getLock() { return catalogLock_; }
-
+  public ReentrantReadWriteLock getLock() { return versionLock_; }
   public SentryProxy getSentryProxy() { return sentryProxy_; }
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
 
@@ -1320,7 +1762,7 @@ public class CatalogServiceCatalog extends Catalog {
     }
     try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
       HdfsTable hdfsTable = (HdfsTable) tbl;
       HdfsPartition hdfsPartition = hdfsTable
           .getPartitionFromThriftPartitionSpec(partitionSpec);
@@ -1355,8 +1797,111 @@ public class CatalogServiceCatalog extends Catalog {
           hdfsTable.getFullName(), partitionName));
       return hdfsTable.toTCatalogObject();
     } finally {
-      Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+      Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
     }
   }
+
+  public CatalogDeltaLog getDeleteLog() { return deleteLog_; }  // log of removed objects
+
+  /**
+   * Returns the version of the topic update that an operation using SYNC_DDL must wait
+   * for in order to ensure that its result set ('result') has been broadcast to all the
+   * coordinators. For operations that don't produce a result set, e.g. INVALIDATE
+   * METADATA, return the version specified in 'result.version'.
+   */
+  public long waitForSyncDdlVersion(TCatalogUpdateResult result) throws CatalogException {
+    if (!result.isSetUpdated_catalog_objects() &&
+        !result.isSetRemoved_catalog_objects()) {
+      return result.getVersion();
+    }
+    long lastSentTopicUpdate = lastSentTopicUpdate_.get();
+    // Maximum number of attempts (topic updates) to find the catalog topic version that
+    // an operation using SYNC_DDL must wait for (scales with the # of updated objects).
+    long maxNumAttempts = 5;
+    if (result.getUpdated_catalog_objectsSize() > 0) {  // empty list keeps the default
+      maxNumAttempts =
+          result.getUpdated_catalog_objects().size() * (MAX_NUM_SKIPPED_TOPIC_UPDATES + 1);
+    }
+    long numAttempts = 0;
+    long begin = System.currentTimeMillis();
+    long versionToWaitFor = -1;
+    while (versionToWaitFor == -1) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("waitForSyncDdlVersion() attempt: " + numAttempts);
+      }
+      // Examine the topic update log to determine the latest topic update that
+      // covers the added/modified/deleted objects in 'result'.
+      long topicVersionForUpdates =
+          getCoveringTopicUpdateVersion(result.getUpdated_catalog_objects());
+      long topicVersionForDeletes =
+          getCoveringTopicUpdateVersion(result.getRemoved_catalog_objects());
+      if (topicVersionForUpdates == -1 || topicVersionForDeletes == -1) {
+        // Wait for the next topic update.
+        synchronized(topicUpdateLog_) {
+          try {
+            topicUpdateLog_.wait(TOPIC_UPDATE_WAIT_TIMEOUT_MS);
+          } catch (InterruptedException e) {
+            // Ignore; the loop re-checks the topic update log.
+          }
+        }
+        long currentTopicUpdate = lastSentTopicUpdate_.get();
+        // Don't count time-based exits from the wait() toward the maxNumAttempts
+        // threshold.
+        if (lastSentTopicUpdate != currentTopicUpdate) {
+          ++numAttempts;
+          if (numAttempts > maxNumAttempts) {
+            throw new CatalogException("Couldn't retrieve the catalog topic version " +
+                "for the SYNC_DDL operation after " + maxNumAttempts + " attempts. " +
+                "The operation has been successfully executed but its effects may have " +
+                "not been broadcast to all the coordinators.");
+          }
+          lastSentTopicUpdate = currentTopicUpdate;
+        }
+      } else {
+        versionToWaitFor = Math.max(topicVersionForDeletes, topicVersionForUpdates);
+      }
+    }
+    Preconditions.checkState(versionToWaitFor >= 0);
+    LOG.info("Operation using SYNC_DDL is waiting for catalog topic version: " +
+        versionToWaitFor + ". Time to identify topic version (msec): " +
+        (System.currentTimeMillis() - begin));
+    return versionToWaitFor;
+  }
+
+  /**
+   * Returns the version of the topic update that covers a set of TCatalogObjects.
+   * A topic update U covers a TCatalogObject T, corresponding to a catalog object O,
+   * if last_sent_version(O) >= catalog_version(T) && catalog_version(U) >=
+   * last_topic_update(O). The first condition indicates that a version of O that is
+   * larger or equal to the version in T has been added to a topic update. The second
+   * condition indicates that U is either the update to include O or an update following
+   * the one to include O. Returns -1 if there is a catalog object in 'tCatalogObjects'
+   * which doesn't satisfy the above conditions.
+   */
+  private long getCoveringTopicUpdateVersion(List<TCatalogObject> tCatalogObjects) {
+    if (tCatalogObjects == null || tCatalogObjects.isEmpty()) {  // nothing to cover
+      return lastSentTopicUpdate_.get();
+    }
+    long versionToWaitFor = -1;
+    for (TCatalogObject tCatalogObject: tCatalogObjects) {
+      TopicUpdateLog.Entry topicUpdateEntry =
+          topicUpdateLog_.get(Catalog.toCatalogObjectKey(tCatalogObject));
+      // There are two reasons for which a topic update log entry cannot be found:
+      // a) It corresponds to a new catalog object that hasn't been processed by a catalog
+      // update yet.
+      // b) It corresponds to a catalog object that hasn't been modified for at least
+      // TOPIC_UPDATE_LOG_GC_FREQUENCY updates and hence its entry was garbage
+      // collected.
+      // In both cases, -1 is returned to indicate that we're waiting for the
+      // entry to show up in the topic update log.
+      if (topicUpdateEntry == null ||
+          topicUpdateEntry.getLastSentVersion() < tCatalogObject.getCatalog_version()) {
+        return -1;  // not covered (yet)
+      }
+      versionToWaitFor =  // max over all objects seen so far
+          Math.max(versionToWaitFor, topicUpdateEntry.getLastSentCatalogUpdate());
+    }
+    return versionToWaitFor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/DataSource.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSource.java b/fe/src/main/java/org/apache/impala/catalog/DataSource.java
index e9601d7..f59f3be 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSource.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSource.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog;
 
 import org.apache.hadoop.fs.Path;
 
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDataSource;
 import com.google.common.base.Objects;
@@ -27,13 +28,12 @@ import com.google.common.base.Objects;
  * Represents a data source in the catalog. Contains the data source name and all
  * information needed to locate and load the data source.
  */
-public class DataSource implements CatalogObject {
+public class DataSource extends CatalogObjectImpl {
   private final String dataSrcName_;
   private final String className_;
   private final String apiVersionString_;
   // Qualified path to the data source.
   private final String location_;
-  private long catalogVersion_ =  Catalog.INITIAL_CATALOG_VERSION;
 
   public DataSource(String dataSrcName, String location, String className,
       String apiVersionString) {
@@ -54,16 +54,9 @@ public class DataSource implements CatalogObject {
   }
 
   @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-
-  @Override
-  public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
-  @Override
   public String getName() { return dataSrcName_; }
-
   @Override
-  public boolean isLoaded() { return true; }
+  public String getUniqueName() { return "DATA_SOURCE:" + dataSrcName_.toLowerCase(); }
 
   public String getLocation() { return location_; }
   public String getClassName() { return className_; }
@@ -85,4 +78,11 @@ public class DataSource implements CatalogObject {
   public static String debugString(TDataSource thrift) {
     return fromThrift(thrift).debugString();
   }
+
+  public TCatalogObject toTCatalogObject() {  // toThrift() plus the current version
+    TCatalogObject catalogObj =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObj.setData_source(toThrift());
+    return catalogObj;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 074ff92..f1c9c8e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -34,6 +34,7 @@ import org.apache.impala.catalog.Function;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.JniUtil;
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TFunction;
@@ -59,11 +60,10 @@ import com.google.common.collect.Maps;
  * value is the base64 representation of the thrift serialized function object.
  *
  */
-public class Db implements CatalogObject {
+public class Db extends CatalogObjectImpl {
   private static final Logger LOG = LoggerFactory.getLogger(Db.class);
   private final Catalog parentCatalog_;
   private final TDatabase thriftDb_;
-  private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
 
   public static final String FUNCTION_INDEX_PREFIX = "impala_registered_function_";
 
@@ -134,16 +134,14 @@ public class Db implements CatalogObject {
   @Override
   public String getName() { return thriftDb_.getDb_name(); }
   @Override
-  public TCatalogObjectType getCatalogObjectType() {
-    return TCatalogObjectType.DATABASE;
-  }
+  public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.DATABASE; }
+  @Override
+  public String getUniqueName() { return "DATABASE:" + getName().toLowerCase(); }
 
   /**
    * Adds a table to the table cache.
    */
-  public void addTable(Table table) {
-    tableCache_.add(table);
-  }
+  public void addTable(Table table) { tableCache_.add(table); }
 
   /**
    * Gets all table names in the table cache.
@@ -165,9 +163,7 @@ public class Db implements CatalogObject {
    * Returns the Table with the given name if present in the table cache or null if the
    * table does not exist in the cache.
    */
-  public Table getTable(String tblName) {
-    return tableCache_.get(tblName);
-  }
+  public Table getTable(String tblName) { return tableCache_.get(tblName); }
 
   /**
    * Removes the table name and any cached metadata from the Table cache.
@@ -495,11 +491,10 @@ public class Db implements CatalogObject {
     return result;
   }
 
-  @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-  @Override
-  public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
-  @Override
-  public boolean isLoaded() { return true; }
+  public TCatalogObject toTCatalogObject() {
+    TCatalogObject catalogObj =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObj.setDb(toThrift());
+    return catalogObj;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Function.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Function.java b/fe/src/main/java/org/apache/impala/catalog/Function.java
index 80316a6..03cd867 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Function.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Function.java
@@ -49,7 +49,7 @@ import com.google.common.collect.Lists;
  * - Builtin functions, which are recreated after every restart of the
  *   catalog. (persisted, visible to Impala)
  */
-public class Function implements CatalogObject {
+public class Function extends CatalogObjectImpl {
   // Enum for how to compare function signatures.
   // For decimal types, the type in the function can be a wildcard, i.e. decimal(*,*).
   // The wildcard can *only* exist as function type, the caller will always be a
@@ -106,7 +106,6 @@ public class Function implements CatalogObject {
   // Set to true for functions that survive service restarts, including all builtins,
   // native and IR functions, but only Java functions created without a signature.
   private boolean isPersistent_;
-  private long catalogVersion_ =  Catalog.INITIAL_CATALOG_VERSION;
 
   public Function(FunctionName name, Type[] argTypes,
       Type retType, boolean varArgs) {
@@ -298,15 +297,12 @@ public class Function implements CatalogObject {
 
   @Override
   public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.FUNCTION; }
-
-  @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-
-  @Override
-  public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
   @Override
   public String getName() { return getFunctionName().toString(); }
+  @Override
+  public String getUniqueName() {
+    return "FUNCTION:" + name_.toString() + "(" + signatureString() + ")";
+  }
 
   // Child classes must override this function.
   public String toSql(boolean ifNotExists) { return ""; }
@@ -315,7 +311,7 @@ public class Function implements CatalogObject {
     TCatalogObject result = new TCatalogObject();
     result.setType(TCatalogObjectType.FUNCTION);
     result.setFn(toThrift());
-    result.setCatalog_version(catalogVersion_);
+    result.setCatalog_version(getCatalogVersion());
     return result;
   }
 
@@ -372,9 +368,6 @@ public class Function implements CatalogObject {
     return function;
   }
 
-  @Override
-  public boolean isLoaded() { return true; }
-
   // Returns the resolved symbol in the binary. The BE will do a lookup of 'symbol'
   // in the binary and try to resolve unmangled names.
   // If this function is expecting a return argument, retArgType is that type. It should

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
index 398bc87..6f752d4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
@@ -28,8 +28,7 @@ import com.google.common.base.Preconditions;
  * care about for cache pools is the cache pool name. In the future it may be desirable
  * to track additional metadata such as the owner, size, and current usage of the pool.
  */
-public class HdfsCachePool implements CatalogObject {
-  private long catalogVersion_;
+public class HdfsCachePool extends CatalogObjectImpl {
   private final THdfsCachePool cachePool_;
 
   public HdfsCachePool(CachePoolInfo cachePoolInfo) {
@@ -57,9 +56,5 @@ public class HdfsCachePool implements CatalogObject {
   @Override
   public String getName() { return cachePool_.getPool_name(); }
   @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-  @Override
-  public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-  @Override
-  public boolean isLoaded() { return true; }
-}
\ No newline at end of file
+  public String getUniqueName() { return "HDFS_CACHE_POOL:" + getName().toLowerCase(); }
+}