You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/16 23:04:20 UTC

[2/4] impala git commit: IMPALA-5058: Improve the concurrency of DDL/DML operations

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 4c959b2..0e2e8b9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import com.google.common.base.Preconditions;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -29,6 +30,7 @@ import org.apache.thrift.TException;
 
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDataSource;
@@ -173,23 +175,31 @@ public class ImpaladCatalog extends Catalog {
       }
     }
 
-    // Now remove all objects from the catalog. Removing a database before removing
-    // its child tables/functions is fine. If that happens, the removal of the child
-    // object will be a no-op.
+    // Now remove all objects from the catalog. First remove low-level objects (tables,
+    // functions and privileges) and then the top-level objects (databases and roles).
     for (TCatalogObject catalogObject: req.getRemoved_objects()) {
-      removeCatalogObject(catalogObject, newCatalogVersion);
+      if (!isTopLevelCatalogObject(catalogObject)) {
+        removeCatalogObject(catalogObject);
+      }
     }
+    for (TCatalogObject catalogObject: req.getRemoved_objects()) {
+      if (isTopLevelCatalogObject(catalogObject)) {
+        removeCatalogObject(catalogObject);
+      }
+    }
+
+
     lastSyncedCatalogVersion_ = newCatalogVersion;
     // Cleanup old entries in the log.
     catalogDeltaLog_.garbageCollect(lastSyncedCatalogVersion_);
     isReady_.set(true);
-
     // Notify all the threads waiting on a catalog update.
     synchronized (catalogUpdateEventNotifier_) {
       catalogUpdateEventNotifier_.notifyAll();
     }
 
-    return new TUpdateCatalogCacheResponse(catalogServiceId_);
+    return new TUpdateCatalogCacheResponse(catalogServiceId_,
+        CatalogObjectVersionQueue.INSTANCE.getMinimumVersion());
   }
 
   /**
@@ -319,24 +329,10 @@ public class ImpaladCatalog extends Catalog {
   /**
    *  Removes the matching TCatalogObject from the catalog, if one exists and its
    *  catalog version is < the catalog version of this drop operation.
-   *  Note that drop operations that come from statestore heartbeats always have a
-   *  version of 0. To determine the drop version for statestore updates,
-   *  the catalog version from the current update is used. This is okay because there
-   *  can never be a catalog update from the statestore that contains a drop
-   *  and an addition of the same object. For more details on how drop
-   *  versioning works, see CatalogServerCatalog.java
    */
