You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/09 16:59:26 UTC

[07/21] impala git commit: IMPALA-5990: End-to-end compression of metadata

IMPALA-5990: End-to-end compression of metadata

Currently the catalog data is compressed in the statestore, but
uncompressed when passed between FE and BE. It results in a ~2GB limit
on the metadata. IMPALA-3499 introduced a workaround in the impalad but
there isn't one in the catalogd. This patch aims to increase the size
limit for statestore updates, reduce the copying of the metadata and
reduce the memory footprint. With this patch, the catalog objects are
passed and (de)compressed between FE and BE one at a time. The new
limits are:
- A single catalog object cannot be larger than ~2GB.
- A statestore catalog update cannot be larger than ~4GB. This limit
  applies to the compressed size if FLAGS_compact_catalog_topic is true.
The behavior of the catalog op executor is not changed. The data is not
compressed and the size limit is still 2GB.

Testing: Ran existing tests. A test for compressing and decompressing
catalog objects is added. Manually tested with a 1.95GB catalog object
and a 3.90 GB uncompressed statestore update.

Change-Id: I3a8819cad734b3a416eef6c954e55b73cc6023ae
Reviewed-on: http://gerrit.cloudera.org:8080/8825
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a2431638
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a2431638
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a2431638

Branch: refs/heads/2.x
Commit: a2431638dc7ade13e99e5cb3db7f2f65888c45b9
Parents: f258a91
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Tue Jan 16 16:37:29 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 8 07:01:52 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/CMakeLists.txt                   |   2 +
 be/src/catalog/catalog-server.cc                |  78 +++----
 be/src/catalog/catalog-server.h                 |  19 +-
 be/src/catalog/catalog-util-test.cc             |  49 ++++
 be/src/catalog/catalog-util.cc                  | 164 ++++++++-----
 be/src/catalog/catalog-util.h                   |  93 ++++++--
 be/src/catalog/catalog.cc                       |   4 +-
 be/src/catalog/catalog.h                        |   9 +-
 be/src/rpc/thrift-util.h                        |  24 +-
 be/src/service/fe-support.cc                    | 102 ++++++++-
 be/src/service/frontend.cc                      |   6 +-
 be/src/service/frontend.h                       |   9 +-
 be/src/service/impala-server.cc                 | 185 +++------------
 be/src/service/impalad-main.cc                  |   2 +
 be/src/util/jni-util.cc                         |  21 +-
 be/src/util/jni-util.h                          |  54 +++--
 common/thrift/CatalogInternalService.thrift     |  16 +-
 common/thrift/Frontend.thrift                   |  22 +-
 .../java/org/apache/impala/catalog/Catalog.java |   2 +
 .../impala/catalog/CatalogServiceCatalog.java   | 229 +++++++++++--------
 .../apache/impala/catalog/ImpaladCatalog.java   | 145 +++++++-----
 .../org/apache/impala/service/FeSupport.java    |  31 ++-
 .../org/apache/impala/service/Frontend.java     |   3 +-
 .../org/apache/impala/service/JniCatalog.java   |  14 +-
 .../org/apache/impala/service/JniFrontend.java  |  29 +--
 .../org/apache/impala/util/TByteBuffer.java     |  60 +++++
 26 files changed, 812 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/catalog/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/catalog/CMakeLists.txt b/be/src/catalog/CMakeLists.txt
index 7debb91..35cccea 100644
--- a/be/src/catalog/CMakeLists.txt
+++ b/be/src/catalog/CMakeLists.txt
@@ -25,3 +25,5 @@ add_library(Catalog
   catalogd-main.cc
 )
 add_dependencies(Catalog gen-deps)
+
+ADD_BE_TEST(catalog-util-test)

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 4bf26c0..2f4bcbe 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -238,14 +238,10 @@ void CatalogServer::UpdateCatalogTopicCallback(
     LOG_EVERY_N(INFO, 300) << "Catalog Version: " << catalog_objects_max_version_
                            << " Last Catalog Version: " << last_sent_catalog_version_;
 
-    for (const TTopicItem& catalog_object: pending_topic_updates_) {
-      if (subscriber_topic_updates->size() == 0) {
-        subscriber_topic_updates->push_back(TTopicDelta());
-        subscriber_topic_updates->back().topic_name = IMPALA_CATALOG_TOPIC;
-      }
-      TTopicDelta& update = subscriber_topic_updates->back();
-      update.topic_entries.push_back(catalog_object);
-    }
+    subscriber_topic_updates->emplace_back();
+    TTopicDelta& update = subscriber_topic_updates->back();
+    update.topic_name = IMPALA_CATALOG_TOPIC;
+    update.topic_entries = std::move(pending_topic_updates_);
 
     // Update the new catalog version and the set of known catalog objects.
     last_sent_catalog_version_ = catalog_objects_max_version_;
@@ -281,19 +277,13 @@ void CatalogServer::UpdateCatalogTopicCallback(
     } else if (current_catalog_version != last_sent_catalog_version_) {
       // If there has been a change since the last time the catalog was queried,
       // call into the Catalog to find out what has changed.
-      TGetCatalogDeltaResponse catalog_objects;
-      status = catalog_->GetCatalogDelta(last_sent_catalog_version_, &catalog_objects);
+      TGetCatalogDeltaResponse resp;
+      status = catalog_->GetCatalogDelta(this, last_sent_catalog_version_, &resp);
       if (!status.ok()) {
         LOG(ERROR) << status.GetDetail();
       } else {
-        // Use the catalog objects to build a topic update list. These include
-        // objects added to the catalog, 'updated_objects', and objects deleted
-        // from the catalog, 'deleted_objects'. The order in which we process
-        // these two disjoint sets of catalog objects does not matter.
-        BuildTopicUpdates(catalog_objects.updated_objects, false);
-        BuildTopicUpdates(catalog_objects.deleted_objects, true);
         catalog_objects_min_version_ = last_sent_catalog_version_;
-        catalog_objects_max_version_ = catalog_objects.max_catalog_version;
+        catalog_objects_max_version_ = resp.max_catalog_version;
       }
     }
 
@@ -302,37 +292,6 @@ void CatalogServer::UpdateCatalogTopicCallback(
   }
 }
 
-void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects,
-    bool topic_deletions) {
-  for (const TCatalogObject& catalog_object: catalog_objects) {
-    DCHECK_GT(catalog_object.catalog_version, last_sent_catalog_version_);
-    const string& entry_key = TCatalogObjectToEntryKey(catalog_object);
-    if (entry_key.empty()) {
-      LOG_EVERY_N(WARNING, 60) << "Unable to build topic entry key for TCatalogObject: "
-                               << ThriftDebugString(catalog_object);
-    }
-    pending_topic_updates_.push_back(TTopicItem());
-    TTopicItem& item = pending_topic_updates_.back();
-    item.key = entry_key;
-    item.deleted = topic_deletions;
-    Status status = thrift_serializer_.Serialize(&catalog_object, &item.value);
-    if (!status.ok()) {
-      LOG(ERROR) << "Error serializing topic value: " << status.GetDetail();
-      pending_topic_updates_.pop_back();
-      continue;
-    }
-    if (FLAGS_compact_catalog_topic) {
-      status = CompressCatalogObject(&item.value);
-      if (!status.ok()) {
-        LOG(ERROR) << "Error compressing catalog object: " << status.GetDetail();
-        pending_topic_updates_.pop_back();
-      }
-    }
-    VLOG(1) << "Publishing " << (topic_deletions ? "deletion " : "update ")
-        << ": " << entry_key << "@" << catalog_object.catalog_version;
-  }
-}
-
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
   GetCatalogUsage(document);
@@ -497,3 +456,26 @@ void CatalogServer::TableMetricsUrlCallback(const Webserver::ArgumentMap& args,
     document->AddMember("error", error, document->GetAllocator());
   }
 }
