You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/16 23:04:21 UTC

[3/4] impala git commit: IMPALA-5058: Improve the concurrency of DDL/DML operations

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
index 27839b3..1a5e2ec 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog;
 
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -24,24 +25,35 @@ import java.util.TreeMap;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TTable;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 /**
- * The impalad catalog cache can be modified by either a state store update or by a
- * direct ("fast") update that applies the result of a catalog operation to the cache
- * out-of-band of a state store update. This thread safe log tracks the divergence
- * (due to direct updates to the cache) of this impalad's cache from the last state
- * store update. This log is needed to ensure work is never undone. For example,
- * consider the following sequence of events:
- * t1: [Direct Update] - Add item A - (Catalog Version 9)
- * t2: [Direct Update] - Drop item A - (Catalog Version 10)
- * t3: [StateStore Update] - (From Catalog Version 9)
- * This log is used to ensure the state store update in t3 does not undo the drop in t2.
+ * Represents a log of deleted catalog objects.
  *
- * Currently this only tracks objects that were dropped, since the catalog cache can be
- * queried to check if an object was added. TODO: Also track object additions from async
- * operations. This could be used to to "replay" the log in the case of a catalog reset
- * ("invalidate metadata"). Currently, the catalog may briefly go back in time if
- * "invalidate metadata" is run concurrently with async catalog operations.
+ * There are currently two use cases for this log:
+ *
+ * a) Processing catalog updates in the impalads
+ *   The impalad catalog cache can be modified by either a state store update or by a
+ *   direct update that applies the result of a catalog operation to the cache
+ *   out-of-band of a state store update. This thread safe log tracks the divergence
+ *   (due to direct updates to the cache) of this impalad's cache from the last state
+ *   store update. This log is needed to ensure work is never undone. For example,
+ *   consider the following sequence of events:
+ *   t1: [Direct Update] - Add item A - (Catalog Version 9)
+ *   t2: [Direct Update] - Drop item A - (Catalog Version 10)
+ *   t3: [StateStore Update] - (From Catalog Version 9)
+ *   This log is used to ensure the state store update in t3 does not undo the drop in t2.
+ *   Currently this only tracks objects that were dropped, since the catalog cache can be
+ *   queried to check if an object was added. TODO: Also track object additions from async
+ *   operations. This could be used to "replay" the log in the case of a catalog reset
+ *   ("invalidate metadata"). Currently, the catalog may briefly go back in time if
+ *   "invalidate metadata" is run concurrently with async catalog operations.
+ *
+ * b) Building catalog topic updates in the catalogd
+ *   The catalogd uses this log to identify catalog objects that have been deleted
+ *   since the last catalog topic update. Once the catalog topic update is constructed,
+ *   the old entries in the log are garbage collected to prevent the log from growing
+ *   indefinitely.
  */
 public class CatalogDeltaLog {
   // Map of the catalog version an object was removed from the catalog
@@ -58,6 +70,17 @@ public class CatalogDeltaLog {
   }
 
   /**
+   * Retrieve all the removed catalog objects with versions in range
+   * (fromVersion, toVersion].
+   */
+  public synchronized List<TCatalogObject> retrieveObjects(long fromVersion,
+      long toVersion) {
+    SortedMap<Long, TCatalogObject> objects =
+        removedCatalogObjects_.subMap(fromVersion + 1, toVersion + 1);
+    return ImmutableList.<TCatalogObject>copyOf(objects.values());
+  }
+
+  /**
    * Given the current catalog version, removes all items with catalogVersion <
    * currectCatalogVersion. Such objects do not need to be tracked in the delta
    * log anymore because they are consistent with the state store's view of the
@@ -85,36 +108,8 @@ public class CatalogDeltaLog {
     SortedMap<Long, TCatalogObject> candidateObjects =
         removedCatalogObjects_.tailMap(catalogObject.getCatalog_version());
     for (Map.Entry<Long, TCatalogObject> entry: candidateObjects.entrySet()) {
-      if (objectNamesMatch(catalogObject, entry.getValue())) return true;
+      if (Catalog.keyEquals(catalogObject, entry.getValue())) return true;
     }
     return false;
   }
-
-  /**
-   * Returns true if the two objects have the same object type and name.
-   * TODO: Use global object IDs everywhere instead of tracking catalog objects by name.
-   */
-  private boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) {
-    if (first.getType() != second.getType()) return false;
-    switch (first.getType()) {
-      case DATABASE:
-        return first.getDb().getDb_name().equalsIgnoreCase(second.getDb().getDb_name());
-      case TABLE:
-      case VIEW:
-        TTable firstTbl = first.getTable();
-        return firstTbl.getDb_name().equalsIgnoreCase(second.getTable().getDb_name()) &&
-            firstTbl.getTbl_name().equalsIgnoreCase(second.getTable().getTbl_name());
-      case FUNCTION:
-        return first.getFn().getSignature().equals(second.getFn().getSignature()) &&
-            first.getFn().getName().equals(second.getFn().getName());
-      case ROLE:
-        return first.getRole().getRole_name().equalsIgnoreCase(
-            second.getRole().getRole_name());
-      case PRIVILEGE:
-        return first.getPrivilege().getPrivilege_name().equalsIgnoreCase(
-            second.getPrivilege().getPrivilege_name()) &&
-            first.getPrivilege().getRole_id() == second.getPrivilege().getRole_id();
-      default: return false;
-    }
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
index a2d8ca9..cc4c495 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
@@ -29,6 +29,9 @@ public interface CatalogObject {
   // Returns the unqualified object name.
   public String getName();
 
+  // Returns the unique name of this catalog object.
+  public String getUniqueName();
+
   // Returns the version of this catalog object.
   public long getCatalogVersion();
 
@@ -37,4 +40,4 @@ public interface CatalogObject {
 
   // Returns true if this CatalogObject has had its metadata loaded, false otherwise.
   public boolean isLoaded();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
index c578e41..d882cdb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectCache.java
@@ -30,6 +30,8 @@ import com.google.common.collect.Lists;
 /**
  * Thread safe cache for storing CatalogObjects. Enforces that updates to existing
  * entries only get applied if the new/updated object has a larger catalog version.
+ * add() and remove() functions also update the entries of the global instance of
+ * CatalogObjectVersionQueue which keeps track of the catalog object versions.
  */
 public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T> {
   private final boolean caseInsensitiveKeys_;
@@ -71,13 +73,19 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
     String key = catalogObject.getName();
     if (caseInsensitiveKeys_) key = key.toLowerCase();
     T existingItem = metadataCache_.putIfAbsent(key, catalogObject);
-    if (existingItem == null) return true;
+    if (existingItem == null) {
+      CatalogObjectVersionQueue.INSTANCE.addVersion(
+          catalogObject.getCatalogVersion());
+      return true;
+    }
 
     if (existingItem.getCatalogVersion() < catalogObject.getCatalogVersion()) {
       // When existingItem != null it indicates there was already an existing entry
       // associated with the key. Add the updated object iff it has a catalog
       // version greater than the existing entry.
       metadataCache_.put(key, catalogObject);
+      CatalogObjectVersionQueue.INSTANCE.updateVersions(
+          existingItem.getCatalogVersion(), catalogObject.getCatalogVersion());
       return true;
     }
     return false;
@@ -89,7 +97,12 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
    */
   public synchronized T remove(String name) {
     if (caseInsensitiveKeys_) name = name.toLowerCase();
-    return metadataCache_.remove(name);
+    T removedObject = metadataCache_.remove(name);
+    if (removedObject != null) {
+      CatalogObjectVersionQueue.INSTANCE.removeVersion(
+          removedObject.getCatalogVersion());
+    }
+    return removedObject;
   }
 
   /**
@@ -144,4 +157,4 @@ public class CatalogObjectCache<T extends CatalogObject> implements Iterable<T>
   public Iterator<T> iterator() {
     return metadataCache_.values().iterator();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java
new file mode 100644
index 0000000..321355c
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectImpl.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.impala.common.NotImplementedException;
+import org.apache.impala.thrift.TCatalogObjectType;
+
+abstract public class CatalogObjectImpl implements CatalogObject {
+  // Current catalog version of this object. Initialized to
+  // Catalog.INITIAL_CATALOG_VERSION.
+  private AtomicLong catalogVersion_ = new AtomicLong(Catalog.INITIAL_CATALOG_VERSION);
+
+  protected CatalogObjectImpl() {}
+
+  @Override
+  public long getCatalogVersion() { return catalogVersion_.get(); }
+
+  @Override
+  public void setCatalogVersion(long newVersion) { catalogVersion_.set(newVersion); }
+
+  @Override
+  public boolean isLoaded() { return true; }
+
+  @Override
+  public String getName() { return ""; }
+
+  @Override
+  public String getUniqueName() { return ""; }
+}
+

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
new file mode 100644
index 0000000..5fcd398
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObjectVersionQueue.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.List;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Singleton class used to maintain the versions of all the catalog objects stored in a
+ * local catalog cache. Simple wrapper around a priority queue which stores the catalog
+ * object versions, allowing O(1) retrieval of the minimum object version currently
+ * stored in the cache. Provides a simple API to add, remove and update catalog object
+ * versions. Not thread-safe.
+ *
+ * The primary use case of this class is to let an Impalad catalog cache determine when
+ * the result set of an INVALIDATE METADATA operation has been applied locally by keeping
+ * track of the minimum catalog object version.
+ */
+public class CatalogObjectVersionQueue {
+  private final PriorityQueue<Long> objectVersions_ = new PriorityQueue<>();
+
+  public static final CatalogObjectVersionQueue INSTANCE =
+      new CatalogObjectVersionQueue();
+
+  private CatalogObjectVersionQueue() {}
+
+  public void updateVersions(long oldVersion, long newVersion) {
+    removeVersion(oldVersion);
+    addVersion(newVersion);
+  }
+
+  public void removeVersion(long oldVersion) {
+    objectVersions_.remove(oldVersion);
+  }
+
+  public void addVersion(long newVersion) {
+    objectVersions_.add(newVersion);
+  }
+
+  public long getMinimumVersion() {
+    Long minVersion = objectVersions_.peek();
+    return minVersion != null ? minVersion : 0;
+  }
+
+  public void addAll(List<? extends CatalogObject> catalogObjects) {
+    for (CatalogObject catalogObject: catalogObjects) {
+      addVersion(catalogObject.getCatalogVersion());
+    }
+  }
+
+  public void removeAll(List<? extends CatalogObject> catalogObjects) {
+    for (CatalogObject catalogObject: catalogObjects) {
+      removeVersion(catalogObject.getCatalogVersion());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index d2a0a82..f75b0a8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +33,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.Path;
@@ -45,9 +47,9 @@ import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.catalog.TopicUpdateLog.Entry;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -58,8 +60,9 @@ import org.apache.impala.hive.executor.UdfExecutor;
 import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TCatalogUpdateResult;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TTable;
@@ -73,6 +76,8 @@ import org.apache.thrift.protocol.TCompactProtocol;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -80,14 +85,62 @@ import com.google.common.collect.Sets;
 /**
  * Specialized Catalog that implements the CatalogService specific Catalog
  * APIs. The CatalogServiceCatalog manages loading of all the catalog metadata
- * and processing of DDL requests. For each DDL request, the CatalogServiceCatalog
- * will return the catalog version that the update will show up in. The client
- * can then wait until the statestore sends an update that contains that catalog
- * version.
- * The CatalogServiceCatalog also manages a global "catalog version". The version
- * is incremented and assigned to a CatalogObject whenever it is
- * added/modified/removed from the catalog. This means each CatalogObject will have a
- * unique version and assigned versions are strictly increasing.
+ * and processing of DDL requests. The CatalogServiceCatalog maintains a global
+ * "catalog version". The version is incremented and assigned to a CatalogObject whenever
+ * it is added/modified/removed from the catalog. This means each CatalogObject will have
+ * a unique version and assigned versions are strictly increasing.
+ *
+ * Periodically, the CatalogServiceCatalog collects a delta of catalog updates (based on a
+ * specified catalog version) and constructs a topic update to be sent to the statestore.
+ * Each catalog topic update is defined by a range of catalog versions (from, to] and the
+ * CatalogServiceCatalog guarantees that every catalog object that has a version in the
+ * specified range is included in the catalog topic update. Concurrent DDL requests are
+ * allowed while a topic update is in progress. Hence, there is a non-zero probability
+ * that frequently modified catalog objects may keep skipping topic updates. That can
+ * happen when by the time a topic update thread tries to collect an object update, that
+ * object is being modified by another metadata operation, causing its version to surpass
+ * the 'to' version of the topic update. To ensure that all catalog updates
+ * are eventually included in a catalog topic update, we keep track of the number of times
+ * each catalog object has skipped a topic update and if that number exceeds a specified
+ * threshold, we add the catalog object to the next topic update even if its version is
+ * higher than the 'to' version of the topic update. As a result, the same version of an
+ * object might be sent in two subsequent topic updates.
+ *
+ * The CatalogServiceCatalog maintains two logs:
+ * - Delete log. Since deleted objects are removed from the cache, the cache itself is
+ *   not useful for tracking deletions. This log is used for populating the list of
+ *   deleted objects during a topic update by recording the catalog objects that
+ *   have been removed from the catalog. An entry with a new version is added to this log
+ *   every time an object is removed (e.g. dropTable). Incrementing an object's version
+ *   and adding it to the delete log should be performed atomically. An entry is removed
+ *   from this log by the topic update thread when the associated deletion entry is
+ *   added to a topic update.
+ * - Topic update log. This log records information about the catalog objects that have
+ *   been included in a catalog topic update. Only the thread that is processing the
+ *   topic update is responsible for adding, updating, and removing entries from the log.
+ *   All other operations (e.g. addTable) only read topic update log entries but never
+ *   modify them. Each entry includes the number of times a catalog object has
+ *   skipped a topic update, which version of the object was last sent in a topic update
+ *   and what was the version of that topic update. Entries of the topic update log are
+ *   garbage-collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates by the topic
+ *   update processing thread to prevent the log from growing indefinitely. Metadata
+ *   operations using SYNC_DDL inspect this log to identify the catalog topic
+ *   version that the issuing impalad must wait for in order to ensure that the effects
+ *   of this operation have been broadcast to all the coordinators.
+ *
+ * Known anomalies with SYNC_DDL:
+ *   The time-based cleanup process of the topic update log entries may cause metadata
+ *   operations that use SYNC_DDL to hang while waiting for specific topic update log
+ *   entries. That could happen if the thread processing the metadata operation stalls
+ *   for a long period of time (longer than the time to process
+ *   TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates) between the time the operation was
+ *   applied in the catalog cache and the time the SYNC_DDL version was checked. To reduce
+ *   the probability of such an event, we set the value of the
+ *   TOPIC_UPDATE_LOG_GC_FREQUENCY to a large value. Also, to prevent metadata operations
+ *   from hanging in that path due to unknown issues (e.g. bugs), operations using
+ *   SYNC_DDL are not allowed to wait indefinitely for specific topic log entries and an
+ *   exception is thrown if the specified max wait time is exceeded. See
+ *   waitForSyncDdlVersion() for more details.
  *
  * Table metadata for IncompleteTables (not fully loaded tables) are loaded in the
  * background by the TableLoadingMgr; tables can be prioritized for loading by calling
@@ -100,7 +153,7 @@ import com.google.common.collect.Sets;
  * out-of-band of the table loading thread pool.
  *
  * See the class comments in CatalogOpExecutor for a description of the locking protocol
- * that should be employed if both the catalog lock and table locks need to be held at
+ * that should be employed if both the version lock and table locks need to be held at
  * the same time.
  *
  * TODO: Consider removing on-demand loading and have everything go through the table
@@ -110,6 +163,7 @@ public class CatalogServiceCatalog extends Catalog {
   private static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
 
   private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
+  private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
   private final TUniqueId catalogServiceId_;
 
   // Fair lock used to synchronize reads/writes of catalogVersion_. Because this lock
@@ -123,11 +177,11 @@ public class CatalogServiceCatalog extends Catalog {
   //   from the metastore.
   // * During renameTable(), because a table must be removed and added to the catalog
   //   atomically (potentially in a different database).
-  private final ReentrantReadWriteLock catalogLock_ = new ReentrantReadWriteLock(true);
+  private final ReentrantReadWriteLock versionLock_ = new ReentrantReadWriteLock(true);
 
   // Last assigned catalog version. Starts at INITIAL_CATALOG_VERSION and is incremented
   // with each update to the Catalog. Continued across the lifetime of the object.
-  // Protected by catalogLock_.
+  // Protected by versionLock_.
   // TODO: Handle overflow of catalogVersion_ and nextTableId_.
   // TODO: The name of this variable is misleading and can be interpreted as a property
   // of the catalog server. Rename into something that indicates its role as a global
@@ -150,6 +204,19 @@ public class CatalogServiceCatalog extends Catalog {
   // Local temporary directory to copy UDF Jars.
   private static String localLibraryPath_;
 
+  // Log of deleted catalog objects.
+  private final CatalogDeltaLog deleteLog_;
+
+  // Version of the last topic update returned to the statestore.
+  // The version of a topic update is the catalog version of the CATALOG object
+  // that is added to it.
+  private final AtomicLong lastSentTopicUpdate_ = new AtomicLong(-1);
+
+  // Wait time for a topic update.
+  private static final long TOPIC_UPDATE_WAIT_TIMEOUT_MS = 10000;
+
+  private final TopicUpdateLog topicUpdateLog_ = new TopicUpdateLog();
+
   /**
    * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
    * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
@@ -169,7 +236,7 @@ public class CatalogServiceCatalog extends Catalog {
       // local, etc.)
       if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
         cachePoolReader_.scheduleAtFixedRate(
-            new CachePoolReader(), 0, 1, TimeUnit.MINUTES);
+            new CachePoolReader(false), 0, 1, TimeUnit.MINUTES);
       }
     } catch (IOException e) {
       LOG.error("Couldn't identify the default FS. Cache Pool reader will be disabled.");
@@ -180,6 +247,7 @@ public class CatalogServiceCatalog extends Catalog {
       sentryProxy_ = null;
     }
     localLibraryPath_ = new String("file://" + localLibraryPath);
+    deleteLog_ = new CatalogDeltaLog();
   }
 
   // Timeout for acquiring a table lock
@@ -189,7 +257,7 @@ public class CatalogServiceCatalog extends Catalog {
   private static final int TBL_LOCK_RETRY_MS = 10;
 
   /**
-   * Tries to acquire catalogLock_ and the lock of 'tbl' in that order. Returns true if it
+   * Tries to acquire versionLock_ and the lock of 'tbl' in that order. Returns true if it
    * successfully acquires both within TBL_LOCK_TIMEOUT_MS millisecs; both locks are held
    * when the function returns. Returns false otherwise and no lock is held in this case.
    */
@@ -197,7 +265,7 @@ public class CatalogServiceCatalog extends Catalog {
     long begin = System.currentTimeMillis();
     long end;
     do {
-      catalogLock_.writeLock().lock();
+      versionLock_.writeLock().lock();
       if (tbl.getLock().tryLock()) {
         if (LOG.isTraceEnabled()) {
           end = System.currentTimeMillis();
@@ -206,7 +274,7 @@ public class CatalogServiceCatalog extends Catalog {
         }
         return true;
       }
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
       try {
         // Sleep to avoid spinning and allow other operations to make progress.
         Thread.sleep(TBL_LOCK_RETRY_MS);
@@ -223,12 +291,17 @@ public class CatalogServiceCatalog extends Catalog {
    * Called periodically by the cachePoolReader_.
    */
   protected class CachePoolReader implements Runnable {
-
+    // If true, existing cache pools will get a new catalog version and, consequently,
+    // they will be added to the next topic update, triggering an update in each
+    // coordinator's local catalog cache. This is needed for the case of INVALIDATE
+    // METADATA where a new catalog version needs to be assigned to every catalog object.
+    private final boolean incrementVersions_;
     /**
      * This constructor is needed to create a non-threaded execution of the class.
      */
-    public CachePoolReader() {
+    public CachePoolReader(boolean incrementVersions) {
       super();
+      incrementVersions_ = incrementVersions;
     }
 
     public void run() {
@@ -249,28 +322,45 @@ public class CatalogServiceCatalog extends Catalog {
         return;
       }
 
-      catalogLock_.writeLock().lock();
+      versionLock_.writeLock().lock();
       try {
         // Determine what has changed relative to what we have cached.
         Set<String> droppedCachePoolNames = Sets.difference(
             hdfsCachePools_.keySet(), currentCachePools.keySet());
         Set<String> createdCachePoolNames = Sets.difference(
             currentCachePools.keySet(), hdfsCachePools_.keySet());
+        Set<String> survivingCachePoolNames = Sets.difference(
+            hdfsCachePools_.keySet(), droppedCachePoolNames);
         // Add all new cache pools.
         for (String createdCachePool: createdCachePoolNames) {
           HdfsCachePool cachePool = new HdfsCachePool(
               currentCachePools.get(createdCachePool));
-          cachePool.setCatalogVersion(
-              CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
+          cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
           hdfsCachePools_.add(cachePool);
         }
         // Remove dropped cache pools.
         for (String cachePoolName: droppedCachePoolNames) {
-          hdfsCachePools_.remove(cachePoolName);
-          CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
+          HdfsCachePool cachePool = hdfsCachePools_.remove(cachePoolName);
+          if (cachePool != null) {
+            cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
+            TCatalogObject removedObject =
+                new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
+                    cachePool.getCatalogVersion());
+            removedObject.setCache_pool(cachePool.toThrift());
+            deleteLog_.addRemovedObject(removedObject);
+          }
+        }
+        if (incrementVersions_) {
+          // Increment the version of existing pools in order to be added to the next
+          // topic update.
+          for (String survivingCachePoolName: survivingCachePoolNames) {
+            HdfsCachePool cachePool = hdfsCachePools_.get(survivingCachePoolName);
+            Preconditions.checkNotNull(cachePool);
+            cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
+          }
         }
       } finally {
-        catalogLock_.writeLock().unlock();
+        versionLock_.writeLock().unlock();
       }
     }
   }
@@ -297,120 +387,347 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Returns all known objects in the Catalog (Tables, Views, Databases, and
-   * Functions). Some metadata may be skipped for objects that have a catalog
-   * version < the specified "fromVersion". Takes a lock on the catalog to ensure this
-   * update contains a consistent snapshot of all items in the catalog. While holding the
-   * catalog lock, it locks each accessed table to protect against concurrent
-   * modifications.
+   * Identifies and returns the catalog objects that were added/modified/deleted in the
+   * catalog with versions > 'fromVersion'. It operates on a snapshot of the catalog
+   * without holding the catalog lock which means that other concurrent metadata
+   * operations can still make progress while the catalog delta is computed. An entry in
+   * the topic update log is added for every catalog object that is included in the
+   * catalog delta. The log is examined by operations using SYNC_DDL to determine which
+   * topic update covers the result set of a metadata operation. Once the catalog delta
+   * is computed, the delete log and the topic update log are garbage collected using
+   * 'toVersion' (the catalog version read at the start of this call) as the cutoff.
+   */
+  public TGetCatalogDeltaResponse getCatalogDelta(long fromVersion) {
+    // Maximum catalog version (inclusive) to be included in the catalog delta.
+    long toVersion = getCatalogVersion();
+    TGetCatalogDeltaResponse resp = new TGetCatalogDeltaResponse();
+    resp.setUpdated_objects(new ArrayList<TCatalogObject>());
+    resp.setDeleted_objects(new ArrayList<TCatalogObject>());
+    resp.setMax_catalog_version(toVersion);
+
+    for (Db db: getAllDbs()) {
+      addDatabaseToCatalogDelta(db, fromVersion, toVersion, resp);
+    }
+    for (DataSource dataSource: getAllDataSources()) {
+      addDataSourceToCatalogDelta(dataSource, fromVersion, toVersion, resp);
+    }
+    for (HdfsCachePool cachePool: getAllHdfsCachePools()) {
+      addHdfsCachePoolToCatalogDelta(cachePool, fromVersion, toVersion, resp);
+    }
+    for (Role role: getAllRoles()) {
+      addRoleToCatalogDelta(role, fromVersion, toVersion, resp);
+    }
+    Set<String> updatedCatalogObjects = Sets.newHashSet();
+    for (TCatalogObject catalogObj: resp.updated_objects) {
+      topicUpdateLog_.add(Catalog.toCatalogObjectKey(catalogObj),
+          new TopicUpdateLog.Entry(0, catalogObj.getCatalog_version(),
+              toVersion));
+      updatedCatalogObjects.add(Catalog.toCatalogObjectKey(catalogObj));
+    }
+
+    // Identify the catalog objects that were removed from the catalog for which their
+    // versions are in range ('fromVersion', 'toVersion']. We need to make sure
+    // that we don't include "deleted" objects that were re-added to the catalog.
+    for (TCatalogObject removedObject: getDeletedObjects(fromVersion, toVersion)) {
+      if (!updatedCatalogObjects.contains(
+          Catalog.toCatalogObjectKey(removedObject))) {
+        topicUpdateLog_.add(Catalog.toCatalogObjectKey(removedObject),
+            new TopicUpdateLog.Entry(0, removedObject.getCatalog_version(),
+                toVersion));
+        resp.addToDeleted_objects(removedObject);
+      }
+    }
+    // Each topic update should contain a single "TCatalog" object which is used to
+    // pass overall state on the catalog, such as the current version and the
+    // catalog service id. By setting the catalog version to the latest catalog
+    // version at this point, it ensures impalads will always bump their versions,
+    // even in the case where an object has been dropped.
+    TCatalogObject catalog =
+        new TCatalogObject(TCatalogObjectType.CATALOG, toVersion);
+    catalog.setCatalog(new TCatalog(catalogServiceId_));
+    resp.addToUpdated_objects(catalog);
+    // Garbage collect the delete and topic update log.
+    deleteLog_.garbageCollect(toVersion);
+    topicUpdateLog_.garbageCollectUpdateLogEntries(toVersion);
+    lastSentTopicUpdate_.set(toVersion);
+    // Notify any operation that is waiting on the next topic update.
+    synchronized (topicUpdateLog_) {
+      topicUpdateLog_.notifyAll();
+    }
+    return resp;
+  }
+
+  /**
+   * Get a snapshot view of all the catalog objects that were deleted between versions
+   * ('fromVersion', 'toVersion'].
    */
-  public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
-    TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
-    resp.setObjects(new ArrayList<TCatalogObject>());
-    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
-    catalogLock_.readLock().lock();
+  private List<TCatalogObject> getDeletedObjects(long fromVersion, long toVersion) {
+    versionLock_.readLock().lock();
     try {
-      for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
-        TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
-            db.getCatalogVersion());
-        catalogDb.setDb(db.toThrift());
-        resp.addToObjects(catalogDb);
-
-        for (String tblName: db.getAllTableNames()) {
-          TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
-              Catalog.INITIAL_CATALOG_VERSION);
-
-          Table tbl = db.getTable(tblName);
-          if (tbl == null) {
-            LOG.error("Table: " + tblName + " was expected to be in the catalog " +
-                "cache. Skipping table for this update.");
-            continue;
-          }
+      return deleteLog_.retrieveObjects(fromVersion, toVersion);
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
 
-          // Protect the table from concurrent modifications.
-          tbl.getLock().lock();
-          try {
-            // Only add the extended metadata if this table's version is >=
-            // the fromVersion.
-            if (tbl.getCatalogVersion() >= fromVersion) {
-              try {
-                catalogTbl.setTable(tbl.toThrift());
-              } catch (Exception e) {
-                if (LOG.isTraceEnabled()) {
-                  LOG.trace(String.format("Error calling toThrift() on table %s.%s: %s",
-                      db.getName(), tblName, e.getMessage()), e);
-                }
-                continue;
-              }
-              catalogTbl.setCatalog_version(tbl.getCatalogVersion());
-            } else {
-              catalogTbl.setTable(new TTable(db.getName(), tblName));
-            }
-          } finally {
-            tbl.getLock().unlock();
-          }
-          resp.addToObjects(catalogTbl);
-        }
+  /**
+   * Returns an immutable list of all the databases, copied under the version read lock.
+   */
+  private List<Db> getAllDbs() {
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(dbCache_.get().values());
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
 
-        for (Function fn: db.getFunctions(null, new PatternMatcher())) {
-          TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
-              fn.getCatalogVersion());
-          function.setFn(fn.toThrift());
-          resp.addToObjects(function);
-        }
+  /**
+   * Returns an immutable list of all the data sources, copied under the read lock.
+   */
+  private List<DataSource> getAllDataSources() {
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(getDataSources());
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns an immutable list of all the Hdfs cache pools, copied under the read lock.
+   */
+  private List<HdfsCachePool> getAllHdfsCachePools() {
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(hdfsCachePools_);
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns an immutable list of all the roles, copied under the version read lock.
+   */
+  private List<Role> getAllRoles() {
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(authPolicy_.getAllRoles());
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Contributes 'db' and its contents to the catalog delta in 'resp'. The database
+   * itself is appended only when fromVersion < version <= toVersion; every table and
+   * function of the database is then offered to the delta regardless of that outcome.
+   */
+  private void addDatabaseToCatalogDelta(Db db, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    final long version = db.getCatalogVersion();
+    final boolean dbInRange = fromVersion < version && version <= toVersion;
+    if (dbInRange) {
+      TCatalogObject thriftDb = new TCatalogObject(TCatalogObjectType.DATABASE, version);
+      thriftDb.setDb(db.toThrift());
+      resp.addToUpdated_objects(thriftDb);
+    }
+    for (Table table: getAllTables(db)) {
+      addTableToCatalogDelta(table, fromVersion, toVersion, resp);
+    }
+    for (Function function: getAllFunctions(db)) {
+      addFunctionToCatalogDelta(function, fromVersion, toVersion, resp);
+    }
+  }
+
+  /**
+   * Returns an immutable list of the tables of non-null 'db', copied under the read lock.
+   */
+  private List<Table> getAllTables(Db db) {
+    Preconditions.checkNotNull(db);
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(db.getTables());
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns an immutable list of the functions of non-null 'db', copied under the read lock.
+   */
+  private List<Function> getAllFunctions(Db db) {
+    Preconditions.checkNotNull(db);
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(db.getFunctions(null, new PatternMatcher()));
+    } finally {
+      versionLock_.readLock().unlock();
+    }
+  }
+
+  /**
+   * Adds a table in the topic update if its version is in the range
+   * ('fromVersion', 'toVersion']. If the table's version is larger than 'toVersion' and
+   * the table has skipped a topic update 'MAX_NUM_SKIPPED_TOPIC_UPDATES' times, it is
+   * included in the topic update. This prevents tables that are updated frequently from
+   * skipping topic updates indefinitely, which would also violate the semantics of
+   * SYNC_DDL.
+   */
+  private void addTableToCatalogDelta(Table tbl, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    if (tbl.getCatalogVersion() <= toVersion) {
+      addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp);
+    } else {
+      TopicUpdateLog.Entry topicUpdateEntry =
+          topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
+      Preconditions.checkNotNull(topicUpdateEntry);
+      if (topicUpdateEntry.getNumSkippedTopicUpdates() >= MAX_NUM_SKIPPED_TOPIC_UPDATES) {
+        addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp);
+      } else {
+        LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
+            toVersion);
+        topicUpdateLog_.add(tbl.getUniqueName(),
+            new TopicUpdateLog.Entry(
+                topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
+                topicUpdateEntry.getLastSentVersion(),
+                topicUpdateEntry.getLastSentCatalogUpdate()));
       }
+    }
+  }
 
-      for (DataSource dataSource: getDataSources()) {
-        TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
-            dataSource.getCatalogVersion());
-        catalogObj.setData_source(dataSource.toThrift());
-        resp.addToObjects(catalogObj);
+  /**
+   * Helper function that tries to add a table in a topic update. It acquires table's
+   * lock and checks if its version is in the ('fromVersion', 'toVersion'] range and how
+   * many consecutive times (if any) has the table skipped a topic update.
+   */
+  private void addTableToCatalogDeltaHelper(Table tbl, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    TCatalogObject catalogTbl =
+        new TCatalogObject(TCatalogObjectType.TABLE, Catalog.INITIAL_CATALOG_VERSION);
+    tbl.getLock().lock();
+    try {
+      long tblVersion = tbl.getCatalogVersion();
+      if (tblVersion <= fromVersion) return;
+      TopicUpdateLog.Entry topicUpdateEntry =
+          topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
+      if (tblVersion > toVersion &&
+          topicUpdateEntry.getNumSkippedTopicUpdates() < MAX_NUM_SKIPPED_TOPIC_UPDATES) {
+        LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
+            toVersion);
+        topicUpdateLog_.add(tbl.getUniqueName(),
+            new TopicUpdateLog.Entry(
+                topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
+                topicUpdateEntry.getLastSentVersion(),
+                topicUpdateEntry.getLastSentCatalogUpdate()));
+        return;
       }
-      for (HdfsCachePool cachePool: hdfsCachePools_) {
-        TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
-            cachePool.getCatalogVersion());
-        pool.setCache_pool(cachePool.toThrift());
-        resp.addToObjects(pool);
+      try {
+        catalogTbl.setTable(tbl.toThrift());
+      } catch (Exception e) {
+        LOG.error(String.format("Error calling toThrift() on table %s: %s",
+            tbl.getFullName(), e.getMessage()), e);
+        return;
       }
+      catalogTbl.setCatalog_version(tbl.getCatalogVersion());
+      resp.addToUpdated_objects(catalogTbl);
+    } finally {
+      tbl.getLock().unlock();
+    }
+  }
 
-      // Get all roles
-      for (Role role: authPolicy_.getAllRoles()) {
-        TCatalogObject thriftRole = new TCatalogObject();
-        thriftRole.setRole(role.toThrift());
-        thriftRole.setCatalog_version(role.getCatalogVersion());
-        thriftRole.setType(role.getCatalogObjectType());
-        resp.addToObjects(thriftRole);
-
-        for (RolePrivilege p: role.getPrivileges()) {
-          TCatalogObject privilege = new TCatalogObject();
-          privilege.setPrivilege(p.toThrift());
-          privilege.setCatalog_version(p.getCatalogVersion());
-          privilege.setType(p.getCatalogObjectType());
-          resp.addToObjects(privilege);
-        }
-      }
+  /**
+   * Appends 'fn' to the updated objects of 'resp' when its catalog version is greater
+   * than 'fromVersion' and at most 'toVersion'; otherwise 'resp' is left untouched.
+   */
+  private void addFunctionToCatalogDelta(Function fn, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    long version = fn.getCatalogVersion();
+    boolean inRange = version > fromVersion && version <= toVersion;
+    if (!inRange) return;
+    TCatalogObject thriftFn = new TCatalogObject(TCatalogObjectType.FUNCTION, version);
+    thriftFn.setFn(fn.toThrift());
+    resp.addToUpdated_objects(thriftFn);
+  }
 
-      // Each update should contain a single "TCatalog" object which is used to
-      // pass overall state on the catalog, such as the current version and the
-      // catalog service id.
-      TCatalogObject catalog = new TCatalogObject();
-      catalog.setType(TCatalogObjectType.CATALOG);
-      // By setting the catalog version to the latest catalog version at this point,
-      // it ensure impalads will always bump their versions, even in the case where
-      // an object has been dropped.
-      catalog.setCatalog_version(getCatalogVersion());
-      catalog.setCatalog(new TCatalog(catalogServiceId_));
-      resp.addToObjects(catalog);
-
-      // The max version is the max catalog version of all items in the update.
-      resp.setMax_catalog_version(getCatalogVersion());
-      return resp;
+  /**
+   * Appends 'dataSource' to the updated objects of 'resp' when its catalog version is
+   * greater than 'fromVersion' and at most 'toVersion'; otherwise does nothing.
+   */
+  private void addDataSourceToCatalogDelta(DataSource dataSource, long fromVersion,
+      long toVersion, TGetCatalogDeltaResponse resp) {
+    long version = dataSource.getCatalogVersion();
+    if (version <= fromVersion || toVersion < version) return;
+    TCatalogObjectType type = TCatalogObjectType.DATA_SOURCE;
+    TCatalogObject thriftDs = new TCatalogObject(type, version);
+    thriftDs.setData_source(dataSource.toThrift());
+    resp.addToUpdated_objects(thriftDs);
+  }
+
+  /**
+   * Appends 'cachePool' to the updated objects of 'resp' when its catalog version is
+   * greater than 'fromVersion' and at most 'toVersion'; otherwise does nothing.
+   */
+  private void addHdfsCachePoolToCatalogDelta(HdfsCachePool cachePool, long fromVersion,
+      long toVersion, TGetCatalogDeltaResponse resp) {
+    final long version = cachePool.getCatalogVersion();
+    final boolean tooOld = version <= fromVersion;
+    final boolean tooNew = version > toVersion;
+    if (tooOld || tooNew) return;
+    TCatalogObject thriftPool =
+        new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL, version);
+    thriftPool.setCache_pool(cachePool.toThrift());
+    resp.addToUpdated_objects(thriftPool);
+  }
+
+
+  /**
+   * Contributes 'role' and its privileges to the catalog delta in 'resp'. The role
+   * itself is appended only when fromVersion < version <= toVersion; each of its
+   * privileges is then offered to the delta regardless of that outcome.
+   */
+  private void addRoleToCatalogDelta(Role role, long fromVersion, long toVersion,
+      TGetCatalogDeltaResponse resp) {
+    final long version = role.getCatalogVersion();
+    final boolean roleInRange = fromVersion < version && version <= toVersion;
+    if (roleInRange) {
+      TCatalogObject roleObj = new TCatalogObject(TCatalogObjectType.ROLE, version);
+      roleObj.setRole(role.toThrift());
+      resp.addToUpdated_objects(roleObj);
+    }
+    for (RolePrivilege privilege: getAllPrivileges(role)) {
+      addRolePrivilegeToCatalogDelta(privilege, fromVersion, toVersion, resp);
+    }
+  }
+
+  /**
+   * Get a snapshot view of all the privileges in a role.
+   */
+  private List<RolePrivilege> getAllPrivileges(Role role) {
+    Preconditions.checkNotNull(role);
+    versionLock_.readLock().lock();
+    try {
+      return ImmutableList.copyOf(role.getPrivileges());
     } finally {
-      catalogLock_.readLock().unlock();
+      versionLock_.readLock().unlock();
     }
   }
 
   /**
+   * Adds a role privilege to the topic update if its version is in the range
+   * ('fromVersion', 'toVersion'].
+   */
+  private void addRolePrivilegeToCatalogDelta(RolePrivilege priv, long fromVersion,
+      long toVersion, TGetCatalogDeltaResponse resp) {
+    long version = priv.getCatalogVersion();
+    if (version > fromVersion && version <= toVersion) {
+      TCatalogObject privObj = new TCatalogObject(TCatalogObjectType.PRIVILEGE, version);
+      privObj.setPrivilege(priv.toThrift());
+      resp.addToUpdated_objects(privObj);
+    }
+  }
+
+  /**
    * Returns all user defined functions (aggregate and scalar) in the specified database.
    * Functions are not returned in a defined order.
    */
@@ -710,6 +1027,31 @@ public class CatalogServiceCatalog extends Catalog {
           tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
         }
       }
+
+      if (existingDb != null) {
+        // Identify any removed functions and add them to the delta log.
+        for (Map.Entry<String, List<Function>> e:
+             existingDb.getAllFunctions().entrySet()) {
+          for (Function fn: e.getValue()) {
+            if (newDb.getFunction(fn,
+                Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
+              fn.setCatalogVersion(incrementAndGetCatalogVersion());
+              deleteLog_.addRemovedObject(fn.toTCatalogObject());
+            }
+          }
+        }
+
+        // Identify any deleted tables and add them to the delta log
+        Set<String> oldTableNames = Sets.newHashSet(existingDb.getAllTableNames());
+        Set<String> newTableNames = Sets.newHashSet(newDb.getAllTableNames());
+        oldTableNames.removeAll(newTableNames);
+        for (String removedTableName: oldTableNames) {
+          Table removedTable = IncompleteTable.createUninitializedTable(existingDb,
+              removedTableName);
+          removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+          deleteLog_.addRemovedObject(removedTable.toTCatalogObject());
+        }
+      }
       return Pair.create(newDb, tblsToBackgroundLoad);
     } catch (Exception e) {
       LOG.warn("Encountered an exception while invalidating database: " + dbName +
@@ -720,22 +1062,35 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Resets this catalog instance by clearing all cached table and database metadata.
+   * Returns the current catalog version before reset has taken any effect. The
+   * requesting impalad will use that version to determine when the
+   * effects of reset have been applied to its local catalog cache.
    */
-  public void reset() throws CatalogException {
-    LOG.info("Invalidating all metadata.");
-
+  public long reset() throws CatalogException {
+    long currentCatalogVersion = getCatalogVersion();
+    LOG.info("Invalidating all metadata. Version: " + currentCatalogVersion);
     // First update the policy metadata.
     if (sentryProxy_ != null) {
       // Sentry Service is enabled.
       try {
         // Update the authorization policy, waiting for the result to complete.
-        sentryProxy_.refresh();
+        sentryProxy_.refresh(true);
       } catch (Exception e) {
         throw new CatalogException("Error updating authorization policy: ", e);
       }
     }
 
-    catalogLock_.writeLock().lock();
+    // Update the HDFS cache pools
+    CachePoolReader reader = new CachePoolReader(true);
+    reader.run();
+
+    versionLock_.writeLock().lock();
+    // Assign new versions to all the loaded data sources.
+    for (DataSource dataSource: getDataSources()) {
+      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+    }
+
+    // Update db and table metadata
     try {
       // Not all Java UDFs are persisted to the metastore. The ones which aren't
       // should be restored once the catalog has been invalidated.
@@ -757,6 +1112,16 @@ public class CatalogServiceCatalog extends Catalog {
         }
       }
       dbCache_.set(newDbCache);
+
+      // Identify any deleted databases and add them to the delta log.
+      Set<String> oldDbNames = oldDbCache.keySet();
+      Set<String> newDbNames = newDbCache.keySet();
+      oldDbNames.removeAll(newDbNames);
+      for (String dbName: oldDbNames) {
+        Db removedDb = oldDbCache.get(dbName);
+        updateDeleteLog(removedDb);
+      }
+
       // Submit tables for background loading.
       for (TTableName tblName: tblsToBackgroundLoad) {
         tableLoadingMgr_.backgroundLoad(tblName);
@@ -765,21 +1130,26 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.error(e);
       throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
     LOG.info("Invalidated all metadata.");
+    return currentCatalogVersion;
   }
 
   /**
    * Adds a database name to the metadata cache and returns the database's
    * new Db object. Used by CREATE DATABASE statements.
    */
-  public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb)
-      throws ImpalaException {
+  public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
     Db newDb = new Db(dbName, this, msDb);
-    newDb.setCatalogVersion(incrementAndGetCatalogVersion());
-    addDb(newDb);
-    return newDb;
+    versionLock_.writeLock().lock();
+    try {
+      newDb.setCatalogVersion(incrementAndGetCatalogVersion());
+      addDb(newDb);
+      return newDb;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
   }
 
   /**
@@ -789,11 +1159,36 @@ public class CatalogServiceCatalog extends Catalog {
    */
   @Override
   public Db removeDb(String dbName) {
-    Db removedDb = super.removeDb(dbName);
-    if (removedDb != null) {
-      removedDb.setCatalogVersion(incrementAndGetCatalogVersion());
+    versionLock_.writeLock().lock();
+    try {
+      Db removedDb = super.removeDb(dbName);
+      if (removedDb != null) updateDeleteLog(removedDb);
+      return removedDb;
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Helper function to clean up the state associated with a removed database. It creates
+   * the entries in the delete log for 'db' as well as for its tables and functions
+   * (if any).
+   */
+  private void updateDeleteLog(Db db) {
+    Preconditions.checkNotNull(db);
+    Preconditions.checkState(versionLock_.isWriteLockedByCurrentThread());
+    if (!db.isSystemDb()) {
+      for (Table tbl: db.getTables()) {
+        tbl.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(tbl.toMinimalTCatalogObject());
+      }
+      for (Function fn: db.getFunctions(null, new PatternMatcher())) {
+        fn.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(fn.toTCatalogObject());
+      }
     }
-    return removedDb;
+    db.setCatalogVersion(incrementAndGetCatalogVersion());
+    deleteLog_.addRemovedObject(db.toTCatalogObject());
   }
 
   /**
@@ -804,8 +1199,13 @@ public class CatalogServiceCatalog extends Catalog {
     Db db = getDb(dbName);
     if (db == null) return null;
     Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName);
-    incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    db.addTable(incompleteTable);
+    versionLock_.writeLock().lock();
+    try {
+      incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
+      db.addTable(incompleteTable);
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
     return db.getTable(tblName);
   }
 
@@ -825,14 +1225,14 @@ public class CatalogServiceCatalog extends Catalog {
 
     long previousCatalogVersion;
     // Return the table if it is already loaded or submit a new load request.
-    catalogLock_.readLock().lock();
+    versionLock_.readLock().lock();
     try {
       Table tbl = getTable(dbName, tblName);
       if (tbl == null || tbl.isLoaded()) return tbl;
       previousCatalogVersion = tbl.getCatalogVersion();
       loadReq = tableLoadingMgr_.loadAsync(tableName);
     } finally {
-      catalogLock_.readLock().unlock();
+      versionLock_.readLock().unlock();
     }
     Preconditions.checkNotNull(loadReq);
     try {
@@ -850,7 +1250,7 @@ public class CatalogServiceCatalog extends Catalog {
    */
   private Table replaceTableIfUnchanged(Table updatedTbl, long expectedCatalogVersion)
       throws DatabaseNotFoundException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Db db = getDb(updatedTbl.getDb().getName());
       if (db == null) {
@@ -868,7 +1268,7 @@ public class CatalogServiceCatalog extends Catalog {
       db.addTable(updatedTbl);
       return updatedTbl;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -879,12 +1279,17 @@ public class CatalogServiceCatalog extends Catalog {
   public Table removeTable(String dbName, String tblName) {
     Db parentDb = getDb(dbName);
     if (parentDb == null) return null;
-
-    Table removedTable = parentDb.removeTable(tblName);
-    if (removedTable != null) {
-      removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+    versionLock_.writeLock().lock();
+    try {
+      Table removedTable = parentDb.removeTable(tblName);
+      if (removedTable != null) {
+        removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(removedTable.toMinimalTCatalogObject());
+      }
+      return removedTable;
+    } finally {
+      versionLock_.writeLock().unlock();
     }
-    return removedTable;
   }
 
   /**
@@ -894,11 +1299,17 @@ public class CatalogServiceCatalog extends Catalog {
    */
   @Override
   public Function removeFunction(Function desc) {
-    Function removedFn = super.removeFunction(desc);
-    if (removedFn != null) {
-      removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
+    versionLock_.writeLock().lock();
+    try {
+      Function removedFn = super.removeFunction(desc);
+      if (removedFn != null) {
+        removedFn.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(removedFn.toTCatalogObject());
+      }
+      return removedFn;
+    } finally {
+      versionLock_.writeLock().unlock();
     }
-    return removedFn;
   }
 
   /**
@@ -909,9 +1320,14 @@ public class CatalogServiceCatalog extends Catalog {
   public boolean addFunction(Function fn) {
     Db db = getDb(fn.getFunctionName().getDb());
     if (db == null) return false;
-    if (db.addFunction(fn)) {
-      fn.setCatalogVersion(incrementAndGetCatalogVersion());
-      return true;
+    versionLock_.writeLock().lock();
+    try {
+      if (db.addFunction(fn)) {
+        fn.setCatalogVersion(incrementAndGetCatalogVersion());
+        return true;
+      }
+    } finally {
+      versionLock_.writeLock().unlock();
     }
     return false;
   }
@@ -922,20 +1338,31 @@ public class CatalogServiceCatalog extends Catalog {
    */
   @Override
   public boolean addDataSource(DataSource dataSource) {
-    if (dataSources_.add(dataSource)) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
-      return true;
+    versionLock_.writeLock().lock();
+    try {
+      if (dataSources_.add(dataSource)) {
+        dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+        return true;
+      }
+    } finally {
+      versionLock_.writeLock().unlock();
     }
     return false;
   }
 
   @Override
   public DataSource removeDataSource(String dataSourceName) {
-    DataSource dataSource = dataSources_.remove(dataSourceName);
-    if (dataSource != null) {
-      dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+    versionLock_.writeLock().lock();
+    try {
+      DataSource dataSource = dataSources_.remove(dataSourceName);
+      if (dataSource != null) {
+        dataSource.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(dataSource.toTCatalogObject());
+      }
+      return dataSource;
+    } finally {
+      versionLock_.writeLock().unlock();
     }
-    return dataSource;
   }
 
   /**
@@ -969,20 +1396,30 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Renames a table. Equivalent to an atomic drop + add of the table. Returns
-   * the new Table object with an incremented catalog version or null if the
-   * drop or add were unsuccessful. If null is returned, then the catalog cache
-   * is in one of the following two states:
-   * 1. Old table was not removed, and new table was not added
-   * 2. Old table was removed, but new table was not added
+   * a pair of tables containing the removed table (or null if the table drop was not
+   * successful) and the new table (or null if either the drop of the old one or the
+   * add of the new table was not successful). Depending on the return value, the catalog
+   * cache is in one of the following states:
+   * 1. null, null: Old table was not removed and new table was not added.
+   * 2. null, T_new: Invalid configuration
+   * 3. T_old, null: Old table was removed but new table was not added.
+   * 4. T_old, T_new: Old table was removed and new table was added.
    */
-  public Table renameTable(TTableName oldTableName, TTableName newTableName)
+  public Pair<Table, Table> renameTable(TTableName oldTableName, TTableName newTableName)
       throws CatalogException {
     // Remove the old table name from the cache and add the new table.
     Db db = getDb(oldTableName.getDb_name());
     if (db == null) return null;
-    Table oldTable = db.removeTable(oldTableName.getTable_name());
-    if (oldTable == null) return null;
-    return addTable(newTableName.getDb_name(), newTableName.getTable_name());
+    versionLock_.writeLock().lock();
+    try {
+      Table oldTable =
+          removeTable(oldTableName.getDb_name(), oldTableName.getTable_name());
+      if (oldTable == null) return Pair.create(null, null);
+      return Pair.create(oldTable,
+          addTable(newTableName.getDb_name(), newTableName.getTable_name()));
+    } finally {
+      versionLock_.writeLock().unlock();
+    }
   }
 
   /**
@@ -1004,7 +1441,7 @@ public class CatalogServiceCatalog extends Catalog {
     }
     try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
       try (MetaStoreClient msClient = getMetaStoreClient()) {
         org.apache.hadoop.hive.metastore.api.Table msTbl = null;
         try {
@@ -1019,7 +1456,7 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
       return tbl.toTCatalogObject();
     } finally {
-      Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+      Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
     }
   }
@@ -1123,9 +1560,7 @@ public class CatalogServiceCatalog extends Catalog {
         try {
           msDb = msClient.getHiveClient().getDatabase(dbName);
           Preconditions.checkNotNull(msDb);
-          db = new Db(dbName, this, msDb);
-          db.setCatalogVersion(incrementAndGetCatalogVersion());
-          addDb(db);
+          addDb(dbName, msDb);
           dbWasAdded.setRef(true);
         } catch (TException e) {
           // The Metastore database could not be retrieved. Log the error and return.
@@ -1138,9 +1573,8 @@ public class CatalogServiceCatalog extends Catalog {
     // Add a new uninitialized table to the table cache, effectively invalidating
     // any existing entry. The metadata for the table will be loaded lazily, on the
     // next access to the table.
-    Table newTable = IncompleteTable.createUninitializedTable(db, tblName);
-    newTable.setCatalogVersion(incrementAndGetCatalogVersion());
-    db.addTable(newTable);
+    Table newTable = addTable(dbName, tblName);
+    Preconditions.checkNotNull(newTable);
     if (loadInBackground_) {
       tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
           tblName.toLowerCase()));
@@ -1148,7 +1582,10 @@ public class CatalogServiceCatalog extends Catalog {
     if (dbWasAdded.getRef()) {
       // The database should always have a lower catalog version than the table because
       // it needs to be created before the table can be added.
-      Preconditions.checkState(db.getCatalogVersion() < newTable.getCatalogVersion());
+      Db addedDb = newTable.getDb();
+      Preconditions.checkNotNull(addedDb);
+      Preconditions.checkState(
+          addedDb.getCatalogVersion() < newTable.getCatalogVersion());
     }
     return newTable.toTCatalogObject();
   }
@@ -1158,14 +1595,14 @@ public class CatalogServiceCatalog extends Catalog {
    * If a role with the same name already exists it will be overwritten.
    */
   public Role addRole(String roleName, Set<String> grantGroups) {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Role role = new Role(roleName, grantGroups);
       role.setCatalogVersion(incrementAndGetCatalogVersion());
       authPolicy_.addRole(role);
       return role;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1175,14 +1612,19 @@ public class CatalogServiceCatalog extends Catalog {
    * exists.
    */
   public Role removeRole(String roleName) {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Role role = authPolicy_.removeRole(roleName);
       if (role == null) return null;
+      for (RolePrivilege priv: role.getPrivileges()) {
+        priv.setCatalogVersion(incrementAndGetCatalogVersion());
+        deleteLog_.addRemovedObject(priv.toTCatalogObject());
+      }
       role.setCatalogVersion(incrementAndGetCatalogVersion());
+      deleteLog_.addRemovedObject(role.toTCatalogObject());
       return role;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1192,14 +1634,14 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public Role addRoleGrantGroup(String roleName, String groupName)
       throws CatalogException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Role role = authPolicy_.addGrantGroup(roleName, groupName);
       Preconditions.checkNotNull(role);
       role.setCatalogVersion(incrementAndGetCatalogVersion());
       return role;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1209,14 +1651,14 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public Role removeRoleGrantGroup(String roleName, String groupName)
       throws CatalogException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Role role = authPolicy_.removeGrantGroup(roleName, groupName);
       Preconditions.checkNotNull(role);
       role.setCatalogVersion(incrementAndGetCatalogVersion());
       return role;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1227,7 +1669,7 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public RolePrivilege addRolePrivilege(String roleName, TPrivilege thriftPriv)
       throws CatalogException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Role role = authPolicy_.getRole(roleName);
       if (role == null) throw new CatalogException("Role does not exist: " + roleName);
@@ -1236,7 +1678,7 @@ public class CatalogServiceCatalog extends Catalog {
       authPolicy_.addPrivilege(priv);
       return priv;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1247,7 +1689,7 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public RolePrivilege removeRolePrivilege(String roleName, TPrivilege thriftPriv)
       throws CatalogException {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       Role role = authPolicy_.getRole(roleName);
       if (role == null) throw new CatalogException("Role does not exist: " + roleName);
@@ -1255,9 +1697,10 @@ public class CatalogServiceCatalog extends Catalog {
           role.removePrivilege(thriftPriv.getPrivilege_name());
       if (rolePrivilege == null) return null;
       rolePrivilege.setCatalogVersion(incrementAndGetCatalogVersion());
+      deleteLog_.addRemovedObject(rolePrivilege.toTCatalogObject());
       return rolePrivilege;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1268,13 +1711,13 @@ public class CatalogServiceCatalog extends Catalog {
    */
   public RolePrivilege getRolePrivilege(String roleName, TPrivilege privSpec)
       throws CatalogException {
-    catalogLock_.readLock().lock();
+    versionLock_.readLock().lock();
     try {
       Role role = authPolicy_.getRole(roleName);
       if (role == null) throw new CatalogException("Role does not exist: " + roleName);
       return role.getPrivilege(privSpec.getPrivilege_name());
     } finally {
-      catalogLock_.readLock().unlock();
+      versionLock_.readLock().unlock();
     }
   }
 
@@ -1282,11 +1725,11 @@ public class CatalogServiceCatalog extends Catalog {
    * Increments the current Catalog version and returns the new value.
    */
   public long incrementAndGetCatalogVersion() {
-    catalogLock_.writeLock().lock();
+    versionLock_.writeLock().lock();
     try {
       return ++catalogVersion_;
     } finally {
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
     }
   }
 
@@ -1294,16 +1737,15 @@ public class CatalogServiceCatalog extends Catalog {
    * Returns the current Catalog version.
    */
   public long getCatalogVersion() {
-    catalogLock_.readLock().lock();
+    versionLock_.readLock().lock();
     try {
       return catalogVersion_;
     } finally {
-      catalogLock_.readLock().unlock();
+      versionLock_.readLock().unlock();
     }
   }
 
-  public ReentrantReadWriteLock getLock() { return catalogLock_; }
-
+  public ReentrantReadWriteLock getLock() { return versionLock_; }
   public SentryProxy getSentryProxy() { return sentryProxy_; }
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
 
@@ -1320,7 +1762,7 @@ public class CatalogServiceCatalog extends Catalog {
     }
     try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
-      catalogLock_.writeLock().unlock();
+      versionLock_.writeLock().unlock();
       HdfsTable hdfsTable = (HdfsTable) tbl;
       HdfsPartition hdfsPartition = hdfsTable
           .getPartitionFromThriftPartitionSpec(partitionSpec);
@@ -1355,8 +1797,111 @@ public class CatalogServiceCatalog extends Catalog {
           hdfsTable.getFullName(), partitionName));
       return hdfsTable.toTCatalogObject();
     } finally {
-      Preconditions.checkState(!catalogLock_.isWriteLockedByCurrentThread());
+      Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
     }
   }
+
+  public CatalogDeltaLog getDeleteLog() { return deleteLog_; }
+
+  /**
+   * Returns the version of the topic update that an operation using SYNC_DDL must wait
+   * for in order to ensure that its result set ('result') has been broadcast to all the
+   * coordinators. For operations that don't produce a result set, e.g. INVALIDATE
+   * METADATA, return the version specified in 'result.version'.
+   */
+  public long waitForSyncDdlVersion(TCatalogUpdateResult result) throws CatalogException {
+    if (!result.isSetUpdated_catalog_objects() &&
+        !result.isSetRemoved_catalog_objects()) {
+      return result.getVersion();
+    }
+    long lastSentTopicUpdate = lastSentTopicUpdate_.get();
+    // Maximum number of attempts (topic updates) to find the catalog topic version that
+    // an operation using SYNC_DDL must wait for.
+    long maxNumAttempts = 5;
+    if (result.isSetUpdated_catalog_objects()) {
+      maxNumAttempts =
+          result.getUpdated_catalog_objects().size() * (MAX_NUM_SKIPPED_TOPIC_UPDATES + 1);
+    }
+    long numAttempts = 0;
+    long begin = System.currentTimeMillis();
+    long versionToWaitFor = -1;
+    while (versionToWaitFor == -1) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("waitForSyncDdlVersion() attempt: " + numAttempts);
+      }
+      // Examine the topic update log to determine the latest topic update that
+      // covers the added/modified/deleted objects in 'result'.
+      long topicVersionForUpdates =
+          getCoveringTopicUpdateVersion(result.getUpdated_catalog_objects());
+      long topicVersionForDeletes =
+          getCoveringTopicUpdateVersion(result.getRemoved_catalog_objects());
+      if (topicVersionForUpdates == -1 || topicVersionForDeletes == -1) {
+        // Wait for the next topic update.
+        synchronized(topicUpdateLog_) {
+          try {
+            topicUpdateLog_.wait(TOPIC_UPDATE_WAIT_TIMEOUT_MS);
+          } catch (InterruptedException e) {
+            // Ignore
+          }
+        }
+        long currentTopicUpdate = lastSentTopicUpdate_.get();
+        // Don't count time-based exits from the wait() toward the maxNumAttempts
+        // threshold.
+        if (lastSentTopicUpdate != currentTopicUpdate) {
+          ++numAttempts;
+          if (numAttempts > maxNumAttempts) {
+            throw new CatalogException("Couldn't retrieve the catalog topic version " +
+                "for the SYNC_DDL operation after " + maxNumAttempts + " attempts." +
+                "The operation has been successfully executed but its effects may have " +
+                "not been broadcast to all the coordinators.");
+          }
+          lastSentTopicUpdate = currentTopicUpdate;
+        }
+      } else {
+        versionToWaitFor = Math.max(topicVersionForDeletes, topicVersionForUpdates);
+      }
+    }
+    Preconditions.checkState(versionToWaitFor >= 0);
+    LOG.info("Operation using SYNC_DDL is waiting for catalog topic version: " +
+        versionToWaitFor + ". Time to identify topic version (msec): " +
+        (System.currentTimeMillis() - begin));
+    return versionToWaitFor;
+  }
+
+  /**
+   * Returns the version of the topic update that covers a set of TCatalogObjects.
+   * A topic update U covers a TCatalogObject T, corresponding to a catalog object O,
+   * if last_sent_version(O) >= catalog_version(T) && catalog_version(U) >=
+   * last_topic_update(O). The first condition indicates that a version of O that is
+   * larger or equal to the version in T has been added to a topic update. The second
+   * condition indicates that U is either the update to include O or an update following
+   * the one to include O. Returns -1 if there is a catalog object in 'tCatalogObjects'
+   * which doesn't satisfy the above conditions.
+   */
+  private long getCoveringTopicUpdateVersion(List<TCatalogObject> tCatalogObjects) {
+    if (tCatalogObjects == null || tCatalogObjects.isEmpty()) {
+      return lastSentTopicUpdate_.get();
+    }
+    long versionToWaitFor = -1;
+    for (TCatalogObject tCatalogObject: tCatalogObjects) {
+      TopicUpdateLog.Entry topicUpdateEntry =
+          topicUpdateLog_.get(Catalog.toCatalogObjectKey(tCatalogObject));
+      // There are two reasons for which a topic update log entry cannot be found:
+      // a) It corresponds to a new catalog object that hasn't been processed by a catalog
+      // update yet.
+      // b) It corresponds to a catalog object that hasn't been modified for at least
+      // TOPIC_UPDATE_LOG_GC_FREQUENCY updates and hence its entry was garbage
+      // collected.
+      // In both cases, -1 is returned to indicate that we're waiting for the
+      // entry to show up in the topic update log.
+      if (topicUpdateEntry == null ||
+          topicUpdateEntry.getLastSentVersion() < tCatalogObject.getCatalog_version()) {
+        return -1;
+      }
+      versionToWaitFor =
+          Math.max(versionToWaitFor, topicUpdateEntry.getLastSentCatalogUpdate());
+    }
+    return versionToWaitFor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/DataSource.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSource.java b/fe/src/main/java/org/apache/impala/catalog/DataSource.java
index e9601d7..f59f3be 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSource.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSource.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog;
 
 import org.apache.hadoop.fs.Path;
 
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDataSource;
 import com.google.common.base.Objects;
@@ -27,13 +28,12 @@ import com.google.common.base.Objects;
  * Represents a data source in the catalog. Contains the data source name and all
  * information needed to locate and load the data source.
  */
-public class DataSource implements CatalogObject {
+public class DataSource extends CatalogObjectImpl {
   private final String dataSrcName_;
   private final String className_;
   private final String apiVersionString_;
   // Qualified path to the data source.
   private final String location_;
-  private long catalogVersion_ =  Catalog.INITIAL_CATALOG_VERSION;
 
   public DataSource(String dataSrcName, String location, String className,
       String apiVersionString) {
@@ -54,16 +54,9 @@ public class DataSource implements CatalogObject {
   }
 
   @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-
-  @Override
-  public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
-  @Override
   public String getName() { return dataSrcName_; }
-
   @Override
-  public boolean isLoaded() { return true; }
+  public String getUniqueName() { return "DATA_SOURCE:" + dataSrcName_.toLowerCase(); }
 
   public String getLocation() { return location_; }
   public String getClassName() { return className_; }
@@ -85,4 +78,11 @@ public class DataSource implements CatalogObject {
   public static String debugString(TDataSource thrift) {
     return fromThrift(thrift).debugString();
   }
+
+  public TCatalogObject toTCatalogObject() {
+    TCatalogObject catalogObj =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObj.setData_source(toThrift());
+    return catalogObj;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 074ff92..f1c9c8e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -34,6 +34,7 @@ import org.apache.impala.catalog.Function;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.JniUtil;
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TFunction;
@@ -59,11 +60,10 @@ import com.google.common.collect.Maps;
  * value is the base64 representation of the thrift serialized function object.
  *
  */
-public class Db implements CatalogObject {
+public class Db extends CatalogObjectImpl {
   private static final Logger LOG = LoggerFactory.getLogger(Db.class);
   private final Catalog parentCatalog_;
   private final TDatabase thriftDb_;
-  private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
 
   public static final String FUNCTION_INDEX_PREFIX = "impala_registered_function_";
 
@@ -134,16 +134,14 @@ public class Db implements CatalogObject {
   @Override
   public String getName() { return thriftDb_.getDb_name(); }
   @Override
-  public TCatalogObjectType getCatalogObjectType() {
-    return TCatalogObjectType.DATABASE;
-  }
+  public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.DATABASE; }
+  @Override
+  public String getUniqueName() { return "DATABASE:" + getName().toLowerCase(); }
 
   /**
    * Adds a table to the table cache.
    */
-  public void addTable(Table table) {
-    tableCache_.add(table);
-  }
+  public void addTable(Table table) { tableCache_.add(table); }
 
   /**
    * Gets all table names in the table cache.
@@ -165,9 +163,7 @@ public class Db implements CatalogObject {
    * Returns the Table with the given name if present in the table cache or null if the
    * table does not exist in the cache.
    */
-  public Table getTable(String tblName) {
-    return tableCache_.get(tblName);
-  }
+  public Table getTable(String tblName) { return tableCache_.get(tblName); }
 
   /**
    * Removes the table name and any cached metadata from the Table cache.
@@ -495,11 +491,10 @@ public class Db implements CatalogObject {
     return result;
   }
 
-  @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-  @Override
-  public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
-  @Override
-  public boolean isLoaded() { return true; }
+  public TCatalogObject toTCatalogObject() {
+    TCatalogObject catalogObj =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObj.setDb(toThrift());
+    return catalogObj;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Function.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Function.java b/fe/src/main/java/org/apache/impala/catalog/Function.java
index 80316a6..03cd867 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Function.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Function.java
@@ -49,7 +49,7 @@ import com.google.common.collect.Lists;
  * - Builtin functions, which are recreated after every restart of the
  *   catalog. (persisted, visible to Impala)
  */
-public class Function implements CatalogObject {
+public class Function extends CatalogObjectImpl {
   // Enum for how to compare function signatures.
   // For decimal types, the type in the function can be a wildcard, i.e. decimal(*,*).
   // The wildcard can *only* exist as function type, the caller will always be a
@@ -106,7 +106,6 @@ public class Function implements CatalogObject {
   // Set to true for functions that survive service restarts, including all builtins,
   // native and IR functions, but only Java functions created without a signature.
   private boolean isPersistent_;
-  private long catalogVersion_ =  Catalog.INITIAL_CATALOG_VERSION;
 
   public Function(FunctionName name, Type[] argTypes,
       Type retType, boolean varArgs) {
@@ -298,15 +297,12 @@ public class Function implements CatalogObject {
 
   @Override
   public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.FUNCTION; }
-
-  @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-
-  @Override
-  public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-
   @Override
   public String getName() { return getFunctionName().toString(); }
+  @Override
+  public String getUniqueName() {
+    return "FUNCTION:" + name_.toString() + "(" + signatureString() + ")";
+  }
 
   // Child classes must override this function.
   public String toSql(boolean ifNotExists) { return ""; }
@@ -315,7 +311,7 @@ public class Function implements CatalogObject {
     TCatalogObject result = new TCatalogObject();
     result.setType(TCatalogObjectType.FUNCTION);
     result.setFn(toThrift());
-    result.setCatalog_version(catalogVersion_);
+    result.setCatalog_version(getCatalogVersion());
     return result;
   }
 
@@ -372,9 +368,6 @@ public class Function implements CatalogObject {
     return function;
   }
 
-  @Override
-  public boolean isLoaded() { return true; }
-
   // Returns the resolved symbol in the binary. The BE will do a lookup of 'symbol'
   // in the binary and try to resolve unmangled names.
   // If this function is expecting a return argument, retArgType is that type. It should

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
index 398bc87..6f752d4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCachePool.java
@@ -28,8 +28,7 @@ import com.google.common.base.Preconditions;
  * care about for cache pools is the cache pool name. In the future it may be desirable
  * to track additional metadata such as the owner, size, and current usage of the pool.
  */
-public class HdfsCachePool implements CatalogObject {
-  private long catalogVersion_;
+public class HdfsCachePool extends CatalogObjectImpl {
   private final THdfsCachePool cachePool_;
 
   public HdfsCachePool(CachePoolInfo cachePoolInfo) {
@@ -57,9 +56,5 @@ public class HdfsCachePool implements CatalogObject {
   @Override
   public String getName() { return cachePool_.getPool_name(); }
   @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-  @Override
-  public void setCatalogVersion(long newVersion) { catalogVersion_ = newVersion; }
-  @Override
-  public boolean isLoaded() { return true; }
-}
\ No newline at end of file
+  public String getUniqueName() { return "HDFS_CACHE_POOL:" + getName().toLowerCase(); }
+}