-  private void removeCatalogObject(TCatalogObject catalogObject,
-      long currentCatalogUpdateVersion) {
-    // The TCatalogObject associated with a drop operation from a state store
-    // heartbeat will always have a version of zero. Because no update from
-    // the state store can contain both a drop and an addition of the same object,
-    // we can assume the drop version is the current catalog version of this update.
-    // If the TCatalogObject contains a version that != 0, it indicates the drop
-    // came from a direct update.
-    long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
-        currentCatalogUpdateVersion : catalogObject.getCatalog_version();
-
+  private void removeCatalogObject(TCatalogObject catalogObject) {
+    Preconditions.checkState(catalogObject.getCatalog_version() != 0);
+    long dropCatalogVersion = catalogObject.getCatalog_version();
     switch(catalogObject.getType()) {
       case DATABASE:
         removeDb(catalogObject.getDb(), dropCatalogVersion);
@@ -360,7 +356,7 @@ public class ImpaladCatalog extends Catalog {
       case HDFS_CACHE_POOL:
         HdfsCachePool existingItem =
             hdfsCachePools_.get(catalogObject.getCache_pool().getPool_name());
-        if (existingItem.getCatalogVersion() > catalogObject.getCatalog_version()) {
+        if (existingItem.getCatalogVersion() <= catalogObject.getCatalog_version()) {
           hdfsCachePools_.remove(catalogObject.getCache_pool().getPool_name());
         }
         break;
@@ -381,6 +377,15 @@ public class ImpaladCatalog extends Catalog {
       Db newDb = Db.fromTDatabase(thriftDb, this);
       newDb.setCatalogVersion(catalogVersion);
       addDb(newDb);
+      if (existingDb != null) {
+        CatalogObjectVersionQueue.INSTANCE.updateVersions(
+            existingDb.getCatalogVersion(), catalogVersion);
+        CatalogObjectVersionQueue.INSTANCE.removeAll(existingDb.getTables());
+        CatalogObjectVersionQueue.INSTANCE.removeAll(
+            existingDb.getFunctions(null, new PatternMatcher()));
+      } else {
+        CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion);
+      }
     }
   }
 
@@ -414,6 +419,12 @@ public class ImpaladCatalog extends Catalog {
     if (existingFn == null ||
         existingFn.getCatalogVersion() < catalogVersion) {
       db.addFunction(function);
+      if (existingFn != null) {
+        CatalogObjectVersionQueue.INSTANCE.updateVersions(
+            existingFn.getCatalogVersion(), catalogVersion);
+      } else {
+        CatalogObjectVersionQueue.INSTANCE.addVersion(catalogVersion);
+      }
     }
   }
 
@@ -431,6 +442,11 @@ public class ImpaladCatalog extends Catalog {
     Db db = getDb(thriftDb.getDb_name());
     if (db != null && db.getCatalogVersion() < dropCatalogVersion) {
       removeDb(db.getName());
+      CatalogObjectVersionQueue.INSTANCE.removeVersion(
+          db.getCatalogVersion());
+      CatalogObjectVersionQueue.INSTANCE.removeAll(db.getTables());
+      CatalogObjectVersionQueue.INSTANCE.removeAll(
+          db.getFunctions(null, new PatternMatcher()));
     }
   }
 
@@ -455,6 +471,8 @@ public class ImpaladCatalog extends Catalog {
     Function fn = db.getFunction(thriftFn.getSignature());
     if (fn != null && fn.getCatalogVersion() < dropCatalogVersion) {
       db.removeFunction(thriftFn.getSignature());
+      CatalogObjectVersionQueue.INSTANCE.removeVersion(
+          fn.getCatalogVersion());
     }
   }
 
@@ -463,6 +481,7 @@ public class ImpaladCatalog extends Catalog {
     // version of the drop, remove the function.
     if (existingRole != null && existingRole.getCatalogVersion() < dropCatalogVersion) {
       authPolicy_.removeRole(thriftRole.getRole_name());
+      CatalogObjectVersionQueue.INSTANCE.removeAll(existingRole.getPrivileges());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Role.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Role.java b/fe/src/main/java/org/apache/impala/catalog/Role.java
index 0b89866..b45ff22 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Role.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Role.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TRole;
 import com.google.common.base.Preconditions;
@@ -30,11 +31,10 @@ import com.google.common.collect.Sets;
 /**
  * Represents a role in an authorization policy. This class is thread safe.
  */
-public class Role implements CatalogObject {
+public class Role extends CatalogObjectImpl {
   private final TRole role_;
   // The last role ID assigned, starts at 0.
   private static AtomicInteger roleId_ = new AtomicInteger(0);
-  private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
 
   private final CatalogObjectCache<RolePrivilege> rolePrivileges_ =
       new CatalogObjectCache<RolePrivilege>();
@@ -134,11 +134,12 @@ public class Role implements CatalogObject {
   public String getName() { return role_.getRole_name(); }
   public int getId() { return role_.getRole_id(); }
   @Override
-  public synchronized long getCatalogVersion() { return catalogVersion_; }
-  @Override
-  public synchronized void setCatalogVersion(long newVersion) {
-    catalogVersion_ = newVersion;
+  public String getUniqueName() { return "ROLE:" + getName().toLowerCase(); }
+
+  public TCatalogObject toTCatalogObject() {
+    TCatalogObject catalogObject =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObject.setRole(toThrift());
+    return catalogObject;
   }
-  @Override
-  public boolean isLoaded() { return true; }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
index 87277af..ef3717c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
+++ b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.log4j.Logger;
 
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TPrivilegeLevel;
@@ -33,16 +34,14 @@ import com.google.common.collect.Lists;
  * Represents a privilege that has been granted to a role in an authorization policy.
  * This class is thread safe.
  */
-public class RolePrivilege implements CatalogObject {
+public class RolePrivilege extends CatalogObjectImpl {
   private static final Logger LOG = Logger.getLogger(AuthorizationPolicy.class);
   // These Joiners are used to build role names. For simplicity, the role name we
   // use can also be sent to the Sentry library to perform authorization checks
   // so we build them in the same format.
   private static final Joiner AUTHORIZABLE_JOINER = Joiner.on("->");
   private static final Joiner KV_JOINER = Joiner.on("=");
-
   private final TPrivilege privilege_;
-  private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
 
   private RolePrivilege(TPrivilege privilege) {
     privilege_ = privilege;
@@ -132,13 +131,16 @@ public class RolePrivilege implements CatalogObject {
   public String getName() { return privilege_.getPrivilege_name(); }
   public int getRoleId() { return privilege_.getRole_id(); }
   @Override
-  public synchronized long getCatalogVersion() { return catalogVersion_; }
-  @Override
-  public synchronized void setCatalogVersion(long newVersion) {
-    catalogVersion_ = newVersion;
+  public String getUniqueName() {
+    return "PRIVILEGE:" + getName().toLowerCase() + "." + Integer.toString(getRoleId());
+  }
+
+  public TCatalogObject toTCatalogObject() {
+    TCatalogObject catalogObject =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObject.setPrivilege(toThrift());
+    return catalogObject;
   }
-  @Override
-  public boolean isLoaded() { return true; }
 
   // The time this role was created. Used to quickly check if the same privilege
   // was dropped and re-created. Assumes a role will not be created + dropped + created

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 23fa7a4..50fe953 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -56,13 +56,9 @@ import com.google.common.collect.Maps;
  * is more general than Hive's CLUSTER BY ... INTO BUCKETS clause (which partitions
  * a key range into a fixed number of buckets).
  */
-public abstract class Table implements CatalogObject {
+public abstract class Table extends CatalogObjectImpl {
   private static final Logger LOG = Logger.getLogger(Table.class);
-
-  // Catalog version assigned to this table
-  private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
   protected org.apache.hadoop.hive.metastore.api.Table msTable_;
-
   protected final Db db_;
   protected final String name_;
   protected final String owner_;
@@ -358,13 +354,21 @@ public abstract class Table implements CatalogObject {
   }
 
   public TCatalogObject toTCatalogObject() {
-    TCatalogObject catalogObject = new TCatalogObject();
-    catalogObject.setType(getCatalogObjectType());
-    catalogObject.setCatalog_version(getCatalogVersion());
+    TCatalogObject catalogObject =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
     catalogObject.setTable(toThrift());
     return catalogObject;
   }
 
+  public TCatalogObject toMinimalTCatalogObject() {
+    TCatalogObject catalogObject =
+        new TCatalogObject(getCatalogObjectType(), getCatalogVersion());
+    catalogObject.setTable(new TTable());
+    catalogObject.getTable().setDb_name(getDb().getName());
+    catalogObject.getTable().setTbl_name(getName());
+    return catalogObject;
+  }
+
   /**
    * Gets the ColumnType from the given FieldSchema by using Impala's SqlParser.
    * Throws a TableLoadingException if the FieldSchema could not be parsed.
@@ -396,6 +400,8 @@ public abstract class Table implements CatalogObject {
   public TableName getTableName() {
     return new TableName(db_ != null ? db_.getName() : null, name_);
   }
+  @Override
+  public String getUniqueName() { return "TABLE:" + getFullName(); }
 
   public ArrayList<Column> getColumns() { return colsByPos_; }
 
@@ -490,17 +496,6 @@ public abstract class Table implements CatalogObject {
   public TTableStats getTTableStats() { return tableStats_; }
   public ArrayType getType() { return type_; }
 
-  @Override
-  public long getCatalogVersion() { return catalogVersion_; }
-
-  @Override
-  public void setCatalogVersion(long catalogVersion) {
-    catalogVersion_ = catalogVersion;
-  }
-
-  @Override
-  public boolean isLoaded() { return true; }
-
   public static boolean isExternalTable(
       org.apache.hadoop.hive.metastore.api.Table msTbl) {
     return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
new file mode 100644
index 0000000..9d23c4f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+// A log of topic update information for each catalog object. An entry is added to
+// the log when a catalog object is processed (added/removed/skipped) in a topic
+// update and it is replaced every time the catalog object is processed in a
+// topic update.
+//
+// To prevent the log from growing indefinitely, the oldest entries
+// (in terms of last topic update that processed the associated catalog objects) are
+// garbage collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates. That will cause
+// entries of deleted catalog objects or entries of objects that haven't been processed
+// by the catalog for at least TOPIC_UPDATE_LOG_GC_FREQUENCY updates to be removed from
+// the log.
+public class TopicUpdateLog {
+  private static final Logger LOG = Logger.getLogger(TopicUpdateLog.class);
+  // Frequency at which the entries of the topic update log are garbage collected.
+  // An entry may survive for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.
+  private final static int TOPIC_UPDATE_LOG_GC_FREQUENCY = 1000;
+
+  // Number of topic updates left to trigger a gc of topic update log entries.
+  private int numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+
+  // In the next gc cycle of topic update log entries, all the entries that were last
+  // added to a topic update with version less than or equal to
+  // 'oldestTopicUpdateToGc_' are removed from the update log.
+  private long oldestTopicUpdateToGc_ = -1;
+
+  // Represents an entry in the topic update log. A topic update log entry is
+  // associated with a catalog object and stores information about the last topic update
+  // that processed that object.
+  public static class Entry {
+    // Number of times the entry has skipped a topic update.
+    private final int numSkippedUpdates_;
+    // Last version of the corresponding catalog object that was added to a topic
+    // update. -1 if the object was never added to a topic update.
+    private final long lastSentVersion_;
+    // Version of the last topic update to include the corresponding catalog object.
+    // -1 if the object was never added to a topic update.
+    private final long lastSentTopicUpdate_;
+
+    Entry() {
+      numSkippedUpdates_ = 0;
+      lastSentVersion_ = -1;
+      lastSentTopicUpdate_ = -1;
+    }
+
+    Entry(int numSkippedUpdates, long lastSentVersion, long lastSentCatalogUpdate) {
+      numSkippedUpdates_ = numSkippedUpdates;
+      lastSentVersion_ = lastSentVersion;
+      lastSentTopicUpdate_ = lastSentCatalogUpdate;
+    }
+
+    public int getNumSkippedTopicUpdates() { return numSkippedUpdates_; }
+    public long getLastSentVersion() { return lastSentVersion_; }
+    public long getLastSentCatalogUpdate() { return lastSentTopicUpdate_; }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || getClass() != other.getClass()) return false;
+      Entry entry = (Entry) other;
+      return numSkippedUpdates_ == entry.getNumSkippedTopicUpdates()
+          && lastSentVersion_ == entry.getLastSentVersion()
+          && lastSentTopicUpdate_ == entry.getLastSentCatalogUpdate();
+    }
+  }
+
+  // Entries in the topic update log stored as a map of catalog object keys to
+  // Entry objects.
+  private final ConcurrentHashMap<String, Entry> topicLogEntries_ =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Garbage-collects topic update log entries. These are entries that haven't been
+   * added to any of the last TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates.
+   */
+  public void garbageCollectUpdateLogEntries(long lastTopicUpdateVersion) {
+    if (oldestTopicUpdateToGc_ == -1) {
+      oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
+      return;
+    }
+    if (numTopicUpdatesToGc_ == 0) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Topic update log GC started.");
+      }
+      Preconditions.checkState(oldestTopicUpdateToGc_ > 0);
+      int numEntriesRemoved = 0;
+      for (Map.Entry<String, Entry> entry:
+           topicLogEntries_.entrySet()) {
+        if (entry.getValue().getLastSentVersion() == -1) continue;
+        if (entry.getValue().getLastSentCatalogUpdate() <= oldestTopicUpdateToGc_) {
+          if (topicLogEntries_.remove(entry.getKey(), entry.getValue())) {
+            ++numEntriesRemoved;
+          }
+        }
+      }
+      numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+      oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Topic update log GC finished. Removed " + numEntriesRemoved +
+            " entries.");
+      }
+    } else {
+      --numTopicUpdatesToGc_;
+    }
+  }
+
+  public void add(String catalogObjectKey, Entry logEntry) {
+    Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey));
+    Preconditions.checkNotNull(logEntry);
+    topicLogEntries_.put(catalogObjectKey, logEntry);
+  }
+
+  public Entry get(String catalogObjectKey) {
+    Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey));
+    return topicLogEntries_.get(catalogObjectKey);
+  }
+
+  // Returns the topic update log entry for the catalog object with key
+  // 'catalogObjectKey'. If the key does not exist, a newly constructed log entry is
+  // returned.
+  public Entry getOrCreateLogEntry(String catalogObjectKey) {
+    Preconditions.checkState(!Strings.isNullOrEmpty(catalogObjectKey));
+    Entry entry = topicLogEntries_.get(catalogObjectKey);
+    if (entry == null) entry = new Entry();
+    return entry;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index b0ed45f..295956c 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -179,7 +179,7 @@ import com.google.common.math.LongMath;
  * update operations and requires the use of fair table locks to prevent starvation.
  *
  *   DO {
- *     Acquire the catalog lock (see CatalogServiceCatalog.catalogLock_)
+ *     Acquire the catalog lock (see CatalogServiceCatalog.versionLock_)
  *     Try to acquire a table lock
  *     IF the table lock acquisition fails {
  *       Release the catalog lock
@@ -326,6 +326,15 @@ public class CatalogOpExecutor {
           ddlRequest.ddl_type);
     }
 
+    // If SYNC_DDL is set, record the version of the catalog update that contains the
+    // results of this DDL operation. That version is returned to the requesting
+    // impalad, which will wait until this catalog update has been broadcast to all
+    // the coordinators.
+    if (ddlRequest.isSync_ddl()) {
+      response.getResult().setVersion(
+          catalog_.waitForSyncDdlVersion(response.getResult()));
+    }
+
     // At this point, the operation is considered successful. If any errors occurred
     // during execution, this function will throw an exception and the CatalogServer
     // will handle setting a bad status code.
@@ -909,12 +918,15 @@ public class CatalogOpExecutor {
     String dbName = params.getDb();
     Preconditions.checkState(dbName != null && !dbName.isEmpty(),
         "Null or empty database name passed as argument to Catalog.createDatabase");
-    if (params.if_not_exists && catalog_.getDb(dbName) != null) {
+    Db existingDb = catalog_.getDb(dbName);
+    if (params.if_not_exists && existingDb != null) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Skipping database creation because " + dbName + " already exists "
             + "and IF NOT EXISTS was specified.");
       }
-      resp.getResult().setVersion(catalog_.getCatalogVersion());
+      Preconditions.checkNotNull(existingDb);
+      resp.getResult().addToUpdated_catalog_objects(existingDb.toTCatalogObject());
+      resp.getResult().setVersion(existingDb.getCatalogVersion());
       return;
     }
     org.apache.hadoop.hive.metastore.api.Database db =
@@ -960,11 +972,7 @@ public class CatalogOpExecutor {
       }
 
       Preconditions.checkNotNull(newDb);
-      TCatalogObject thriftDb = new TCatalogObject(
-          TCatalogObjectType.DATABASE, Catalog.INITIAL_CATALOG_VERSION);
-      thriftDb.setDb(newDb.toThrift());
-      thriftDb.setCatalog_version(newDb.getCatalogVersion());
-      resp.result.addToUpdated_catalog_objects(thriftDb);
+      resp.result.addToUpdated_catalog_objects(newDb.toTCatalogObject());
     }
     resp.result.setVersion(newDb.getCatalogVersion());
   }
@@ -1038,22 +1046,18 @@ public class CatalogOpExecutor {
       throws ImpalaException {
     if (LOG.isTraceEnabled()) { LOG.trace("Adding DATA SOURCE: " + params.toString()); }
     DataSource dataSource = DataSource.fromThrift(params.getData_source());
-    if (catalog_.getDataSource(dataSource.getName()) != null) {
+    DataSource existingDataSource = catalog_.getDataSource(dataSource.getName());
+    if (existingDataSource != null) {
       if (!params.if_not_exists) {
         throw new ImpalaRuntimeException("Data source " + dataSource.getName() +
             " already exists.");
       }
-      // The user specified IF NOT EXISTS and the data source exists, just
-      // return the current catalog version.
-      resp.result.setVersion(catalog_.getCatalogVersion());
+      resp.result.addToUpdated_catalog_objects(existingDataSource.toTCatalogObject());
+      resp.result.setVersion(existingDataSource.getCatalogVersion());
       return;
     }
     catalog_.addDataSource(dataSource);
-    TCatalogObject addedObject = new TCatalogObject();
-    addedObject.setType(TCatalogObjectType.DATA_SOURCE);
-    addedObject.setData_source(dataSource.toThrift());
-    addedObject.setCatalog_version(dataSource.getCatalogVersion());
-    resp.result.addToUpdated_catalog_objects(addedObject);
+    resp.result.addToUpdated_catalog_objects(dataSource.toTCatalogObject());
     resp.result.setVersion(dataSource.getCatalogVersion());
   }
 
@@ -1070,11 +1074,7 @@ public class CatalogOpExecutor {
       resp.result.setVersion(catalog_.getCatalogVersion());
       return;
     }
-    TCatalogObject removedObject = new TCatalogObject();
-    removedObject.setType(TCatalogObjectType.DATA_SOURCE);
-    removedObject.setData_source(dataSource.toThrift());
-    removedObject.setCatalog_version(dataSource.getCatalogVersion());
-    resp.result.addToRemoved_catalog_objects(removedObject);
+    resp.result.addToRemoved_catalog_objects(dataSource.toTCatalogObject());
     resp.result.setVersion(dataSource.getCatalogVersion());
   }
 
@@ -1229,7 +1229,7 @@ public class CatalogOpExecutor {
       throw new CatalogException("Database " + db.getName() + " is not empty");
     }
 
-    TCatalogObject removedObject = new TCatalogObject();
+    TCatalogObject removedObject = null;
     synchronized (metastoreDdlLock_) {
       // Remove all the Kudu tables of 'db' from the Kudu storage engine.
       if (db != null && params.cascade) dropTablesFromKudu(db);
@@ -1251,11 +1251,9 @@ public class CatalogOpExecutor {
       for (String tableName: removedDb.getAllTableNames()) {
         uncacheTable(removedDb.getTable(tableName));
       }
-      removedObject.setCatalog_version(removedDb.getCatalogVersion());
+      removedObject = removedDb.toTCatalogObject();
     }
-    removedObject.setType(TCatalogObjectType.DATABASE);
-    removedObject.setDb(new TDatabase());
-    removedObject.getDb().setDb_name(params.getDb());
+    Preconditions.checkNotNull(removedObject);
     resp.result.setVersion(removedObject.getCatalog_version());
     resp.result.addToRemoved_catalog_objects(removedObject);
   }
@@ -1525,12 +1523,17 @@ public class CatalogOpExecutor {
     Preconditions.checkState(params.getColumns() != null,
         "Null column list given as argument to Catalog.createTable");
 
-    if (params.if_not_exists &&
-        catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
+    Table existingTbl = catalog_.getTable(tableName.getDb(), tableName.getTbl(), false);
+    if (params.if_not_exists && existingTbl != null) {
       LOG.trace(String.format("Skipping table creation because %s already exists and " +
           "IF NOT EXISTS was specified.", tableName));
-      response.getResult().setVersion(catalog_.getCatalogVersion());
-      return false;
+      existingTbl.getLock().lock();
+      try {
+        addTableToCatalogUpdate(existingTbl, response.getResult());
+        return false;
+      } finally {
+        existingTbl.getLock().unlock();
+      }
     }
     org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
     LOG.trace(String.format("Creating table %s", tableName));
@@ -1736,12 +1739,17 @@ public class CatalogOpExecutor {
     Preconditions.checkState(tblName != null && tblName.isFullyQualified());
     Preconditions.checkState(srcTblName != null && srcTblName.isFullyQualified());
 
-    if (params.if_not_exists &&
-        catalog_.containsTable(tblName.getDb(), tblName.getTbl())) {
+    Table existingTbl = catalog_.getTable(tblName.getDb(), tblName.getTbl(), false);
+    if (params.if_not_exists && existingTbl != null) {
       LOG.trace(String.format("Skipping table creation because %s already exists and " +
           "IF NOT EXISTS was specified.", tblName));
-      response.getResult().setVersion(catalog_.getCatalogVersion());
-      return;
+      existingTbl.getLock().lock();
+      try {
+        addTableToCatalogUpdate(existingTbl, response.getResult());
+        return;
+      } finally {
+        existingTbl.getLock().unlock();
+      }
     }
     Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
     org.apache.hadoop.hive.metastore.api.Table tbl =
@@ -2185,8 +2193,9 @@ public class CatalogOpExecutor {
     }
     // Rename the table in the Catalog and get the resulting catalog object.
     // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
-    Table newTable = catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
-    if (newTable == null) {
+    Pair<Table, Table> result =
+        catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
+    if (result.first == null || result.second == null) {
       // The rename succeeded in the HMS but failed in the catalog cache. The cache is in
       // an inconsistent state, but can likely be fixed by running "invalidate metadata".
       throw new ImpalaRuntimeException(String.format(
@@ -2196,14 +2205,9 @@ public class CatalogOpExecutor {
           newTableName.toString()));
     }
 
-    TCatalogObject addedObject = newTable.toTCatalogObject();
-    TCatalogObject removedObject = new TCatalogObject();
-    removedObject.setType(TCatalogObjectType.TABLE);
-    removedObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
-    removedObject.setCatalog_version(addedObject.getCatalog_version());
-    response.result.addToRemoved_catalog_objects(removedObject);
-    response.result.addToUpdated_catalog_objects(addedObject);
-    response.result.setVersion(addedObject.getCatalog_version());
+    response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
+    response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
+    response.result.setVersion(result.second.getCatalogVersion());
   }
 
   /**
@@ -2851,28 +2855,18 @@ public class CatalogOpExecutor {
     Preconditions.checkNotNull(rolePrivileges);
     List<TCatalogObject> updatedPrivs = Lists.newArrayList();
     for (RolePrivilege rolePriv: rolePrivileges) {
-      TCatalogObject catalogObject = new TCatalogObject();
-      catalogObject.setType(rolePriv.getCatalogObjectType());
-      catalogObject.setPrivilege(rolePriv.toThrift());
-      catalogObject.setCatalog_version(rolePriv.getCatalogVersion());
-      updatedPrivs.add(catalogObject);
-    }
-
-    // TODO: Currently we only support sending back 1 catalog object in a "direct DDL"
-    // response. If multiple privileges have been updated, just send back the
-    // catalog version so subscribers can wait for the statestore heartbeat that
-    // contains all updates (see IMPALA-5571).
-    if (updatedPrivs.size() == 1) {
+      updatedPrivs.add(rolePriv.toTCatalogObject());
+    }
+
+    if (!updatedPrivs.isEmpty()) {
       // If this is a REVOKE statement with hasGrantOpt, only the GRANT OPTION is revoked
-      // from the privilege.
+      // from the privileges. Otherwise the privileges are removed from the catalog.
       if (grantRevokePrivParams.isIs_grant() ||
           privileges.get(0).isHas_grant_opt()) {
         resp.result.setUpdated_catalog_objects(updatedPrivs);
       } else {
         resp.result.setRemoved_catalog_objects(updatedPrivs);
       }
-      resp.result.setVersion(updatedPrivs.get(0).getCatalog_version());
-    } else if (updatedPrivs.size() > 1) {
       resp.result.setVersion(
           updatedPrivs.get(updatedPrivs.size() - 1).getCatalog_version());
     }
@@ -3027,6 +3021,9 @@ public class CatalogOpExecutor {
           resp.result.setUpdated_catalog_objects(addedFuncs);
           resp.result.setRemoved_catalog_objects(removedFuncs);
           resp.result.setVersion(catalog_.getCatalogVersion());
+          for (TCatalogObject removedFn: removedFuncs) {
+            catalog_.getDeleteLog().addRemovedObject(removedFn);
+          }
         }
       }
     } else if (req.isSetTable_name()) {
@@ -3059,26 +3056,32 @@ public class CatalogOpExecutor {
             req.getTable_name().getTable_name());
       }
 
-      if (!dbWasAdded.getRef()) {
-        // Return the TCatalogObject in the result to indicate this request can be
-        // processed as a direct DDL operation.
-        if (tblWasRemoved.getRef()) {
-          resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
-        } else {
-          resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
-        }
+      // Return the TCatalogObject in the result to indicate this request can be
+      // processed as a direct DDL operation.
+      if (tblWasRemoved.getRef()) {
+        resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
       } else {
-        // Since multiple catalog objects were modified (db and table), don't treat this
-        // as a direct DDL operation. Set the overall catalog version and the impalad
-        // will wait for a statestore heartbeat that contains the update.
-        Preconditions.checkState(!req.isIs_refresh());
+        resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
+      }
+
+      if (dbWasAdded.getRef()) {
+        Db addedDb = catalog_.getDb(updatedThriftTable.getTable().getDb_name());
+        if (addedDb == null) {
+          throw new CatalogException("Database " +
+              updatedThriftTable.getTable().getDb_name() + " was removed by a " +
+              "concurrent operation. Try invalidating the table again.");
+        }
+        resp.getResult().addToUpdated_catalog_objects(addedDb.toTCatalogObject());
       }
       resp.getResult().setVersion(updatedThriftTable.getCatalog_version());
     } else {
       // Invalidate the entire catalog if no table name is provided.
       Preconditions.checkArgument(!req.isIs_refresh());
-      catalog_.reset();
-      resp.result.setVersion(catalog_.getCatalogVersion());
+      resp.getResult().setVersion(catalog_.reset());
+      resp.getResult().setIs_invalidate(true);
+    }
+    if (req.isSync_ddl()) {
+      resp.getResult().setVersion(catalog_.waitForSyncDdlVersion(resp.getResult()));
     }
     resp.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>()));
     return resp;
@@ -3261,11 +3264,16 @@ public class CatalogOpExecutor {
 
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
       addTableToCatalogUpdate(table, response.result);
-      return response;
     } finally {
       Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
       table.getLock().unlock();
     }
+
+    if (update.isSync_ddl()) {
+      response.getResult().setVersion(
+          catalog_.waitForSyncDdlVersion(response.getResult()));
+    }
+    return response;
   }
 
   private List<String> getPartValsFromName(org.apache.hadoop.hive.metastore.api.Table

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index c62fc31..d0936d5 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -520,13 +520,17 @@ public class Frontend {
     } else {
       throw new IllegalStateException("Unexpected CatalogOp statement type.");
     }
-
     result.setResult_set_metadata(metadata);
+    ddl.setSync_ddl(result.getQuery_options().isSync_ddl());
     result.setCatalog_op_request(ddl);
     if (ddl.getOp_type() == TCatalogOpType.DDL) {
       TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader();
       header.setRequesting_user(analysis.getAnalyzer().getUser().getName());
       ddl.getDdl_params().setHeader(header);
+      ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl());
+    }
+    if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
+      ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index b56527b..e945a3b 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -38,7 +38,8 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaRequest;
 import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
@@ -119,9 +120,11 @@ public class JniCatalog {
   /**
-   * Gets all catalog objects
+   * Gets the catalog delta for the 'from_version' given in the serialized request.
    */
-  public byte[] getCatalogObjects(long from_version) throws ImpalaException, TException {
-    TGetAllCatalogObjectsResponse resp =
-        catalog_.getCatalogObjects(from_version);
+  public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq)
+      throws ImpalaException, TException {
+    TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
+    JniUtil.deserializeThrift(protocolFactory_, params, thriftGetCatalogDeltaReq);
+    TGetCatalogDeltaResponse resp = catalog_.getCatalogDelta(params.getFrom_version());
     TSerializer serializer = new TSerializer(protocolFactory_);
     return serializer.serialize(resp);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/main/java/org/apache/impala/util/SentryProxy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/SentryProxy.java b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
index 23534d2..f2df66b 100644
--- a/fe/src/main/java/org/apache/impala/util/SentryProxy.java
+++ b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
@@ -87,7 +87,7 @@ public class SentryProxy {
     }
     sentryPolicyService_ = new SentryPolicyService(sentryConfig);
 
-    policyReader_.scheduleAtFixedRate(new PolicyReader(), 0,
+    policyReader_.scheduleAtFixedRate(new PolicyReader(false), 0,
         BackendConfig.INSTANCE.getSentryCatalogPollingFrequency(),
         TimeUnit.SECONDS);
   }
@@ -107,6 +107,12 @@ public class SentryProxy {
    * atomically.
    */
   private class PolicyReader implements Runnable {
+    private boolean resetVersions_;
+
+    public PolicyReader(boolean resetVersions) {
+      resetVersions_ = resetVersions;
+    }
+
     public void run() {
       synchronized (SentryProxy.this) {
         // Assume all roles should be removed. Then query the Policy Service and remove
@@ -131,6 +137,9 @@ public class SentryProxy {
             if (existingRole != null &&
                 existingRole.getGrantGroups().equals(grantGroups)) {
               role = existingRole;
+              if (resetVersions_) {
+                role.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
+              }
             } else {
               role = catalog_.addRole(sentryRole.getRoleName(), grantGroups);
             }
@@ -160,6 +169,10 @@ public class SentryProxy {
               // We already know about this privilege (privileges cannot be modified).
               if (existingPriv != null &&
                   existingPriv.getCreateTimeMs() == sentryPriv.getCreateTime()) {
+                if (resetVersions_) {
+                  existingPriv.setCatalogVersion(
+                      catalog_.incrementAndGetCatalogVersion());
+                }
                 continue;
               }
               catalog_.addRolePrivilege(role.getName(), thriftPriv);
@@ -302,10 +315,7 @@ public class SentryProxy {
       // Update the catalog
       for (TPrivilege privilege: privileges) {
         RolePrivilege rolePriv = catalog_.removeRolePrivilege(roleName, privilege);
-        if (rolePriv == null) {
-          rolePriv = RolePrivilege.fromThrift(privilege);
-          rolePriv.setCatalogVersion(catalog_.getCatalogVersion());
-        }
+        if (rolePriv == null) continue;
         rolePrivileges.add(rolePriv);
       }
     } else {
@@ -317,12 +327,7 @@ public class SentryProxy {
       List<TPrivilege> updatedPrivileges = Lists.newArrayList();
       for (TPrivilege privilege: privileges) {
         RolePrivilege existingPriv = catalog_.getRolePrivilege(roleName, privilege);
-        if (existingPriv == null) {
-          RolePrivilege rolePriv = RolePrivilege.fromThrift(privilege);
-          rolePriv.setCatalogVersion(catalog_.getCatalogVersion());
-          rolePrivileges.add(rolePriv);
-          continue;
-        }
+        if (existingPriv == null) continue;
         TPrivilege updatedPriv = existingPriv.toThrift();
         updatedPriv.setHas_grant_opt(false);
         updatedPrivileges.add(updatedPriv);
@@ -342,9 +347,9 @@ public class SentryProxy {
    * the Catalog with any changes. Throws an ImpalaRuntimeException if there are any
    * errors executing the refresh job.
    */
-  public void refresh() throws ImpalaRuntimeException {
+  public void refresh(boolean resetVersions) throws ImpalaRuntimeException {
     try {
-      policyReader_.submit(new PolicyReader()).get();
+      policyReader_.submit(new PolicyReader(resetVersions)).get();
     } catch (Exception e) {
       // We shouldn't make it here. It means an exception leaked from the
       // AuthorizationPolicyReader.

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 93e4af0..df2ba0d 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -38,7 +38,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     // Cache pools are typically loaded asynchronously, but as there is no fixed execution
     // order for tests, the cache pools are loaded synchronously before the tests are
     // executed.
-    CachePoolReader rd = new CachePoolReader();
+    CachePoolReader rd = new CachePoolReader(false);
     rd.run();
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index fdc64e6..7e8ff46 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -65,9 +65,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
   /**
    * Reloads all metadata from the source catalog.
    */
-  public void reset() throws CatalogException {
-    srcCatalog_.reset();
-  }
+  public void reset() throws CatalogException { srcCatalog_.reset(); }
 
   /**
    * Overrides ImpaladCatalog.getTable to load the table metadata if it is missing.

http://git-wip-us.apache.org/repos/asf/impala/blob/3fc42ded/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index e2b1715..1003dc7 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -311,14 +311,12 @@ class StatestoreSubscriber(object):
 
 class TestStatestore():
   def make_topic_update(self, topic_name, key_template="foo", value_template="bar",
-                        num_updates=1, deletions=None):
+                        num_updates=1):
     topic_entries = [
       Subscriber.TTopicItem(key=key_template + str(x), value=value_template + str(x))
       for x in xrange(num_updates)]
-    if deletions is None: deletions = []
     return Subscriber.TTopicDelta(topic_name=topic_name,
                                   topic_entries=topic_entries,
-                                  topic_deletions=deletions,
                                   is_delta=False)
 
   def test_registration_ids_different(self):
@@ -349,11 +347,9 @@ class TestStatestore():
         assert len(args.topic_deltas) == 1
         assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
         assert args.topic_deltas[topic_name].topic_name == delta.topic_name
-        assert args.topic_deltas[topic_name].topic_deletions == delta.topic_deletions
       elif sub.update_count == 3:
         # After the content-bearing update was processed, the next delta should be empty
         assert len(args.topic_deltas[topic_name].topic_entries) == 0
-        assert len(args.topic_deltas[topic_name].topic_deletions) == 0
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
@@ -461,7 +457,7 @@ class TestStatestore():
         assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
         # Statestore should not send deletions when the update is not a delta, see
         # IMPALA-1891
-        assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0
+        assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),