+
+bool CatalogServer::AddPendingTopicItem(std::string key, const uint8_t* item_data,
+    uint32_t size, bool deleted) {
+  pending_topic_updates_.emplace_back();
+  TTopicItem& item = pending_topic_updates_.back();
+  if (FLAGS_compact_catalog_topic) {
+    Status status = CompressCatalogObject(item_data, size, &item.value);
+    if (!status.ok()) {
+      pending_topic_updates_.pop_back();
+      LOG(ERROR) << "Error compressing topic item: " << status.GetDetail();
+      return false;
+    }
+  } else {
+    item.value.assign(reinterpret_cast<const char*>(item_data),
+        static_cast<size_t>(size));
+  }
+  item.key = std::move(key);
+  item.deleted = deleted;
+  VLOG(1) << "Publishing " << (deleted ? "deletion: " : "update: ") << item.key <<
+      " original size: " << size << (FLAGS_compact_catalog_topic ?
+      Substitute(" compressed size: $0", item.value.size()) : string());
+  return true;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 0b6b220..a6a0c3f 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -74,6 +74,11 @@ class CatalogServer {
   }
   Catalog* catalog() const { return catalog_.get(); }
 
+  /// Add a topic item to pending_topic_updates_. Caller must hold catalog_lock_.
+  /// The return value is true if the operation succeeds and false otherwise.
+  bool AddPendingTopicItem(std::string key, const uint8_t* item_data, uint32_t size,
+      bool deleted);
+
  private:
   /// Thrift API implementation which proxies requests onto this CatalogService.
   boost::shared_ptr<CatalogServiceIf> thrift_iface_;
@@ -143,20 +148,6 @@ class CatalogServer {
   /// Also, explicitly releases free memory back to the OS after each complete iteration.
   [[noreturn]] void GatherCatalogUpdatesThread();
 
-  /// Builds the next topic update to send based on what items
-  /// have been added/changed/removed from the catalog since the last hearbeat. To do
-  /// this, it enumerates the given catalog objects returned looking for the objects that
-  /// have a catalog version that is > the catalog version sent with the last heartbeat.
-  /// 'topic_deletions' is true if 'catalog_objects' contain deleted catalog
-  /// objects.
-  ///
-  /// The key for each entry is a string composed of:
-  /// "TCatalogObjectType:<unique object name>". So for table foo.bar, the key would be
-  /// "TABLE:foo.bar". Encoding the object type information in the key ensures the keys
-  /// are unique. Must hold catalog_lock_ when calling this function.
-  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects,
-      bool topic_deletions);
-
   /// Example output:
   /// "databases": [
   ///         {

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/catalog/catalog-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util-test.cc b/be/src/catalog/catalog-util-test.cc
new file mode 100644
index 0000000..d37fc5c
--- /dev/null
+++ b/be/src/catalog/catalog-util-test.cc
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "catalog/catalog-util.h"
+#include "testutil/gtest-util.h"
+
+using namespace impala;
+using namespace std;
+
+void CompressAndDecompress(const std::string& input) {
+  string compressed;
+  string decompressed;
+  ASSERT_OK(CompressCatalogObject(reinterpret_cast<const uint8_t*>(input.data()),
+      static_cast<uint32_t>(input.size()), &compressed));
+  ASSERT_OK(DecompressCatalogObject(reinterpret_cast<const uint8_t*>(compressed.data()),
+      static_cast<uint32_t>(compressed.size()), &decompressed));
+  ASSERT_EQ(input.size(), decompressed.size());
+  ASSERT_EQ(input, decompressed);
+}
+
+
+TEST(CatalogUtil, TestCatalogCompression) {
+  CompressAndDecompress("");
+  CompressAndDecompress("deadbeef");
+  string large_string;
+  uint32_t large_string_size = 0x7E000000; // LZ4_MAX_INPUT_SIZE
+  large_string.reserve(large_string_size);
+  for (uint32_t i = 0; i < large_string_size; ++i) {
+    large_string.push_back(static_cast<char>(rand() % (1 + numeric_limits<char>::max())));
+  }
+  CompressAndDecompress(large_string);
+}
+
+IMPALA_TEST_MAIN();
+

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/catalog/catalog-util.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index 789723e..5d1c0fa 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -20,9 +20,9 @@
 #include <sstream>
 
 #include "catalog/catalog-util.h"
-#include "common/status.h"
 #include "exec/read-write-util.h"
 #include "util/compress.h"
+#include "util/jni-util.h"
 #include "util/debug-util.h"
 
 #include "common/names.h"
@@ -31,6 +31,94 @@ using boost::algorithm::to_upper_copy;
 
 namespace impala {
 
+jclass JniCatalogCacheUpdateIterator::pair_cl;
+jmethodID JniCatalogCacheUpdateIterator::pair_ctor;
+jclass JniCatalogCacheUpdateIterator::boolean_cl;
+jmethodID JniCatalogCacheUpdateIterator::boolean_ctor;
+
+Status JniCatalogCacheUpdateIterator::InitJNI() {
+  JNIEnv* env = getJNIEnv();
+  if (env == nullptr) return Status("Failed to get/create JVM");
+  RETURN_IF_ERROR(
+      JniUtil::GetGlobalClassRef(env, "Lorg/apache/impala/common/Pair;", &pair_cl));
+  pair_ctor = env->GetMethodID(pair_cl, "<init>",
+      "(Ljava/lang/Object;Ljava/lang/Object;)V");
+  RETURN_ERROR_IF_EXC(env);
+  RETURN_IF_ERROR(
+      JniUtil::GetGlobalClassRef(env, "Ljava/lang/Boolean;", &boolean_cl));
+  boolean_ctor = env->GetMethodID(boolean_cl, "<init>", "(Z)V");
+  RETURN_ERROR_IF_EXC(env);
+  return Status::OK();
+}
+
+Status JniCatalogCacheUpdateIterator::createPair(JNIEnv* env, bool deleted,
+    const uint8_t* buffer, long size, jobject* out) {
+  jobject deleted_obj = env->NewObject(boolean_cl, boolean_ctor,
+      static_cast<jboolean>(deleted));
+  RETURN_ERROR_IF_EXC(env);
+  jobject byte_buffer = env->NewDirectByteBuffer(const_cast<uint8_t*>(buffer), size);
+  RETURN_ERROR_IF_EXC(env);
+  *out = env->NewObject(pair_cl, pair_ctor, deleted_obj, byte_buffer);
+  RETURN_ERROR_IF_EXC(env);
+  return Status::OK();
+}
+
+jobject TopicItemSpanIterator::next(JNIEnv* env) {
+  while (begin_ != end_) {
+    jobject result;
+    Status s;
+    const TTopicItem* current = begin_++;
+    if (decompress_) {
+      s = DecompressCatalogObject(
+          reinterpret_cast<const uint8_t*>(current->value.data()),
+          static_cast<uint32_t>(current->value.size()), &decompressed_buffer_);
+      if (!s.ok()) {
+        LOG(ERROR) << "Error decompressing catalog object: " << s.GetDetail();
+        continue;
+      }
+      s = createPair(env, current->deleted,
+          reinterpret_cast<const uint8_t*>(decompressed_buffer_.data()),
+          static_cast<long>(decompressed_buffer_.size()), &result);
+    } else {
+      s = createPair(env, current->deleted,
+          reinterpret_cast<const uint8_t*>(current->value.data()),
+          static_cast<long>(current->value.size()), &result);
+    }
+    if (s.ok()) return result;
+    LOG(ERROR) << "Error creating return value: " << s.GetDetail();
+  }
+  return nullptr;
+}
+
+jobject CatalogUpdateResultIterator::next(JNIEnv* env) {
+  const vector<TCatalogObject>& removed = result_.removed_catalog_objects;
+  const vector<TCatalogObject>& updated = result_.updated_catalog_objects;
+  while (pos_ != removed.size() + updated.size()) {
+    bool deleted;
+    const TCatalogObject* current_obj;
+    if (pos_ < removed.size()) {
+      current_obj = &removed[pos_];
+      deleted = true;
+    } else {
+      current_obj = &updated[pos_ - removed.size()];
+      deleted = false;
+    }
+    ++pos_;
+    uint8_t* buf;
+    uint32_t buf_size;
+    Status s = serializer_.Serialize(current_obj, &buf_size, &buf);
+    if (!s.ok()) {
+      LOG(ERROR) << "Error serializing catalog object: " << s.GetDetail();
+      continue;
+    }
+    jobject result = nullptr;
+    s = createPair(env, deleted, buf, buf_size, &result);
+    if (s.ok()) return result;
+    LOG(ERROR) << "Error creating jobject." << s.GetDetail();
+  }
+  return nullptr;
+}
+
 TCatalogObjectType::type TCatalogObjectTypeFromName(const string& name) {
   const string& upper = to_upper_copy(name);
   if (upper == "DATABASE") {
@@ -136,79 +224,31 @@ Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,
   return Status::OK();
 }
 
-string TCatalogObjectToEntryKey(const TCatalogObject& catalog_object) {
-  // The key format is: "TCatalogObjectType:<fully qualified object name>"
-  stringstream entry_key;
-  entry_key << PrintTCatalogObjectType(catalog_object.type) << ":";
-  switch (catalog_object.type) {
-    case TCatalogObjectType::DATABASE:
-      entry_key << catalog_object.db.db_name;
-      break;
-    case TCatalogObjectType::TABLE:
-    case TCatalogObjectType::VIEW:
-      entry_key << catalog_object.table.db_name << "." << catalog_object.table.tbl_name;
-      break;
-    case TCatalogObjectType::FUNCTION:
-      entry_key << catalog_object.fn.name.db_name << "."
-                << catalog_object.fn.signature;
-      break;
-    case TCatalogObjectType::CATALOG:
-      entry_key << catalog_object.catalog.catalog_service_id;
-      break;
-    case TCatalogObjectType::DATA_SOURCE:
-      entry_key << catalog_object.data_source.name;
-      break;
-    case TCatalogObjectType::HDFS_CACHE_POOL:
-      entry_key << catalog_object.cache_pool.pool_name;
-      break;
-    case TCatalogObjectType::ROLE:
-      entry_key << catalog_object.role.role_name;
-      break;
-    case TCatalogObjectType::PRIVILEGE:
-      entry_key << catalog_object.privilege.role_id << "."
-                << catalog_object.privilege.privilege_name;
-      break;
-    default:
-      break;
-  }
-  return entry_key.str();
-}
-
-Status CompressCatalogObject(string* catalog_object) {
+Status CompressCatalogObject(const uint8_t* src, uint32_t size, string* dst) {
   scoped_ptr<Codec> compressor;
   RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, THdfsCompression::LZ4,
       &compressor));
-  string output_buffer;
-  int64_t compressed_data_len = compressor->MaxOutputLen(catalog_object->size());
-  DCHECK_GT(compressed_data_len, 0);
+  int64_t compressed_data_len = compressor->MaxOutputLen(size);
   int64_t output_buffer_len = compressed_data_len + sizeof(uint32_t);
-  output_buffer.resize(output_buffer_len);
-  uint8_t* output_buffer_ptr =
-      const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(output_buffer.data()));
-  ReadWriteUtil::PutInt(output_buffer_ptr, static_cast<uint32_t>(catalog_object->size()));
+  dst->resize(static_cast<size_t>(output_buffer_len));
+  uint8_t* output_buffer_ptr = reinterpret_cast<uint8_t*>(&((*dst)[0]));
+  ReadWriteUtil::PutInt(output_buffer_ptr, size);
   output_buffer_ptr += sizeof(uint32_t);
-  RETURN_IF_ERROR(compressor->ProcessBlock(true, catalog_object->size(),
-      reinterpret_cast<const uint8_t*>(catalog_object->data()), &compressed_data_len,
+  RETURN_IF_ERROR(compressor->ProcessBlock(true, size, src, &compressed_data_len,
       &output_buffer_ptr));
-  output_buffer.resize(compressed_data_len + sizeof(uint32_t));
-  *catalog_object = move(output_buffer);
+  dst->resize(compressed_data_len + sizeof(uint32_t));
   return Status::OK();
 }
 
-Status DecompressCatalogObject(const string& compressed_catalog_object,
-    vector<uint8_t>* output_buffer) {
+Status DecompressCatalogObject(const uint8_t* src, uint32_t size, string* dst) {
   scoped_ptr<Codec> decompressor;
   RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr, false, THdfsCompression::LZ4,
       &decompressor));
-  const uint8_t* input_data_ptr =
-      reinterpret_cast<const uint8_t*>(compressed_catalog_object.data());
-  int64_t decompressed_len = ReadWriteUtil::GetInt<uint32_t>(input_data_ptr);
-  output_buffer->resize(decompressed_len);
-  input_data_ptr += sizeof(uint32_t);
-  uint8_t* decompressed_data_ptr = output_buffer->data();
-  int64_t compressed_data_len = compressed_catalog_object.size() - sizeof(uint32_t);
-  RETURN_IF_ERROR(decompressor->ProcessBlock(true, compressed_data_len,
-      input_data_ptr, &decompressed_len, &decompressed_data_ptr));
+  int64_t decompressed_len = ReadWriteUtil::GetInt<uint32_t>(src);
+  dst->resize(static_cast<size_t>(decompressed_len));
+  uint8_t* decompressed_data_ptr = reinterpret_cast<uint8_t*>(&((*dst)[0]));
+  RETURN_IF_ERROR(decompressor->ProcessBlock(true, size - sizeof(uint32_t),
+      src + sizeof(uint32_t), &decompressed_len, &decompressed_data_ptr));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/catalog/catalog-util.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.h b/be/src/catalog/catalog-util.h
index e98cd38..a01e9bb 100644
--- a/be/src/catalog/catalog-util.h
+++ b/be/src/catalog/catalog-util.h
@@ -19,14 +19,75 @@
 #ifndef IMPALA_CATALOG_CATALOG_UTIL_H
 #define IMPALA_CATALOG_CATALOG_UTIL_H
 
+#include <jni.h>
+#include <gen-cpp/StatestoreService_types.h>
+#include <gen-cpp/CatalogService_types.h>
+#include <rpc/thrift-util.h>
+
 #include "common/status.h"
 #include "gen-cpp/CatalogObjects_types.h"
 
 namespace impala {
 
-/// Contains utility functions for working with TCatalogObjects and their related types.
+/// A helper class used to pass catalog object updates to the FE. With this iterator, the
+/// catalog objects are decompressed and transferred to the FE one by one without having
+/// to keep the entire uncompressed catalog objects in memory.
+class JniCatalogCacheUpdateIterator {
+ public:
+  /// Initialize JNI classes and method IDs. Currently it is only initialized in impalad.
+  static Status InitJNI();
+
+  /// Return the next catalog object from a catalog update. The return type is
+  /// Pair<Boolean, ByteBuffer>. The Boolean value is true if the update is a delete
+  /// operation. The ByteBuffer is the serialized TCatalogObject. null is returned at the
+  /// end of the update set. The return value is invalidated on the next call.
+  /// If the serialization or decompression of an object is unsuccessful, the object
+  /// will be skipped and the next valid object is returned.
+  virtual jobject next(JNIEnv* env) = 0;
+
+  virtual ~JniCatalogCacheUpdateIterator() = default;
+
+ protected:
+  /// A helper function used to create the return value of next().
+  Status createPair(JNIEnv* env, bool deleted, const uint8_t* buffer, long size,
+      jobject* out);
+
+ private:
+  static jclass pair_cl;
+  static jmethodID pair_ctor;
+  static jclass boolean_cl;
+  static jmethodID boolean_ctor;
+};
+
+/// Pass catalog objects in CatalogUpdateCallback().
+class TopicItemSpanIterator : public JniCatalogCacheUpdateIterator {
+ public:
+  TopicItemSpanIterator(const vector<TTopicItem>& items, bool decompress) :
+      begin_(items.data()), end_(items.data() + items.size()),
+      decompress_(decompress) {}
 
-class Status;
+  jobject next(JNIEnv* env) override;
+
+ private:
+  const TTopicItem* begin_;
+  const TTopicItem* end_;
+  bool decompress_;
+  std::string decompressed_buffer_;
+};
+
+/// Pass catalog objects in ProcessCatalogUpdateResult().
+class CatalogUpdateResultIterator : public JniCatalogCacheUpdateIterator {
+ public:
+  explicit CatalogUpdateResultIterator(const TCatalogUpdateResult& catalog_update_result)
+      : result_(catalog_update_result), pos_(0), serializer_(false) {}
+
+  jobject next(JNIEnv* env) override;
+
+ private:
+  const TCatalogUpdateResult& result_;
+  int pos_;
+  ThriftSerializer serializer_;
+};
 
 /// Converts a string to the matching TCatalogObjectType enum type. Returns
 /// TCatalogObjectType::UNKNOWN if no match was found.
@@ -37,23 +98,17 @@ TCatalogObjectType::type TCatalogObjectTypeFromName(const std::string& name);
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,
     const std::string& object_name, TCatalogObject* catalog_object);
 
-/// Builds and returns the topic entry key string for the given TCatalogObject. The key
-/// format is: "TCatalogObjectType:<fully qualified object name>". So a table named
-/// "foo" in a database named "bar" would have a key of: "TABLE:bar.foo"
-/// Returns an empty string if there were any problem building the key.
-std::string TCatalogObjectToEntryKey(const TCatalogObject& catalog_object);
-
-/// Compresses a serialized catalog object using LZ4 and stores it back in
-/// 'catalog_object'. Stores the size of the uncopressed catalog object in the
-/// first sizeof(uint32_t) bytes of 'catalog_object'.
-Status CompressCatalogObject(std::string* catalog_object) WARN_UNUSED_RESULT;
-
-/// Decompress an LZ4-compressed catalog object. The decompressed object
-/// is stored in 'output_buffer'. The first sizeof(uint32_t) bytes of
-/// 'compressed_catalog_object' store the size of the uncompressed catalog
-/// object.
-Status DecompressCatalogObject(const std::string& compressed_catalog_object,
-    std::vector<uint8_t>* output_buffer) WARN_UNUSED_RESULT;
+/// Compresses a serialized catalog object using LZ4 and stores it back in 'dst'. Stores
+/// the size of the uncompressed catalog object in the first sizeof(uint32_t) bytes of
+/// 'dst'. The compression fails if the uncompressed data size exceeds 0x7E000000 bytes.
+Status CompressCatalogObject(const uint8_t* src, uint32_t size, std::string* dst)
+    WARN_UNUSED_RESULT;
+
+/// Decompress an LZ4-compressed catalog object. The decompressed object is stored in
+/// 'dst'. The first sizeof(uint32_t) bytes of 'src' store the size of the uncompressed
+/// catalog object.
+Status DecompressCatalogObject(const uint8_t* src, uint32_t size, std::string* dst)
+    WARN_UNUSED_RESULT;
 
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index d96d23e..dcc1657 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -99,8 +99,10 @@ Status Catalog::GetCatalogVersion(long* version) {
   return Status::OK();
 }
 
-Status Catalog::GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp) {
+Status Catalog::GetCatalogDelta(CatalogServer* caller, int64_t from_version,
+    TGetCatalogDeltaResponse* resp) {
   TGetCatalogDeltaRequest request;
+  request.__set_native_catalog_server_ptr(reinterpret_cast<int64_t>(caller));
   request.__set_from_version(from_version);
   return JniUtil::CallJniMethod(catalog_, get_catalog_delta_id_, request, resp);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 13e4529..872ceca 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -27,6 +27,8 @@
 
 namespace impala {
 
+class CatalogServer;
+
 /// The Catalog is a proxy for the Java-side JniCatalog class. The interface is a set of
 /// wrapper functions for methods called over JNI.
 class Catalog {
@@ -58,8 +60,11 @@ class Catalog {
 
   /// Retrieves the catalog objects that were added/modified/deleted since version
   /// 'from_version'. Returns OK if the operation was successful, otherwise a Status
-  /// object with information on the error will be returned.
-  Status GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp);
+  /// object with information on the error will be returned. 'caller' is a pointer to
+  /// the caller CatalogServer object. caller->AddPendingTopicItem() will be repeatedly
+  /// called by the frontend.
+  Status GetCatalogDelta(CatalogServer* caller, int64_t from_version,
+      TGetCatalogDeltaResponse* resp);
 
   /// Gets the Thrift representation of a Catalog object. The request is a TCatalogObject
   /// which has the desired TCatalogObjectType and name properly set.

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/rpc/thrift-util.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index 1a66286..24b0b6f 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -49,12 +49,11 @@ class ThriftSerializer {
 
   /// Serializes obj into result.  Result will contain a copy of the memory.
   template <class T>
-  Status Serialize(T* obj, std::vector<uint8_t>* result) {
-    uint32_t len = 0;
-    uint8_t* buffer = NULL;
-    RETURN_IF_ERROR(Serialize<T>(obj, &len, &buffer));
-    result->resize(len);
-    memcpy(&((*result)[0]), buffer, len);
+  Status Serialize(const T* obj, std::vector<uint8_t>* result) {
+    uint32_t len;
+    uint8_t* buffer;
+    RETURN_IF_ERROR(Serialize(obj, &len, &buffer));
+    result->assign(buffer, buffer + len);
     return Status::OK();
   }
 
@@ -62,7 +61,7 @@ class ThriftSerializer {
   /// memory returned is owned by this object and will be invalid when another object
   /// is serialized.
   template <class T>
-  Status Serialize(T* obj, uint32_t* len, uint8_t** buffer) {
+  Status Serialize(const T* obj, uint32_t* len, uint8_t** buffer) {
     try {
       mem_buffer_->resetBuffer();
       obj->write(protocol_.get());
@@ -76,7 +75,7 @@ class ThriftSerializer {
   }
 
   template <class T>
-  Status Serialize(T* obj, std::string* result) {
+  Status Serialize(const T* obj, std::string* result) {
     try {
       mem_buffer_->resetBuffer();
       obj->write(protocol_.get());
@@ -94,15 +93,6 @@ class ThriftSerializer {
   boost::shared_ptr<apache::thrift::protocol::TProtocol> protocol_;
 };
 
-class ThriftDeserializer {
- public:
-  ThriftDeserializer(bool compact);
-
- private:
-  boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> factory_;
-  boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto_;
-};
-
 /// Utility to create a protocol (deserialization) object for 'mem'.
 boost::shared_ptr<apache::thrift::protocol::TProtocol>
 CreateDeserializeProtocol(

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 0a84d09..12ac874 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -20,7 +20,9 @@
 #include "service/fe-support.h"
 
 #include <boost/scoped_ptr.hpp>
+#include <catalog/catalog-util.h>
 
+#include "catalog/catalog-server.h"
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
 #include "common/logging.h"
@@ -424,6 +426,65 @@ Java_org_apache_impala_service_FeSupport_NativeLookupSymbol(
   return result_bytes;
 }
 
+// Add a catalog update to pending_topic_updates_. Returns false on JNI failure.
+extern "C" JNIEXPORT jboolean JNICALL
+Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
+    jclass caller_class, jlong native_catalog_server_ptr, jstring key,
+    jbyteArray serialized_object, jboolean deleted) {
+  std::string key_string;
+  {
+    JniUtfCharGuard key_str;
+    if (!JniUtfCharGuard::create(env, key, &key_str).ok()) return false;
+    key_string.assign(key_str.get());
+  }
+  JniScopedArrayCritical obj_buf;
+  if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) return false;
+  reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->AddPendingTopicItem(
+      std::move(key_string), obj_buf.get(), static_cast<uint32_t>(obj_buf.size()),
+      deleted);
+  return true;  // Matches the ")Z" (boolean) return type registered in native_methods.
+}
+
+// Get the next catalog update from the iterator at 'native_iterator_ptr'.
+extern "C"
+JNIEXPORT jobject JNICALL
+Java_org_apache_impala_service_FeSupport_NativeGetNextCatalogObjectUpdate(JNIEnv* env,
+    jclass caller_class, jlong native_iterator_ptr) {
+  return reinterpret_cast<JniCatalogCacheUpdateIterator*>(native_iterator_ptr)->next(env);
+}
+
+// Flag the lib cache entry at 'hdfs_location' for refresh. Returns true on success.
+extern "C" JNIEXPORT jboolean JNICALL
+Java_org_apache_impala_service_FeSupport_NativeLibCacheSetNeedsRefresh(JNIEnv* env,
+    jclass caller_class, jstring hdfs_location) {
+  string str;
+  {
+    JniUtfCharGuard hdfs_location_data;
+    if (!JniUtfCharGuard::create(env, hdfs_location, &hdfs_location_data).ok()) {
+      return static_cast<jboolean>(false);  // Could not read the Java string.
+    }
+    str.assign(hdfs_location_data.get());
+  }
+  LibCache::instance()->SetNeedsRefresh(str);
+  return static_cast<jboolean>(true);
+}
+
+extern "C"
+JNIEXPORT jboolean JNICALL
+Java_org_apache_impala_service_FeSupport_NativeLibCacheRemoveEntry(JNIEnv* env,
+    jclass caller_class, jstring hdfs_lib_file) {
+  string str;
+  {
+    JniUtfCharGuard hdfs_lib_file_data;
+    if (!JniUtfCharGuard::create(env, hdfs_lib_file, &hdfs_lib_file_data).ok()) {
+      return static_cast<jboolean>(false);
+    }
+    str.assign(hdfs_lib_file_data.get());
+  }
+  LibCache::instance()->RemoveEntry(str);
+  return static_cast<jboolean>(true);
+}
+
 // Calls in to the catalog server to request prioritizing the loading of metadata for
 // specific catalog objects.
 extern "C"
@@ -478,28 +539,45 @@ namespace impala {
 
 static JNINativeMethod native_methods[] = {
   {
-    (char*)"NativeFeTestInit", (char*)"()V",
-    (void*)::Java_org_apache_impala_service_FeSupport_NativeFeTestInit
+      (char*)"NativeFeTestInit", (char*)"()V",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeFeTestInit
+  },
+  {
+      (char*)"NativeEvalExprsWithoutRow", (char*)"([B[B)[B",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow
+  },
+  {
+      (char*)"NativeCacheJar", (char*)"([B)[B",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeCacheJar
+  },
+  {
+      (char*)"NativeLookupSymbol", (char*)"([B)[B",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeLookupSymbol
+  },
+  {
+      (char*)"NativePrioritizeLoad", (char*)"([B)[B",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad
   },
   {
-    (char*)"NativeEvalExprsWithoutRow", (char*)"([B[B)[B",
-    (void*)::Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow
+      (char*)"NativeParseQueryOptions", (char*)"(Ljava/lang/String;[B)[B",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions
   },
   {
-    (char*)"NativeCacheJar", (char*)"([B)[B",
-    (void*)::Java_org_apache_impala_service_FeSupport_NativeCacheJar
+      (char*)"NativeAddPendingTopicItem", (char*)"(JLjava/lang/String;[BZ)Z",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem
   },
   {
-    (char*)"NativeLookupSymbol", (char*)"([B)[B",
-    (void*)::Java_org_apache_impala_service_FeSupport_NativeLookupSymbol
+      (char*)"NativeGetNextCatalogObjectUpdate",
+      (char*)"(J)Lorg/apache/impala/common/Pair;",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeGetNextCatalogObjectUpdate
   },
   {
-    (char*)"NativePrioritizeLoad", (char*)"([B)[B",
-    (void*)::Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad
+      (char*)"NativeLibCacheSetNeedsRefresh", (char*)"(Ljava/lang/String;)Z",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeLibCacheSetNeedsRefresh
   },
   {
-    (char*)"NativeParseQueryOptions", (char*)"(Ljava/lang/String;[B)[B",
-    (void*)::Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions
+      (char*)"NativeLibCacheRemoveEntry", (char*)"(Ljava/lang/String;)Z",
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeLibCacheRemoveEntry
   },
 };
 

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index 50883af..db54158 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -69,7 +69,7 @@ Frontend::Frontend() {
     {"getHadoopConfig", "([B)[B", &get_hadoop_config_id_},
     {"getAllHadoopConfigs", "()[B", &get_hadoop_configs_id_},
     {"checkConfiguration", "()Ljava/lang/String;", &check_config_id_},
-    {"updateCatalogCache", "([[B)[B", &update_catalog_cache_id_},
+    {"updateCatalogCache", "([B)[B", &update_catalog_cache_id_},
     {"updateMembership", "([B)V", &update_membership_id_},
     {"getTableNames", "([B)[B", &get_table_names_id_},
     {"describeDb", "([B)[B", &describe_db_id_},
@@ -109,7 +109,7 @@ Frontend::Frontend() {
   ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, fe, &fe_));
 }
 
-Status Frontend::UpdateCatalogCache(const vector<TUpdateCatalogCacheRequest>& req,
+Status Frontend::UpdateCatalogCache(const TUpdateCatalogCacheRequest& req,
     TUpdateCatalogCacheResponse* resp) {
   return JniUtil::CallJniMethod(fe_, update_catalog_cache_id_, req, resp);
 }
@@ -267,4 +267,4 @@ Status Frontend::GetTableFiles(const TShowFilesParams& params, TResultSet* resul
 Status Frontend::BuildTestDescriptorTable(const TBuildTestDescriptorTableParams& params,
     TDescriptorTable* result) {
   return JniUtil::CallJniMethod(fe_, build_test_descriptor_table_id_, params, result);
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/service/frontend.h
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index 4881220..a152b7f 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -38,10 +38,11 @@ class Frontend {
   /// or if there is any further exception, the constructor will terminate the process.
   Frontend();
 
-  /// Request to update the Impalad catalog cache. The req argument contains a vector of
-  /// updates that each contain objects that should be added/removed from the Catalog.
-  /// Returns a response that contains details such as the new max catalog version.
-  Status UpdateCatalogCache(const vector<TUpdateCatalogCacheRequest>& req,
+  /// Request to update the Impalad catalog cache. The 'req' argument contains a pointer
+  /// to a native iterator that the FE passes back to NativeGetNextCatalogObjectUpdate()
+  /// to get the catalog objects iteratively. Returns a response that contains details
+  /// such as the new max catalog version.
+  Status UpdateCatalogCache(const TUpdateCatalogCacheRequest& req,
       TUpdateCatalogCacheResponse *resp);
 
   /// Request to update the Impalad frontend cluster membership snapshot.  The

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index af79180..0c5f75b 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -228,9 +228,6 @@ const string ImpalaServer::AUDIT_EVENT_LOG_FILE_PREFIX = "impala_audit_event_log
 const string LINEAGE_LOG_FILE_PREFIX = "impala_lineage_log_1.0-";
 
 const uint32_t MAX_CANCELLATION_QUEUE_SIZE = 65536;
-// Max size for multiple update in a single split. JNI is not able to write java byte
-// array more than 2GB. A single topic update is not restricted by this.
-const uint64_t MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES = 500 * 1024 * 1024;
 
 const string BEESWAX_SERVER_NAME = "beeswax-frontend";
 const string HS2_SERVER_NAME = "hiveserver2-frontend";
@@ -1327,144 +1324,39 @@ void ImpalaServer::CatalogUpdateCallback(
       incoming_topic_deltas.find(CatalogServer::IMPALA_CATALOG_TOPIC);
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;
-
-  // Update catalog cache in frontend. An update is split into batches of size
-  // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. IMPALA-3499
-  if (delta.topic_entries.size() != 0)  {
-    vector<TCatalogObject> dropped_objects;
-    vector<TUpdateCatalogCacheRequest> update_reqs;
-    update_reqs.push_back(TUpdateCatalogCacheRequest());
-    TUpdateCatalogCacheRequest* incremental_request = &update_reqs.back();
-    incremental_request->__set_is_delta(delta.is_delta);
-    // Process all Catalog updates (new and modified objects) and determine what the
-    // new catalog version will be.
-    int64_t new_catalog_version = catalog_update_info_.catalog_version;
-    uint64_t batch_size_bytes = 0;
-    for (const TTopicItem& item: delta.topic_entries) {
-      Status status;
-      vector<uint8_t> data_buffer;
-      const uint8_t* data_buffer_ptr = nullptr;
-      uint32_t len = 0;
-      if (FLAGS_compact_catalog_topic) {
-        status = DecompressCatalogObject(item.value, &data_buffer);
-        if (!status.ok()) {
-          LOG(ERROR) << "Error decompressing catalog object " << item.key
-                     << ": " << status.GetDetail();
-          continue;
-        }
-        data_buffer_ptr = data_buffer.data();
-        len = data_buffer.size();
-      } else {
-        data_buffer_ptr = reinterpret_cast<const uint8_t*>(item.value.data());
-        len = item.value.size();
-      }
-      if (len > 100 * 1024 * 1024 /* 100MB */) {
-        LOG(INFO) << "Received large catalog object(>100mb): "
-            << item.key << " is "
-            << PrettyPrinter::Print(len, TUnit::BYTES);
-      }
-      TCatalogObject catalog_object;
-      status = DeserializeThriftMsg(data_buffer_ptr, &len, FLAGS_compact_catalog_topic,
-          &catalog_object);
-      if (!status.ok()) {
-        LOG(ERROR) << "Error deserializing item " << item.key
-            << ": " << status.GetDetail();
-        continue;
-      }
-
-      if (batch_size_bytes + len > MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES) {
-        // Initialize a new batch of catalog updates.
-        update_reqs.push_back(TUpdateCatalogCacheRequest());
-        incremental_request = &update_reqs.back();
-        batch_size_bytes = 0;
-      }
-
-      if (catalog_object.type == TCatalogObjectType::CATALOG) {
-        incremental_request->__set_catalog_service_id(
-            catalog_object.catalog.catalog_service_id);
-        new_catalog_version = catalog_object.catalog_version;
-      }
-      VLOG(3) << (item.deleted ? "Deleted " : "Added ") << "item: " << item.key
-          << " version: " << catalog_object.catalog_version << " of size: " << len;
-
-      if (!item.deleted) {
-        // Refresh the lib cache entries of any added functions and data sources
-        // TODO: if frontend returns the list of functions and data sources, we do not
-        // need to deserialize these in backend.
-        if (catalog_object.type == TCatalogObjectType::FUNCTION) {
-          DCHECK(catalog_object.__isset.fn);
-          LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
-        }
-        if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-          DCHECK(catalog_object.__isset.data_source);
-          LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
-        }
-        incremental_request->updated_objects.push_back(catalog_object);
-      } else {
-        // We need to look up any dropped functions and data sources and remove
-        // them from the library cache.
-        if (catalog_object.type == TCatalogObjectType::FUNCTION ||
-            catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-          TCatalogObject existing_object;
-          if (exec_env_->frontend()->GetCatalogObject(
-              catalog_object, &existing_object).ok()) {
-            // If the object exists in the catalog it may have been dropped and
-            // re-created. To avoid removing the re-created object's entry from
-            // the cache verify that the existing object's version <= the
-            // version of the dropped object included in this statestore
-            // heartbeat.
-            DCHECK_NE(existing_object.catalog_version, catalog_object.catalog_version);
-            if (existing_object.catalog_version < catalog_object.catalog_version) {
-              dropped_objects.push_back(existing_object);
-            }
-          }
-        }
-        incremental_request->removed_objects.push_back(catalog_object);
-      }
-      batch_size_bytes += len;
-    }
-
-    // Call the FE to apply the changes to the Impalad Catalog.
-    TUpdateCatalogCacheResponse resp;
-    Status s = exec_env_->frontend()->UpdateCatalogCache(update_reqs, &resp);
-    if (!s.ok()) {
-      LOG(ERROR) << "There was an error processing the impalad catalog update. Requesting"
-                 << " a full topic update to recover: " << s.GetDetail();
-      subscriber_topic_updates->push_back(TTopicDelta());
-      TTopicDelta& update = subscriber_topic_updates->back();
-      update.topic_name = CatalogServer::IMPALA_CATALOG_TOPIC;
-      update.__set_from_version(0L);
-      ImpaladMetrics::CATALOG_READY->SetValue(false);
-      // Dropped all cached lib files (this behaves as if all functions and data
-      // sources are dropped).
-      LibCache::instance()->DropCache();
-    } else {
-      {
-        unique_lock<mutex> unique_lock(catalog_version_lock_);
-        catalog_update_info_.catalog_version = new_catalog_version;
-        catalog_update_info_.catalog_topic_version = delta.to_version;
-        catalog_update_info_.catalog_service_id = resp.catalog_service_id;
-        catalog_update_info_.min_catalog_object_version = resp.min_catalog_object_version;
-        LOG(INFO) << "Catalog topic update applied with version: " << new_catalog_version
-            << " new min catalog object version: " << resp.min_catalog_object_version;
-      }
-      ImpaladMetrics::CATALOG_READY->SetValue(new_catalog_version > 0);
-      // TODO: deal with an error status
-      discard_result(UpdateCatalogMetrics());
-      // Remove all dropped objects from the library cache.
-      // TODO: is this expensive? We'd like to process heartbeats promptly.
-      for (TCatalogObject& object: dropped_objects) {
-        if (object.type == TCatalogObjectType::FUNCTION) {
-          LibCache::instance()->RemoveEntry(object.fn.hdfs_location);
-        } else if (object.type == TCatalogObjectType::DATA_SOURCE) {
-          LibCache::instance()->RemoveEntry(object.data_source.hdfs_location);
-        } else {
-          DCHECK(false);
-        }
-      }
+  TopicItemSpanIterator callback_ctx (delta.topic_entries, FLAGS_compact_catalog_topic);
+
+  TUpdateCatalogCacheRequest req;
+  req.__set_is_delta(delta.is_delta);
+  req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx));
+  TUpdateCatalogCacheResponse resp;
+  Status s = exec_env_->frontend()->UpdateCatalogCache(req, &resp);
+  if (!s.ok()) {
+    LOG(ERROR) << "There was an error processing the impalad catalog update. Requesting"
+               << " a full topic update to recover: " << s.GetDetail();
+    subscriber_topic_updates->emplace_back();
+    TTopicDelta& update = subscriber_topic_updates->back();
+    update.topic_name = CatalogServer::IMPALA_CATALOG_TOPIC;
+    update.__set_from_version(0L);
+    ImpaladMetrics::CATALOG_READY->SetValue(false);
+    // Dropped all cached lib files (this behaves as if all functions and data
+    // sources are dropped).
+    LibCache::instance()->DropCache();
+  } else {
+    {
+      unique_lock<mutex> unique_lock(catalog_version_lock_);
+      catalog_update_info_.catalog_version = resp.new_catalog_version;
+      catalog_update_info_.catalog_topic_version = delta.to_version;
+      catalog_update_info_.catalog_service_id = resp.catalog_service_id;
+      catalog_update_info_.min_catalog_object_version = resp.min_catalog_object_version;
+      LOG(INFO) << "Catalog topic update applied with version: " <<
+          resp.new_catalog_version << " new min catalog object version: " <<
+          resp.min_catalog_object_version;
     }
+    ImpaladMetrics::CATALOG_READY->SetValue(resp.new_catalog_version > 0);
+    // TODO: deal with an error status
+    discard_result(UpdateCatalogMetrics());
   }
-
   // Always update the minimum subscriber version for the catalog topic.
   {
     unique_lock<mutex> unique_lock(catalog_version_lock_);
@@ -1555,20 +1447,17 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
       WaitForCatalogUpdateTopicPropagation(catalog_service_id);
     }
   } else {
-    // Operation with a result set.
+    CatalogUpdateResultIterator callback_ctx(catalog_update_result);
     TUpdateCatalogCacheRequest update_req;
     update_req.__set_is_delta(true);
+    update_req.__set_native_iterator_ptr(reinterpret_cast<int64_t>(&callback_ctx));
+    // The catalog version is updated in WaitForCatalogUpdate below. So we need a
+    // standalone field in the request to update the service ID without touching the
+    // catalog version.
     update_req.__set_catalog_service_id(catalog_update_result.catalog_service_id);
-    if (catalog_update_result.__isset.updated_catalog_objects) {
-      update_req.__set_updated_objects(catalog_update_result.updated_catalog_objects);
-    }
-    if (catalog_update_result.__isset.removed_catalog_objects) {
-      update_req.__set_removed_objects(catalog_update_result.removed_catalog_objects);
-    }
     // Apply the changes to the local catalog cache.
     TUpdateCatalogCacheResponse resp;
-    Status status = exec_env_->frontend()->UpdateCatalogCache(
-        vector<TUpdateCatalogCacheRequest>{update_req}, &resp);
+    Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
     if (!status.ok()) LOG(ERROR) << status.GetDetail();
     RETURN_IF_ERROR(status);
     if (!wait_for_all_subscribers) return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index a7f6abd..e0049d0 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -22,6 +22,7 @@
 #include <unistd.h>
 #include <jni.h>
 
+#include "catalog/catalog-util.h"
 #include "common/logging.h"
 #include "common/init.h"
 #include "exec/hbase-table-scanner.h"
@@ -66,6 +67,7 @@ int ImpaladMain(int argc, char** argv) {
   ABORT_IF_ERROR(HBaseTable::InitJNI());
   ABORT_IF_ERROR(HBaseTableWriter::InitJNI());
   ABORT_IF_ERROR(HiveUdfCall::InitEnv());
+  ABORT_IF_ERROR(JniCatalogCacheUpdateIterator::InitJNI());
   InitFeSupport();
 
   if (FLAGS_enable_rm) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/util/jni-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc
index 3128697..db7893b 100644
--- a/be/src/util/jni-util.cc
+++ b/be/src/util/jni-util.cc
@@ -30,7 +30,8 @@ namespace impala {
 Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out) {
   DCHECK(jstr != nullptr);
   DCHECK(!env->ExceptionCheck());
-  const char* utf_chars = env->GetStringUTFChars(jstr, nullptr);
+  jboolean is_copy;
+  const char* utf_chars = env->GetStringUTFChars(jstr, &is_copy);
   bool exception_check = static_cast<bool>(env->ExceptionCheck());
   if (utf_chars == nullptr || exception_check) {
     if (exception_check) env->ExceptionClear();
@@ -45,6 +46,24 @@ Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out)
   return Status::OK();
 }
 
+bool JniScopedArrayCritical::Create(JNIEnv* env, jbyteArray jarr,
+    JniScopedArrayCritical* out) {
+  DCHECK(env != nullptr);
+  DCHECK(out != nullptr);
+  DCHECK(!env->ExceptionCheck());
+  int size = env->GetArrayLength(jarr);
+  void* pac = env->GetPrimitiveArrayCritical(jarr, nullptr);
+  if (pac == nullptr) {
+    LOG(ERROR) << "GetPrimitiveArrayCritical() failed. Probable OOM on JVM side";
+    return false;
+  }
+  out->env_ = env;
+  out->jarr_ = jarr;
+  out->arr_ = static_cast<uint8_t*>(pac);
+  out->size_ = size;
+  return true;
+}
+
 jclass JniUtil::jni_util_cl_ = NULL;
 jclass JniUtil::internal_exc_cl_ = NULL;
 jmethodID JniUtil::get_jvm_metrics_id_ = NULL;

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/be/src/util/jni-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 43ac131..f0afb66 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -168,6 +168,36 @@ class JniUtfCharGuard {
   DISALLOW_COPY_AND_ASSIGN(JniUtfCharGuard);
 };
 
+class JniScopedArrayCritical {
+ public:
+  /// Construct a JniScopedArrayCritical holding nothing.
+  JniScopedArrayCritical():  env_(nullptr), jarr_(nullptr), arr_(nullptr), size_(0) {}
+
+  /// Release the held byte[] contents if necessary.
+  ~JniScopedArrayCritical() {
+    if (env_ != nullptr && jarr_ != nullptr && arr_ != nullptr) {
+      env_->ReleasePrimitiveArrayCritical(jarr_, arr_, JNI_ABORT);
+    }
+  }
+
+  /// Try to get the contents of 'jarr' via JNIEnv::GetPrimitiveArrayCritical() and set
+  /// the results in 'out'. Returns true upon success and false otherwise. If false is
+  /// returned 'out' is not modified.
+  static bool Create(JNIEnv* env, jbyteArray jarr, JniScopedArrayCritical* out)
+      WARN_UNUSED_RESULT;
+
+  uint8_t* get() const { return arr_; }
+
+  int size() const { return size_; }
+ private:
+  JNIEnv* env_;
+  jbyteArray jarr_;
+  uint8_t* arr_;
+  int size_;
+  DISALLOW_COPY_AND_ASSIGN(JniScopedArrayCritical);
+};
+
+
 /// Utility class for JNI-related functionality.
 /// Init() should be called as soon as the native library is loaded.
 /// Creates global class references, and promotes local references to global references.
@@ -316,30 +346,6 @@ class JniUtil {
     return Status::OK();
   }
 
-  template <typename T, typename R>
-  static Status CallJniMethod(const jobject& obj, const jmethodID& method,
-      const vector<T>& args, R* response) {
-    JNIEnv* jni_env = getJNIEnv();
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(jni_env));
-    jclass jByteArray_class = jni_env->FindClass("[B");
-    jobjectArray array_of_jByteArray =
-        jni_env->NewObjectArray(args.size(), jByteArray_class, NULL);
-    RETURN_ERROR_IF_EXC(jni_env);
-    jbyteArray request_bytes;
-    for (int i = 0; i < args.size(); i++) {
-      RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &args[i], &request_bytes));
-      jni_env->SetObjectArrayElement(array_of_jByteArray, i, request_bytes);
-      RETURN_ERROR_IF_EXC(jni_env);
-      jni_env->DeleteLocalRef(request_bytes);
-    }
-    jbyteArray result_bytes = static_cast<jbyteArray>(
-        jni_env->CallObjectMethod(obj, method, array_of_jByteArray));
-    RETURN_ERROR_IF_EXC(jni_env);
-    RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, response));
-    return Status::OK();
-  }
-
   template <typename T>
   static Status CallJniMethod(const jobject& obj, const jmethodID& method,
       const T& arg, std::string* response) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/common/thrift/CatalogInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogInternalService.thrift b/common/thrift/CatalogInternalService.thrift
index ffccd4b..6007f02 100644
--- a/common/thrift/CatalogInternalService.thrift
+++ b/common/thrift/CatalogInternalService.thrift
@@ -26,21 +26,23 @@ include "CatalogObjects.thrift"
 struct TGetCatalogDeltaRequest {
   // The base catalog version from which the delta is computed.
   1: required i64 from_version
+
+  // The native caller ptr for calling back NativeAddPendingTopicItem().
+  2: required i64 native_catalog_server_ptr
 }
 
-// Response from a call to GetCatalogDelta. Contains a delta of catalog objects
-// (databases, tables/views, and functions) from the CatalogService's cache relative (>)
-// to the catalog version specified in TGetCatalogDelta.from_version.
+// Response from a call to GetCatalogDelta. The catalog object updates are passed
+// separately via NativeAddPendingTopicItem() callback.
 struct TGetCatalogDeltaResponse {
   // The maximum catalog version of all objects in this response or 0 if the Catalog
   // contained no objects.
   1: required i64 max_catalog_version
 
   // List of updated (new and modified) catalog objects whose catalog verion is
-  // larger than TGetCatalotDeltaRequest.from_version.
-  2: required list<CatalogObjects.TCatalogObject> updated_objects
+  // larger than TGetCatalogDeltaRequest.from_version. Deprecated after IMPALA-5990.
+  2: optional list<CatalogObjects.TCatalogObject> updated_objects_deprecated
 
   // List of deleted catalog objects whose catalog version is larger than
-  // TGetCatalogDelta.from_version.
-  3: required list<CatalogObjects.TCatalogObject> deleted_objects
+  // TGetCatalogDelta.from_version. Deprecated after IMPALA-5990.
+  3: optional list<CatalogObjects.TCatalogObject> deleted_objects_deprecated
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index ba21605..b81c1a1 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -655,20 +655,25 @@ struct TSymbolLookupResult {
 }
 
 // Sent from the impalad BE to FE with the results of each CatalogUpdate heartbeat.
-// Contains details on all catalog objects that need to be updated.
+// Catalog object updates are passed separately via NativeGetNextCatalogObjectUpdate().
 struct TUpdateCatalogCacheRequest {
   // True if update only contains entries changed from the previous update. Otherwise,
   // contains the entire topic.
   1: required bool is_delta
 
-  // The Catalog Service ID this update came from.
-  2: required Types.TUniqueId catalog_service_id
+  // The Catalog Service ID this update came from. A request should have either this field
+  // set or a Catalog typed catalog object in the update list.
+  2: optional Types.TUniqueId catalog_service_id
 
-  // New or modified items. Empty list if no items were updated.
-  3: required list<CatalogObjects.TCatalogObject> updated_objects
+  // New or modified items. Empty list if no items were updated. Deprecated after
+  // IMPALA-5990.
+  3: optional list<CatalogObjects.TCatalogObject> updated_objects_deprecated
 
-  // Empty if no items were removed or is_delta is false.
-  4: required list<CatalogObjects.TCatalogObject> removed_objects
+  // Empty if no items were removed or is_delta is false. Deprecated after IMPALA-5990.
+  4: optional list<CatalogObjects.TCatalogObject> removed_objects_deprecated
+
+  // The native ptr for calling back NativeGetNextCatalogObjectUpdate().
+  5: required i64 native_iterator_ptr
 }
 
 // Response from a TUpdateCatalogCacheRequest.
@@ -678,6 +683,9 @@ struct TUpdateCatalogCacheResponse {
 
   // The minimum catalog object version after CatalogUpdate() was processed.
   2: required i64 min_catalog_object_version
+
+  // The updated catalog version needed by the backend.
+  3: required i64 new_catalog_version
 }
 
 // Sent from the impalad BE to FE with the latest cluster membership snapshot resulting

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 6136835..ff7b1e4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -567,6 +567,8 @@ public abstract class Catalog {
             catalogObject.getCache_pool().getPool_name().toLowerCase();
       case DATA_SOURCE:
         return "DATA_SOURCE:" + catalogObject.getData_source().getName().toLowerCase();
+      case CATALOG:
+        return "CATALOG:" + catalogObject.getCatalog().catalog_service_id;
       default:
         throw new IllegalStateException(
             "Unsupported catalog object type: " + catalogObject.getType());

http://git-wip-us.apache.org/repos/asf/impala/blob/a2431638/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 7bc2a91..a3b0cb0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -23,6 +23,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -55,12 +56,12 @@ import org.apache.impala.common.JniUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.hive.executor.UdfExecutor;
+import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCatalogUpdateResult;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetCatalogDeltaResponse;
 import org.apache.impala.thrift.TGetCatalogUsageResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
@@ -72,6 +73,8 @@ import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.SentryProxy;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 
 import com.codahale.metrics.Timer;
@@ -387,54 +390,85 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Identifies and returns the catalog objects that were added/modified/deleted in the
-   * catalog with versions > 'fromVersion'. It operates on a snaphsot of the catalog
-   * without holding the catalog lock which means that other concurrent metadata
-   * operations can still make progress while the catalog delta is computed. An entry in
-   * the topic update log is added for every catalog object that is included in the
-   * catalog delta. The log is examined by operations using SYNC_DDL to determine which
-   * topic update covers the result set of metadata operation. Once the catalog delta is
-   * computed, the entries in the delete log with versions less than 'fromVersion' are
-   * garbage collected.
+   * The context for add*ToCatalogDelta(), called by getCatalogDelta. It contains
+   * callback information, version range and collected topics.
    */
-  public TGetCatalogDeltaResponse getCatalogDelta(long fromVersion) {
-    // Maximum catalog version (inclusive) to be included in the catalog delta.
-    long toVersion = getCatalogVersion();
-    TGetCatalogDeltaResponse resp = new TGetCatalogDeltaResponse();
-    resp.setUpdated_objects(new ArrayList<TCatalogObject>());
-    resp.setDeleted_objects(new ArrayList<TCatalogObject>());
-    resp.setMax_catalog_version(toVersion);
+  class GetCatalogDeltaContext {
+    // The CatalogServer pointer for NativeAddPendingTopicItem() callback.
+    long nativeCatalogServerPtr;
+    // The from and to version of this delta.
+    long fromVersion;
+    long toVersion;
+    // The keys of the updated topics.
+    Set<String> updatedCatalogObjects;
+    TSerializer serializer;
+
+    GetCatalogDeltaContext(long nativeCatalogServerPtr, long fromVersion, long toVersion)
+    {
+      this.nativeCatalogServerPtr = nativeCatalogServerPtr;
+      this.fromVersion = fromVersion;
+      this.toVersion = toVersion;
+      updatedCatalogObjects = new HashSet<>();
+      serializer = new TSerializer(new TBinaryProtocol.Factory());
+    }
+
+    void addCatalogObject(TCatalogObject obj, boolean delete) throws TException {
+      String key = Catalog.toCatalogObjectKey(obj);
+      if (obj.type != TCatalogObjectType.CATALOG) {
+        topicUpdateLog_.add(key,
+            new TopicUpdateLog.Entry(0, obj.getCatalog_version(), toVersion));
+        if (!delete) updatedCatalogObjects.add(key);
+      }
+      // TODO: TSerializer.serialize() returns a copy of the internal byte array, which
+      // could be elided.
+      byte[] data = serializer.serialize(obj);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Collected catalog " + (delete ? "deletion: " : "update: ") + key +
+            " version: " + obj.catalog_version);
+      }
+      if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, key, data, delete))
+      {
+        LOG.error("NativeAddPendingTopicItem failed in BE. key=" + key + ", delete="
+            + delete + ", data_size=" + data.length);
+      }
+    }
+  }
 
+  /**
+   * Identifies the catalog objects that were added/modified/deleted in the catalog with
+   * versions > 'fromVersion'. It operates on a snapshot of the catalog without holding
+   * the catalog lock which means that other concurrent metadata operations can still make
+   * progress while the catalog delta is computed. An entry in the topic update log is
+   * added for every catalog object that is included in the catalog delta. The log is
+   * examined by operations using SYNC_DDL to determine which topic update covers the
+   * result set of metadata operation. Once the catalog delta is computed, the entries in
+   * the delete log with versions less than 'fromVersion' are garbage collected.
+   * The catalog delta is passed to the backend by calling NativeAddPendingTopicItem().
+   */
+  public long getCatalogDelta(long nativeCatalogServerPtr, long fromVersion) throws
+      TException {
+    GetCatalogDeltaContext ctx = new GetCatalogDeltaContext(nativeCatalogServerPtr,
+        fromVersion, getCatalogVersion());
     for (Db db: getAllDbs()) {
-      addDatabaseToCatalogDelta(db, fromVersion, toVersion, resp);
+      addDatabaseToCatalogDelta(db, ctx);
     }
     for (DataSource dataSource: getAllDataSources()) {
-      addDataSourceToCatalogDelta(dataSource, fromVersion, toVersion, resp);
+      addDataSourceToCatalogDelta(dataSource, ctx);
     }
     for (HdfsCachePool cachePool: getAllHdfsCachePools()) {
-      addHdfsCachePoolToCatalogDelta(cachePool, fromVersion, toVersion, resp);
+      addHdfsCachePoolToCatalogDelta(cachePool, ctx);
     }
     for (Role role: getAllRoles()) {
-      addRoleToCatalogDelta(role, fromVersion, toVersion, resp);
+      addRoleToCatalogDelta(role, ctx);
     }
-    Set<String> updatedCatalogObjects = Sets.newHashSet();
-    for (TCatalogObject catalogObj: resp.updated_objects) {
-      topicUpdateLog_.add(Catalog.toCatalogObjectKey(catalogObj),
-          new TopicUpdateLog.Entry(0, catalogObj.getCatalog_version(),
-              toVersion));
-      updatedCatalogObjects.add(Catalog.toCatalogObjectKey(catalogObj));
-    }
-
     // Identify the catalog objects that were removed from the catalog for which their
-    // versions are in range ('fromVersion', 'toVersion']. We need to make sure
+    // versions are in range ('ctx.fromVersion', 'ctx.toVersion']. We need to make sure
     // that we don't include "deleted" objects that were re-added to the catalog.
-    for (TCatalogObject removedObject: getDeletedObjects(fromVersion, toVersion)) {
-      if (!updatedCatalogObjects.contains(
+    for (TCatalogObject removedObject:
+        getDeletedObjects(ctx.fromVersion, ctx.toVersion)) {
+      if (!ctx.updatedCatalogObjects.contains(
           Catalog.toCatalogObjectKey(removedObject))) {
-        topicUpdateLog_.add(Catalog.toCatalogObjectKey(removedObject),
-            new TopicUpdateLog.Entry(0, removedObject.getCatalog_version(),
-                toVersion));
-        resp.addToDeleted_objects(removedObject);
+        ctx.addCatalogObject(removedObject, true);
       }
     }
     // Each topic update should contain a single "TCatalog" object which is used to
@@ -443,20 +477,21 @@ public class CatalogServiceCatalog extends Catalog {
     // version at this point, it ensures impalads will always bump their versions,
     // even in the case where an object has been dropped.
     TCatalogObject catalog =
-        new TCatalogObject(TCatalogObjectType.CATALOG, toVersion);
+        new TCatalogObject(TCatalogObjectType.CATALOG, ctx.toVersion);
     catalog.setCatalog(new TCatalog(catalogServiceId_));
-    resp.addToUpdated_objects(catalog);
+    ctx.addCatalogObject(catalog, false);
     // Garbage collect the delete and topic update log.
-    deleteLog_.garbageCollect(toVersion);
-    topicUpdateLog_.garbageCollectUpdateLogEntries(toVersion);
-    lastSentTopicUpdate_.set(toVersion);
+    deleteLog_.garbageCollect(ctx.toVersion);
+    topicUpdateLog_.garbageCollectUpdateLogEntries(ctx.toVersion);
+    lastSentTopicUpdate_.set(ctx.toVersion);
     // Notify any operation that is waiting on the next topic update.
     synchronized (topicUpdateLog_) {
       topicUpdateLog_.notifyAll();
     }
-    return resp;
+    return ctx.toVersion;
   }
 
+
   /**
    * Get a snapshot view of all the catalog objects that were deleted between versions
    * ('fromVersion', 'toVersion'].
@@ -520,23 +555,23 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Adds a database in the topic update if its version is in the range
-   * ('fromVersion', 'toVersion']. It iterates through all the tables and functions of
-   * this database to determine if they can be included in the topic update.
+   * ('ctx.fromVersion', 'ctx.toVersion']. It iterates through all the tables and
+   * functions of this database to determine if they can be included in the topic update.
    */
-  private void addDatabaseToCatalogDelta(Db db, long fromVersion, long toVersion,
-      TGetCatalogDeltaResponse resp) {
+  private void addDatabaseToCatalogDelta(Db db, GetCatalogDeltaContext ctx)
+      throws TException {
     long dbVersion = db.getCatalogVersion();
-    if (dbVersion > fromVersion && dbVersion <= toVersion) {
+    if (dbVersion > ctx.fromVersion && dbVersion <= ctx.toVersion) {
       TCatalogObject catalogDb =
           new TCatalogObject(TCatalogObjectType.DATABASE, dbVersion);
       catalogDb.setDb(db.toThrift());
-      resp.addToUpdated_objects(catalogDb);
+      ctx.addCatalogObject(catalogDb, false);
     }
     for (Table tbl: getAllTables(db)) {
-      addTableToCatalogDelta(tbl, fromVersion, toVersion, resp);
+      addTableToCatalogDelta(tbl, ctx);
     }
     for (Function fn: getAllFunctions(db)) {
-      addFunctionToCatalogDelta(fn, fromVersion, toVersion, resp);
+      addFunctionToCatalogDelta(fn, ctx);
     }
   }
 
@@ -568,25 +603,25 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Adds a table in the topic update if its version is in the range
-   * ('fromVersion', 'toVersion']. If the table's version is larger than 'toVersion' and
-   * the table has skipped a topic update 'MAX_NUM_SKIPPED_TOPIC_UPDATES' times, it is
-   * included in the topic update. This prevents tables that are updated frequently from
-   * skipping topic updates indefinitely, which would also violate the semantics of
-   * SYNC_DDL.
+   * ('ctx.fromVersion', 'ctx.toVersion']. If the table's version is larger than
+   * 'ctx.toVersion' and the table has skipped a topic update
+   * 'MAX_NUM_SKIPPED_TOPIC_UPDATES' times, it is included in the topic update. This
+   * prevents tables that are updated frequently from skipping topic updates indefinitely,
+   * which would also violate the semantics of SYNC_DDL.
    */
-  private void addTableToCatalogDelta(Table tbl, long fromVersion, long toVersion,
-      TGetCatalogDeltaResponse resp) {
-    if (tbl.getCatalogVersion() <= toVersion) {
-      addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp);
+  private void addTableToCatalogDelta(Table tbl, GetCatalogDeltaContext ctx)
+      throws TException  {
+    if (tbl.getCatalogVersion() <= ctx.toVersion) {
+      addTableToCatalogDeltaHelper(tbl, ctx);
     } else {
       TopicUpdateLog.Entry topicUpdateEntry =
           topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
       Preconditions.checkNotNull(topicUpdateEntry);
-      if (topicUpdateEntry.getNumSkippedTopicUpdates() >= MAX_NUM_SKIPPED_TOPIC_UPDATES) {
-        addTableToCatalogDeltaHelper(tbl, fromVersion, toVersion, resp);
+      if (topicUpdateEntry.getNumSkippedTopicUpdates() == MAX_NUM_SKIPPED_TOPIC_UPDATES) {
+        addTableToCatalogDeltaHelper(tbl, ctx);
       } else {
         LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
-            toVersion);
+            ctx.toVersion);
         topicUpdateLog_.add(tbl.getUniqueName(),
             new TopicUpdateLog.Entry(
                 topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
@@ -598,23 +633,23 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Helper function that tries to add a table in a topic update. It acquires table's
-   * lock and checks if its version is in the ('fromVersion', 'toVersion'] range and how
-   * many consecutive times (if any) has the table skipped a topic update.
+   * lock and checks if its version is in the ('ctx.fromVersion', 'ctx.toVersion'] range
+   * and how many consecutive times (if any) has the table skipped a topic update.
    */
-  private void addTableToCatalogDeltaHelper(Table tbl, long fromVersion, long toVersion,
-      TGetCatalogDeltaResponse resp) {
+  private void addTableToCatalogDeltaHelper(Table tbl, GetCatalogDeltaContext ctx)
+      throws TException {
     TCatalogObject catalogTbl =
         new TCatalogObject(TCatalogObjectType.TABLE, Catalog.INITIAL_CATALOG_VERSION);
     tbl.getLock().lock();
     try {
       long tblVersion = tbl.getCatalogVersion();
-      if (tblVersion <= fromVersion) return;
+      if (tblVersion <= ctx.fromVersion) return;
       TopicUpdateLog.Entry topicUpdateEntry =
           topicUpdateLog_.getOrCreateLogEntry(tbl.getUniqueName());
-      if (tblVersion > toVersion &&
+      if (tblVersion > ctx.toVersion &&
           topicUpdateEntry.getNumSkippedTopicUpdates() < MAX_NUM_SKIPPED_TOPIC_UPDATES) {
         LOG.info("Table " + tbl.getFullName() + " is skipping topic update " +
-            toVersion);
+            ctx.toVersion);
         topicUpdateLog_.add(tbl.getUniqueName(),
             new TopicUpdateLog.Entry(
                 topicUpdateEntry.getNumSkippedTopicUpdates() + 1,
@@ -630,7 +665,7 @@ public class CatalogServiceCatalog extends Catalog {
         return;
       }
       catalogTbl.setCatalog_version(tbl.getCatalogVersion());
-      resp.addToUpdated_objects(catalogTbl);
+      ctx.addCatalogObject(catalogTbl, false);
     } finally {
       tbl.getLock().unlock();
     }
@@ -638,65 +673,65 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Adds a function to the topic update if its version is in the range
-   * ('fromVersion', 'toVersion'].
+   * ('ctx.fromVersion', 'ctx.toVersion'].
    */
-  private void addFunctionToCatalogDelta(Function fn, long fromVersion, long toVersion,
-      TGetCatalogDeltaResponse resp) {
+  private void addFunctionToCatalogDelta(Function fn, GetCatalogDeltaContext ctx)
+      throws TException {
     long fnVersion = fn.getCatalogVersion();
-    if (fnVersion <= fromVersion || fnVersion > toVersion) return;
+    if (fnVersion <= ctx.fromVersion || fnVersion > ctx.toVersion) return;
     TCatalogObject function =
         new TCatalogObject(TCatalogObjectType.FUNCTION, fnVersion);
     function.setFn(fn.toThrift());
-    resp.addToUpdated_objects(function);
+    ctx.addCatalogObject(function, false);
   }
 
   /**
    * Adds a data source to the topic update if its version is in the range
-   * ('fromVersion', 'toVersion'].
+   * ('ctx.fromVersion', 'ctx.toVersion'].
    */
-  private void addDataSourceToCatalogDelta(DataSource dataSource, long fromVersion,
-      long toVersion, TGetCatalogDeltaResponse resp) {
+  private void addDataSourceToCatalogDelta(DataSource dataSource,
+      GetCatalogDeltaContext ctx) throws TException  {
     long dsVersion = dataSource.getCatalogVersion();
-    if (dsVersion <= fromVersion || dsVersion > toVersion) return;
+    if (dsVersion <= ctx.fromVersion || dsVersion > ctx.toVersion) return;
     TCatalogObject catalogObj =
         new TCatalogObject(TCatalogObjectType.DATA_SOURCE, dsVersion);
     catalogObj.setData_source(dataSource.toThrift());
-    resp.addToUpdated_objects(catalogObj);
+    ctx.addCatalogObject(catalogObj, false);
   }
 
   /**
    * Adds a HDFS cache pool to the topic update if its version is in the range
-   * ('fromVersion', 'toVersion'].
+   * ('ctx.fromVersion', 'ctx.toVersion'].
    */
-  private void addHdfsCachePoolToCatalogDelta(HdfsCachePool cachePool, long fromVersion,
-      long toVersion, TGetCatalogDeltaResponse resp) {
+  private void addHdfsCachePoolToCatalogDelta(HdfsCachePool cachePool,
+      GetCatalogDeltaContext ctx) throws TException  {
     long cpVersion = cachePool.getCatalogVersion();
-    if (cpVersion <= fromVersion || cpVersion > toVersion) {
+    if (cpVersion <= ctx.fromVersion || cpVersion > ctx.toVersion) {
       return;
     }
     TCatalogObject pool =
         new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL, cpVersion);
     pool.setCache_pool(cachePool.toThrift());
-    resp.addToUpdated_objects(pool);
+    ctx.addCatalogObject(pool, false);
   }
 
 
   /**
    * Adds a role to the topic update if its version is in the range
-   * ('fromVersion', 'toVersion']. It iterates through all the privileges of this role to
-   * determine if they can be inserted in the topic update.
+   * ('ctx.fromVersion', 'ctx.toVersion']. It iterates through all the privileges of
+   * this role to determine if they can be inserted in the topic update.
    */
-  private void addRoleToCatalogDelta(Role role, long fromVersion, long toVersion,
-      TGetCatalogDeltaResponse resp) {
+  private void addRoleToCatalogDelta(Role role, GetCatalogDeltaContext ctx)
+      throws TException  {
     long roleVersion = role.getCatalogVersion();
-    if (roleVersion > fromVersion && roleVersion <= toVersion) {
+    if (roleVersion > ctx.fromVersion && roleVersion <= ctx.toVersion) {
       TCatalogObject thriftRole =
           new TCatalogObject(TCatalogObjectType.ROLE, roleVersion);
       thriftRole.setRole(role.toThrift());
-      resp.addToUpdated_objects(thriftRole);
+      ctx.addCatalogObject(thriftRole, false);
     }
     for (RolePrivilege p: getAllPrivileges(role)) {
-      addRolePrivilegeToCatalogDelta(p, fromVersion, toVersion, resp);
+      addRolePrivilegeToCatalogDelta(p, ctx);
     }
   }
 
@@ -715,16 +750,16 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Adds a role privilege to the topic update if its version is in the range
-   * ('fromVersion', 'toVersion'].
+   * ('ctx.fromVersion', 'ctx.toVersion'].
    */
-  private void addRolePrivilegeToCatalogDelta(RolePrivilege priv, long fromVersion,
-      long toVersion, TGetCatalogDeltaResponse resp) {
+  private void addRolePrivilegeToCatalogDelta(RolePrivilege priv,
+      GetCatalogDeltaContext ctx) throws TException  {
     long privVersion = priv.getCatalogVersion();
-    if (privVersion <= fromVersion || privVersion > toVersion) return;
+    if (privVersion <= ctx.fromVersion || privVersion > ctx.toVersion) return;
     TCatalogObject privilege =
         new TCatalogObject(TCatalogObjectType.PRIVILEGE, privVersion);
     privilege.setPrivilege(priv.toThrift());
-    resp.addToUpdated_objects(privilege);
+    ctx.addCatalogObject(privilege, false);
   }
 
   /**