Posted to commits@impala.apache.org by jr...@apache.org on 2017/09/26 21:50:21 UTC

[1/3] incubator-impala git commit: IMPALA-3360: Codegen inserting into runtime filters

Repository: incubator-impala
Updated Branches:
  refs/heads/master 646920810 -> dd340b881


IMPALA-3360: Codegen inserting into runtime filters

This patch codegens PhjBuilder::InsertRuntimeFilters() and
FilterContext::Insert().

This allows us to unroll the loop over all the filters in
PhjBuilder::ProcessBuildBatch(), eliminate the branch on type that
happens in RawValue::GetHashValue(), and eliminate the AVX2 check
that happens in BloomFilter::Insert().
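
To illustrate (a simplified sketch, not the actual Impala source; 'type'
and 'seed' stand in for the filter expression's column type and the hash
seed), the per-row work on the build side previously looked roughly like:

  // One runtime loop per build row; codegen unrolls this loop.
  for (const FilterContext& ctx : filter_ctxs_) {
    void* val = ctx.expr_eval->GetValue(build_row);
    // RawValue::GetHashValue() branches on the column type at runtime;
    // codegen supplies the type as a constant so the branch folds away.
    uint32_t hash = RawValue::GetHashValue(val, type, seed);
    // BloomFilter::Insert() checks CpuInfo::IsSupported(CpuInfo::AVX2) on
    // every call; codegen binds the AVX2 or non-AVX2 variant up front.
    ctx.local_bloom_filter->Insert(hash);
  }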

Testing:
- Ran existing runtime filter tests.
- Ran perf tests locally (all avg. over three runs):
  - Four-way self join on tpch_parquet.lineitem. This should be a good
    case for this patch, as there are several large hash join build sides
    that will benefit from the codegen. Total query running time improved
    ~7% (from 16.07s to 14.91s).
  - Single join of tpch_parquet.lineitem against a selectively
    filtered tpch_parquet.lineitem. Should be a bad case for this
    patch, as the build side of the join is very small. Total query
    running time regressed by ~2% (from 0.73s to 0.75s) due to
    an increase in codegen time (from 295ms to 309ms for the fragment
    containing the hash join).

Change-Id: I79cf23ad92dadaab996a50a2ca07ef9ebe8639bb
Reviewed-on: http://gerrit.cloudera.org:8080/8029
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/79dc220b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/79dc220b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/79dc220b

Branch: refs/heads/master
Commit: 79dc220bd75eb5dc333aeeff3f65fc5dbfe3a6e8
Parents: 6469208
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Wed Sep 6 12:29:38 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Sep 25 19:37:20 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py           |   6 +-
 be/src/codegen/impala-ir.cc                     |   1 +
 be/src/exec/filter-context.cc                   | 162 +++++++++++++++++++
 be/src/exec/filter-context.h                    |   9 +-
 be/src/exec/partitioned-hash-join-builder-ir.cc |   3 +-
 be/src/exec/partitioned-hash-join-builder.cc    |  85 +++++++++-
 be/src/exec/partitioned-hash-join-builder.h     |  15 +-
 be/src/util/CMakeLists.txt                      |   1 +
 be/src/util/bloom-filter-ir.cc                  |  32 ++++
 be/src/util/bloom-filter.h                      |   6 +
 10 files changed, 311 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 75d233c..5e1ce43 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -193,6 +193,8 @@ ir_functions = [
   ["GENERIC_IS_NULL_STRING", "IrGenericIsNullString"],
   ["RAW_VALUE_COMPARE",
    "_ZN6impala8RawValue7CompareEPKvS2_RKNS_10ColumnTypeE"],
+  ["RAW_VALUE_GET_HASH_VALUE",
+   "_ZN6impala8RawValue12GetHashValueEPKvRKNS_10ColumnTypeEj"],
   ["TOPN_NODE_INSERT_BATCH",
    "_ZN6impala8TopNNode11InsertBatchEPNS_8RowBatchE"],
   ["MEMPOOL_ALLOCATE",
@@ -202,7 +204,9 @@ ir_functions = [
   ["RUNTIME_FILTER_EVAL",
    "_ZNK6impala13RuntimeFilter4EvalEPvRKNS_10ColumnTypeE"],
   ["UNION_MATERIALIZE_BATCH",
-  "_ZN6impala9UnionNode16MaterializeBatchEPNS_8RowBatchEPPh"]
+  "_ZN6impala9UnionNode16MaterializeBatchEPNS_8RowBatchEPPh"],
+  ["BLOOM_FILTER_INSERT_NO_AVX2", "_ZN6impala11BloomFilter12InsertNoAvx2Ej"],
+  ["BLOOM_FILTER_INSERT_AVX2", "_ZN6impala11BloomFilter10InsertAvx2Ej"]
 ]
 
 enums_preamble = '\

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 2992849..24e0ce7 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -57,6 +57,7 @@
 #include "runtime/raw-value-ir.cc"
 #include "runtime/runtime-filter-ir.cc"
 #include "udf/udf-ir.cc"
+#include "util/bloom-filter-ir.cc"
 #include "util/hash-util-ir.cc"
 
 #pragma clang diagnostic pop

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index 0eee704..ecf744a 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -214,3 +214,165 @@ Status FilterContext::CodegenEval(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
   return Status::OK();
 }
 
+// An example of the generated code for TPCH-Q2: RF002 -> n_regionkey
+//
+// @expr_type_arg = constant %"struct.impala::ColumnType" { i32 4, i32 -1, i32 -1,
+//     i32 -1, %"class.std::vector.422" zeroinitializer,
+//     %"class.std::vector.101" zeroinitializer }
+//
+// define void @FilterContextInsert(%"struct.impala::FilterContext"* %this,
+//     %"class.impala::TupleRow"* %row) #43 {
+// entry:
+//   %0 = alloca i16
+//   %local_bloom_filter_ptr = getelementptr inbounds %"struct.impala::FilterContext",
+//       %"struct.impala::FilterContext"* %this, i32 0, i32 3
+//   %local_bloom_filter_arg = load %"class.impala::BloomFilter"*,
+//       %"class.impala::BloomFilter"** %local_bloom_filter_ptr
+//   %bloom_is_null = icmp eq %"class.impala::BloomFilter"* %local_bloom_filter_arg, null
+//   br i1 %bloom_is_null, label %bloom_is_null1, label %bloom_not_null
+//
+// bloom_not_null:                                   ; preds = %entry
+//   %expr_eval_ptr = getelementptr inbounds %"struct.impala::FilterContext",
+//       %"struct.impala::FilterContext"* %this, i32 0, i32 0
+//   %expr_eval_arg = load %"class.impala::ScalarExprEvaluator"*,
+//       %"class.impala::ScalarExprEvaluator"** %expr_eval_ptr
+//   %result = call i32 @GetSlotRef.46(
+//       %"class.impala::ScalarExprEvaluator"* %expr_eval_arg,
+//       %"class.impala::TupleRow"* %row)
+//   %is_null = trunc i32 %result to i1
+//   br i1 %is_null, label %val_is_null, label %val_not_null
+//
+// bloom_is_null1:                                   ; preds = %entry
+//   ret void
+//
+// val_not_null:                                     ; preds = %bloom_not_null
+//   %1 = ashr i32 %result, 16
+//   %2 = trunc i32 %1 to i16
+//   store i16 %2, i16* %0
+//   %native_ptr = bitcast i16* %0 to i8*
+//   br label %insert_filter
+//
+// val_is_null:                                      ; preds = %bloom_not_null
+//   br label %insert_filter
+//
+// insert_filter:                                    ; preds = %val_not_null, %val_is_null
+//   %val_ptr_phi = phi i8* [ %native_ptr, %val_not_null ], [ null, %val_is_null ]
+//   %hash_value = call i32 @_ZN6impala8RawValue12GetHashValueEPKvRKNS_10ColumnTypeEj(
+//       i8* %val_ptr_phi, %"struct.impala::ColumnType"* @expr_type_arg, i32 1234)
+//   call void @_ZN6impala11BloomFilter10InsertAvx2Ej(
+//       %"class.impala::BloomFilter"* %local_bloom_filter_arg, i32 %hash_value)
+//   ret void
+// }
+Status FilterContext::CodegenInsert(
+    LlvmCodeGen* codegen, ScalarExpr* filter_expr, Function** fn) {
+  LLVMContext& context = codegen->context();
+  LlvmBuilder builder(context);
+
+  *fn = nullptr;
+  PointerType* this_type = codegen->GetPtrType(FilterContext::LLVM_CLASS_NAME);
+  PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
+  LlvmCodeGen::FnPrototype prototype(
+      codegen, "FilterContextInsert", codegen->void_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+
+  Value* args[2];
+  Function* insert_filter_fn = prototype.GeneratePrototype(&builder, args);
+  Value* this_arg = args[0];
+  Value* row_arg = args[1];
+
+  // Load 'local_bloom_filter' from 'this_arg' FilterContext object.
+  Value* local_bloom_filter_ptr =
+      builder.CreateStructGEP(nullptr, this_arg, 3, "local_bloom_filter_ptr");
+  Value* local_bloom_filter_arg =
+      builder.CreateLoad(local_bloom_filter_ptr, "local_bloom_filter_arg");
+
+  // Check if 'local_bloom_filter' is NULL and return if so.
+  Value* bloom_is_null = builder.CreateIsNull(local_bloom_filter_arg, "bloom_is_null");
+  BasicBlock* bloom_not_null_block =
+      BasicBlock::Create(context, "bloom_not_null", insert_filter_fn);
+  BasicBlock* bloom_is_null_block =
+      BasicBlock::Create(context, "bloom_is_null", insert_filter_fn);
+  builder.CreateCondBr(bloom_is_null, bloom_is_null_block, bloom_not_null_block);
+  builder.SetInsertPoint(bloom_is_null_block);
+  builder.CreateRetVoid();
+  builder.SetInsertPoint(bloom_not_null_block);
+
+  BasicBlock* val_not_null_block =
+      BasicBlock::Create(context, "val_not_null", insert_filter_fn);
+  BasicBlock* val_is_null_block =
+      BasicBlock::Create(context, "val_is_null", insert_filter_fn);
+  BasicBlock* insert_filter_block =
+      BasicBlock::Create(context, "insert_filter", insert_filter_fn);
+
+  Function* compute_fn;
+  RETURN_IF_ERROR(filter_expr->GetCodegendComputeFn(codegen, &compute_fn));
+  DCHECK(compute_fn != nullptr);
+
+  // Load 'expr_eval' from 'this_arg' FilterContext object.
+  Value* expr_eval_ptr = builder.CreateStructGEP(nullptr, this_arg, 0, "expr_eval_ptr");
+  Value* expr_eval_arg = builder.CreateLoad(expr_eval_ptr, "expr_eval_arg");
+
+  // Evaluate the row against the filter's expression.
+  Value* compute_fn_args[] = {expr_eval_arg, row_arg};
+  CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(
+      codegen, &builder, filter_expr->type(), compute_fn, compute_fn_args, "result");
+
+  // Check if the result is NULL
+  Value* val_is_null = result.GetIsNull();
+  builder.CreateCondBr(val_is_null, val_is_null_block, val_not_null_block);
+
+  // Set the pointer to NULL in case it evaluates to NULL.
+  builder.SetInsertPoint(val_is_null_block);
+  Value* null_ptr = codegen->null_ptr_value();
+  builder.CreateBr(insert_filter_block);
+
+  // Saves 'result' on the stack and passes a pointer to it to 'get_hash_value_fn'.
+  builder.SetInsertPoint(val_not_null_block);
+  Value* native_ptr = result.ToNativePtr();
+  native_ptr = builder.CreatePointerCast(native_ptr, codegen->ptr_type(), "native_ptr");
+  builder.CreateBr(insert_filter_block);
+
+  // Get the arguments in place to call 'get_hash_value_fn'.
+  builder.SetInsertPoint(insert_filter_block);
+  PHINode* val_ptr_phi = builder.CreatePHI(codegen->ptr_type(), 2, "val_ptr_phi");
+  val_ptr_phi->addIncoming(native_ptr, val_not_null_block);
+  val_ptr_phi->addIncoming(null_ptr, val_is_null_block);
+
+  // Create a global constant of the filter expression's ColumnType. It needs to be a
+  // constant for constant propagation and dead code elimination in 'get_hash_value_fn'.
+  Type* col_type = codegen->GetType(ColumnType::LLVM_CLASS_NAME);
+  Constant* expr_type_arg = codegen->ConstantToGVPtr(
+      col_type, filter_expr->type().ToIR(codegen), "expr_type_arg");
+
+  // Call RawValue::GetHashValue() on the result of the filter's expression.
+  Value* seed_arg =
+      codegen->GetIntConstant(TYPE_INT, RuntimeFilterBank::DefaultHashSeed());
+  Value* get_hash_value_args[] = {val_ptr_phi, expr_type_arg, seed_arg};
+  Function* get_hash_value_fn =
+      codegen->GetFunction(IRFunction::RAW_VALUE_GET_HASH_VALUE, false);
+  DCHECK(get_hash_value_fn != nullptr);
+  Value* hash_value =
+      builder.CreateCall(get_hash_value_fn, get_hash_value_args, "hash_value");
+
+  // Call Insert() on the bloom filter.
+  Value* insert_args[] = {local_bloom_filter_arg, hash_value};
+  Function* insert_bloom_filter_fn;
+  if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
+    insert_bloom_filter_fn =
+        codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_AVX2, false);
+  } else {
+    insert_bloom_filter_fn =
+        codegen->GetFunction(IRFunction::BLOOM_FILTER_INSERT_NO_AVX2, false);
+  }
+
+  DCHECK(insert_bloom_filter_fn != nullptr);
+  builder.CreateCall(insert_bloom_filter_fn, insert_args);
+  builder.CreateRetVoid();
+
+  *fn = codegen->FinalizeFunction(insert_filter_fn);
+  if (*fn == NULL) {
+    return Status("Codegen'ed FilterContext::Insert() fails verification, see log");
+  }
+  return Status::OK();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/exec/filter-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h
index fa95b91..81a3889 100644
--- a/be/src/exec/filter-context.h
+++ b/be/src/exec/filter-context.h
@@ -115,7 +115,14 @@ struct FilterContext {
   /// argument to RuntimeFilter::Eval() with a constant. On success, 'fn' is set to
   /// the generated function. On failure, an error status is returned.
   static Status CodegenEval(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
-     llvm::Function** fn) WARN_UNUSED_RESULT;
+      llvm::Function** fn) WARN_UNUSED_RESULT;
+
+  /// Codegen Insert() by codegen'ing the expression 'filter_expr', replacing the type
+  /// argument to RawValue::GetHashValue() with a constant, and calling into the correct
+  /// version of BloomFilter::Insert(), depending on the presence of AVX2. On success,
+  /// 'fn' is set to the generated function. On failure, an error status is returned.
+  static Status CodegenInsert(LlvmCodeGen* codegen, ScalarExpr* filter_expr,
+      llvm::Function** fn) WARN_UNUSED_RESULT;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
index b9c2cc3..8481212 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -57,8 +57,7 @@ Status PhjBuilder::ProcessBuildBatch(
     if (build_filters) {
       DCHECK_EQ(ctx->level(), 0)
           << "Runtime filters should not be built during repartitioning.";
-      // TODO: unroll loop and codegen expr evaluation and hashing (IMPALA-3360).
-      for (const FilterContext& ctx : filter_ctxs_) ctx.Insert(build_row);
+      InsertRuntimeFilters(build_row);
     }
     const uint32_t hash = expr_vals_cache->CurExprValuesHash();
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
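
This out-of-line call is the hook for codegen: CodegenProcessBuildBatch()
later swaps it for the generated, unrolled function via ReplaceCallSites()
(see the partitioned-hash-join-builder.cc diff below), roughly:

  // Sketch of the replacement step performed at codegen time:
  int replaced = codegen->ReplaceCallSites(
      process_build_batch_fn, insert_filters_fn, "InsertRuntimeFilters");
  DCHECK_EQ(replaced, 1);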

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index d292c59..6b7b791 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -48,10 +48,14 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
 using namespace impala;
 using llvm::ConstantInt;
 using llvm::Function;
+using llvm::LLVMContext;
+using llvm::PointerType;
 using llvm::Type;
 using llvm::Value;
 using strings::Substitute;
 
+const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
+
 PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
     const RowDescriptor* probe_row_desc, const RowDescriptor* build_row_desc,
     RuntimeState* state, BufferPool::ClientHandle* buffer_pool_client,
@@ -491,6 +495,10 @@ void PhjBuilder::AllocateRuntimeFilters() {
   }
 }
 
+void PhjBuilder::InsertRuntimeFilters(TupleRow* build_row) noexcept {
+  for (const FilterContext& ctx : filter_ctxs_) ctx.Insert(build_row);
+}
+
 void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
   int32_t num_enabled_filters = 0;
   // Use 'num_build_rows' to estimate FP-rate of each Bloom filter, and publish
@@ -761,10 +769,14 @@ void PhjBuilder::Codegen(LlvmCodeGen* codegen) {
   Function* eval_build_row_fn;
   codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(codegen, true, &eval_build_row_fn));
 
+  Function* insert_filters_fn;
+  codegen_status.MergeStatus(
+      CodegenInsertRuntimeFilters(codegen, filter_exprs_, &insert_filters_fn));
+
   if (codegen_status.ok()) {
     TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
-    build_codegen_status =
-        CodegenProcessBuildBatch(codegen, hash_fn, murmur_hash_fn, eval_build_row_fn);
+    build_codegen_status = CodegenProcessBuildBatch(
+        codegen, hash_fn, murmur_hash_fn, eval_build_row_fn, insert_filters_fn);
     insert_codegen_status = CodegenInsertBatch(codegen, hash_fn, murmur_hash_fn,
         eval_build_row_fn, prefetch_mode);
   } else {
@@ -788,8 +800,8 @@ string PhjBuilder::DebugString() const {
   return ss.str();
 }
 
-Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
-    Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
+Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen, Function* hash_fn,
+    Function* murmur_hash_fn, Function* eval_row_fn, Function* insert_filters_fn) {
   Function* process_build_batch_fn =
       codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true);
   DCHECK(process_build_batch_fn != NULL);
@@ -799,6 +811,10 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
       codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn, "EvalBuildRow");
   DCHECK_EQ(replaced, 1);
 
+  replaced = codegen->ReplaceCallSites(
+      process_build_batch_fn, insert_filters_fn, "InsertRuntimeFilters");
+  DCHECK_EQ(replaced, 1);
+
   // Replace some hash table parameters with constants.
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
@@ -924,3 +940,64 @@ Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen, Function* hash_fn,
       insert_batch_fn_level0, reinterpret_cast<void**>(&insert_batch_fn_level0_));
   return Status::OK();
 }
+
+// An example of the generated code for a query with two filters built by this node.
+//
+// ; Function Attrs: noinline
+// define void @InsertRuntimeFilters(%"class.impala::PhjBuilder"* %this,
+//     %"class.impala::TupleRow"* %row) #46 {
+// entry:
+//   call void @FilterContextInsert(%"struct.impala::FilterContext"* inttoptr (
+//       i64 197870464 to %"struct.impala::FilterContext"*),
+//       %"class.impala::TupleRow"* %row)
+//   call void @FilterContextInsert.14(%"struct.impala::FilterContext"* inttoptr (
+//       i64 197870496 to %"struct.impala::FilterContext"*),
+//       %"class.impala::TupleRow"* %row)
+//   ret void
+// }
+Status PhjBuilder::CodegenInsertRuntimeFilters(
+    LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, Function** fn) {
+  LLVMContext& context = codegen->context();
+  LlvmBuilder builder(context);
+
+  *fn = nullptr;
+  Type* this_type = codegen->GetPtrType(PhjBuilder::LLVM_CLASS_NAME);
+  PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
+  LlvmCodeGen::FnPrototype prototype(
+      codegen, "InsertRuntimeFilters", codegen->void_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+
+  Value* args[2];
+  Function* insert_runtime_filters_fn = prototype.GeneratePrototype(&builder, args);
+  Value* row_arg = args[1];
+
+  int num_filters = filter_exprs.size();
+  for (int i = 0; i < num_filters; ++i) {
+    Function* insert_fn;
+    RETURN_IF_ERROR(FilterContext::CodegenInsert(codegen, filter_exprs[i], &insert_fn));
+    PointerType* filter_context_type =
+        codegen->GetPtrType(FilterContext::LLVM_CLASS_NAME);
+    Value* filter_context_ptr =
+        codegen->CastPtrToLlvmPtr(filter_context_type, &filter_ctxs_[i]);
+
+    Value* insert_args[] = {filter_context_ptr, row_arg};
+    builder.CreateCall(insert_fn, insert_args);
+  }
+
+  builder.CreateRetVoid();
+
+  if (num_filters > 0) {
+    // Don't inline this function to avoid code bloat in ProcessBuildBatch().
+    // If there is any filter, InsertRuntimeFilters() is large enough to not benefit
+    // much from inlining.
+    insert_runtime_filters_fn->addFnAttr(llvm::Attribute::NoInline);
+  }
+
+  *fn = codegen->FinalizeFunction(insert_runtime_filters_fn);
+  if (*fn == nullptr) {
+    return Status("Codegen'd PhjBuilder::InsertRuntimeFilters() failed "
+                  "verification, see log");
+  }
+  return Status::OK();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 2c52988..c123a9b 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -348,6 +348,10 @@ class PhjBuilder : public DataSink {
   /// phase.
   void AllocateRuntimeFilters();
 
+  /// Iterates over the runtime filters in 'filter_ctxs_' and inserts 'build_row' into
+  /// each filter. Replaced at runtime with code generated by CodegenInsertRuntimeFilters().
+  void InsertRuntimeFilters(TupleRow* build_row) noexcept;
+
   /// Publish the runtime filters to the fragment-local RuntimeFilterBank.
   /// 'num_build_rows' is used to determine whether the computed filters have an
   /// unacceptably high false-positive rate.
@@ -356,7 +360,8 @@ class PhjBuilder : public DataSink {
   /// Codegen processing build batches. Identical signature to ProcessBuildBatch().
   /// Returns non-OK status if codegen was not possible.
   Status CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
-      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn) WARN_UNUSED_RESULT;
+      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
+      llvm::Function* insert_filters_fn) WARN_UNUSED_RESULT;
 
   /// Codegen inserting batches into a partition's hash table. Identical signature to
   /// Partition::InsertBatch(). Returns non-OK if codegen was not possible.
@@ -364,6 +369,11 @@ class PhjBuilder : public DataSink {
       llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
       TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
 
+  /// Codegen inserting rows into runtime filters. Identical signature to
+  /// InsertRuntimeFilters(). Returns non-OK if codegen was not possible.
+  Status CodegenInsertRuntimeFilters(
+      LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, llvm::Function** fn);
+
   RuntimeState* const runtime_state_;
 
   // The ID of the plan join node this is associated with.
@@ -503,6 +513,9 @@ class PhjBuilder : public DataSink {
   /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
   InsertBatchFn insert_batch_fn_;
   InsertBatchFn insert_batch_fn_level0_;
+
+  /// Class name in LLVM IR.
+  static const char* LLVM_CLASS_NAME;
 };
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 947ab78..4ff03d6 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -32,6 +32,7 @@ add_library(Util
   bitmap.cc
   bit-util.cc
   bloom-filter.cc
+  bloom-filter-ir.cc
   coding-util.cc
   codec.cc
   common-metrics.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/util/bloom-filter-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter-ir.cc b/be/src/util/bloom-filter-ir.cc
new file mode 100644
index 0000000..4c56149
--- /dev/null
+++ b/be/src/util/bloom-filter-ir.cc
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/bloom-filter.h"
+
+#include "codegen/impala-ir.h"
+
+using namespace impala;
+
+void BloomFilter::InsertNoAvx2(const uint32_t hash) noexcept {
+  const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
+  BucketInsert(bucket_idx, hash);
+}
+
+void BloomFilter::InsertAvx2(const uint32_t hash) noexcept {
+  const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
+  BucketInsertAVX2(bucket_idx, hash);
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79dc220b/be/src/util/bloom-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 913b331..4a50cfc 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -131,6 +131,12 @@ class BloomFilter {
 
   Bucket* directory_;
 
+  // Same as Insert(), but skips the CPU check and assumes that AVX2 is not available.
+  void InsertNoAvx2(const uint32_t hash) noexcept;
+
+  // Same as Insert(), but skips the CPU check and assumes that AVX2 is available.
+  void InsertAvx2(const uint32_t hash) noexcept;
+
   /// Does the actual work of Insert(). bucket_idx is the index of the bucket to insert
   /// into and 'hash' is the value passed to Insert().
   void BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept;


[2/3] incubator-impala git commit: [DOCS] Fill in release note subtopics for Apache Impala 2.10

Posted by jr...@apache.org.
[DOCS] Fill in release note subtopics for Apache Impala 2.10

Primarily just pointing to the list of issues in the changelog.
Those cover the different use cases for the different parts
of the release notes -- fixed issues, new features, and
incompatible changes.

Change-Id: Ide38c1e1c64dac287b180b39ebb8e7735d457ce3
Reviewed-on: http://gerrit.cloudera.org:8080/7958
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d565386b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d565386b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d565386b

Branch: refs/heads/master
Commit: d565386b245128cedaa07367a2ff8407988aeaa2
Parents: 79dc220
Author: John Russell <jr...@cloudera.com>
Authored: Tue Sep 5 11:44:10 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Sep 26 18:44:19 2017 +0000

----------------------------------------------------------------------
 docs/impala_keydefs.ditamap                 |  4 ++++
 docs/topics/impala_fixed_issues.xml         | 16 ++++++++++++++++
 docs/topics/impala_incompatible_changes.xml | 16 ++++++++++++++++
 docs/topics/impala_new_features.xml         | 19 ++++++++++++++++++-
 4 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d565386b/docs/impala_keydefs.ditamap
----------------------------------------------------------------------
diff --git a/docs/impala_keydefs.ditamap b/docs/impala_keydefs.ditamap
index 518afef..3c47329 100644
--- a/docs/impala_keydefs.ditamap
+++ b/docs/impala_keydefs.ditamap
@@ -10516,6 +10516,7 @@ under the License.
   <keydef href="https://issues.apache.org/jira/browse/IMPALA-9999" scope="external" format="html" keys="IMPALA-9999"/>
 
 <!-- Short form of mapping from Impala release to vendor-specific releases, for use in headings. -->
+  <keydef keys="impala210"><topicmeta><keywords><keyword>Impala 2.10</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala29"><topicmeta><keywords><keyword>Impala 2.9</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala28"><topicmeta><keywords><keyword>Impala 2.8</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala27"><topicmeta><keywords><keyword>Impala 2.7</keyword></keywords></topicmeta></keydef>
@@ -10530,6 +10531,7 @@ under the License.
   <keydef keys="impala13"><topicmeta><keywords><keyword>Impala 1.3</keyword></keywords></topicmeta></keydef>
 
 <!-- 3-part forms of version numbers, for use in release notes. -->
+  <keydef keys="impala2100"><topicmeta><keywords><keyword>Impala 2.10.0</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala290"><topicmeta><keywords><keyword>Impala 2.9.0</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala280"><topicmeta><keywords><keyword>Impala 2.8.0</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala270"><topicmeta><keywords><keyword>Impala 2.7.0</keyword></keywords></topicmeta></keydef>
@@ -10566,6 +10568,7 @@ under the License.
   <keydef keys="impala130"><topicmeta><keywords><keyword>Impala 1.3.0</keyword></keywords></topicmeta></keydef>
 
 <!-- Long form of mapping from Impala release to vendor-specific releases, for use in running text. -->
+  <keydef keys="impala210_full"><topicmeta><keywords><keyword>Impala 2.10</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala29_full"><topicmeta><keywords><keyword>Impala 2.9</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala28_full"><topicmeta><keywords><keyword>Impala 2.8</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala27_full"><topicmeta><keywords><keyword>Impala 2.7</keyword></keywords></topicmeta></keydef>
@@ -10581,6 +10584,7 @@ under the License.
   <keydef keys="impala13_full"><topicmeta><keywords><keyword>Impala 1.3</keyword></keywords></topicmeta></keydef>
 
 <!-- Pointers to changelog pages -->
+  <keydef keys="changelog_210" href="https://impala.incubator.apache.org/docs/changelog-2.10.html" scope="external" format="html"/>
   <keydef keys="changelog_29" href="https://impala.incubator.apache.org/docs/changelog-2.9.html" scope="external" format="html"/>
   <keydef keys="changelog_28" href="https://impala.incubator.apache.org/docs/changelog-2.8.html" scope="external" format="html"/>
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d565386b/docs/topics/impala_fixed_issues.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_fixed_issues.xml b/docs/topics/impala_fixed_issues.xml
index 06c1c65..4acaf29 100644
--- a/docs/topics/impala_fixed_issues.xml
+++ b/docs/topics/impala_fixed_issues.xml
@@ -46,6 +46,22 @@ under the License.
     <p outputclass="toc inpage"/>
   </conbody>
 
+<!-- All 2.10.x subsections go under here -->
+
+  <concept rev="2.10.0" id="fixed_issues_2100">
+
+    <title>Issues Fixed in <keyword keyref="impala2100"/></title>
+
+    <conbody>
+
+      <p>
+        For the full list of issues closed in this release, including bug fixes,
+        see the <xref keyref="changelog_210">changelog for <keyword keyref="impala210"/></xref>.
+      </p>
+
+    </conbody>
+  </concept>
+
 <!-- All 2.9.x subsections go under here -->
 
   <concept rev="2.9.0" id="fixed_issues_290">

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d565386b/docs/topics/impala_incompatible_changes.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_incompatible_changes.xml b/docs/topics/impala_incompatible_changes.xml
index a21e026..c2eb6fb 100644
--- a/docs/topics/impala_incompatible_changes.xml
+++ b/docs/topics/impala_incompatible_changes.xml
@@ -53,6 +53,22 @@ under the License.
     <p outputclass="toc inpage"/>
   </conbody>
 
+  <concept rev="2.10.0" id="incompatible_changes_210x">
+
+    <title>Incompatible Changes Introduced in Impala 2.10.x</title>
+
+    <conbody>
+
+      <p>
+        For the full list of issues closed in this release, including any that introduce
+        behavior changes or incompatibilities, see the
+        <xref keyref="changelog_210">changelog for <keyword keyref="impala210"/></xref>.
+      </p>
+
+    </conbody>
+
+  </concept>
+
   <concept rev="2.9.0" id="incompatible_changes_29x">
 
     <title>Incompatible Changes Introduced in Impala 2.9.x</title>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d565386b/docs/topics/impala_new_features.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_new_features.xml b/docs/topics/impala_new_features.xml
index c8d90cc..ae63ad3 100644
--- a/docs/topics/impala_new_features.xml
+++ b/docs/topics/impala_new_features.xml
@@ -46,6 +46,23 @@ under the License.
 
   </conbody>
 
+<!-- All 2.10.x new features go under here -->
+
+  <concept rev="2.10.0" id="new_features_2100">
+
+    <title>New Features in <keyword keyref="impala210_full"/></title>
+
+    <conbody>
+
+      <p>
+        For the full list of issues closed in this release, including the issues
+        marked as <q>new features</q> or <q>improvements</q>, see the
+        <xref keyref="changelog_210">changelog for <keyword keyref="impala210"/></xref>.
+      </p>
+
+    </conbody>
+  </concept>
+
 <!-- All 2.9.x new features go under here -->
 
   <concept rev="2.9.0" id="new_features_290">
@@ -56,7 +73,7 @@ under the License.
 
       <p>
         For the full list of issues closed in this release, including the issues
-        marked as <q>new features</q>, see the
+        marked as <q>new features</q> or <q>improvements</q>, see the
         <xref keyref="changelog_29">changelog for <keyword keyref="impala29"/></xref>.
       </p>
 


[3/3] incubator-impala git commit: IMPALA-5538: Use explicit catalog versions for deleted objects

Posted by jr...@apache.org.
IMPALA-5538: Use explicit catalog versions for deleted objects

This commit changes the way deletions are handled in the catalog and
disseminated to the impalad nodes through the statestore. Previously,
deletions of catalog objects were not explicitly annotated with the
catalog version in which the deletion occurred, and the impalads used
the max catalog version of a catalog update to decide whether a
deletion should be applied to the local catalog cache. This works
correctly only under the assumption that all the changes that occurred
in the catalog between an update's min and max catalog version are
included in that update, i.e. there are no gaps or missing updates.
With the upcoming fix for IMPALA-5058, that constraint will be
relaxed, allowing for gaps in the catalog updates.

To preserve the existing behavior, this patch introduces the following
changes:
* Deletions in the catalog are explicitly recorded in a log along with
the catalog version in which they occurred. As before, added and deleted
catalog objects are sent to the statestore.
* Topic entries associated with deleted catalog objects now have
non-empty values (besides keys) that contain minimal object metadata,
including the catalog version.
* The statestore no longer uses the absence of a topic entry value to
identify deleted topic entries. Deleted topic entries must be explicitly
marked as such by the statestore subscribers that produce them.
* Statestore subscribers now use the 'deleted' flag to determine if a
topic entry corresponds to a deleted item.
* Impalads use the deleted objects' own catalog versions when updating
the local catalog cache from a catalog update, rather than the update's
maximum catalog version (see the sketch after this list).
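
As a sketch of the versioning rule change (illustrative pseudocode;
'Remove', 'update', 'cached_object', and 'deleted_object' are
hypothetical names, not the actual implementation):

  // Old rule: deletions carried no version, so the update's max catalog
  // version stood in for it when deciding whether to drop a cached object.
  if (update.max_catalog_version > cached_object.catalog_version) Remove(key);

  // New rule: each deleted object carries its own catalog version.
  if (deleted_object.catalog_version > cached_object.catalog_version) Remove(key);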

Testing:
- No new tests were added as these paths are already exercised by
existing tests.
- Ran all core tests.

Change-Id: I93cb7a033dc8f0d3e0339394b36affe14523274c
Reviewed-on: http://gerrit.cloudera.org:8080/7731
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/dd340b88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/dd340b88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/dd340b88

Branch: refs/heads/master
Commit: dd340b8810ecd00ad2ffe79845ca137e941aefb7
Parents: d565386
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Mon Aug 14 11:05:20 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Sep 26 20:20:56 2017 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                |  57 ++--
 be/src/catalog/catalog-server.h                 |  39 +--
 be/src/catalog/catalog-util.cc                  |  15 -
 be/src/catalog/catalog-util.h                   |   9 -
 be/src/catalog/catalog.cc                       |  19 +-
 be/src/catalog/catalog.h                        |  14 +-
 be/src/scheduling/admission-controller.cc       |  18 +-
 be/src/scheduling/admission-controller.h        |   7 +-
 be/src/scheduling/scheduler-test-util.cc        |   5 +-
 be/src/scheduling/scheduler.cc                  |  22 +-
 be/src/service/impala-server.cc                 | 121 ++++----
 be/src/statestore/statestore.cc                 |  56 ++--
 be/src/statestore/statestore.h                  |  43 +--
 common/thrift/CatalogInternalService.thrift     |  24 +-
 common/thrift/StatestoreService.thrift          |  31 ++-
 .../apache/impala/catalog/CatalogDeltaLog.java  | 104 ++++---
 .../impala/catalog/CatalogServiceCatalog.java   | 275 ++++++++++++-------
 .../apache/impala/catalog/ImpaladCatalog.java   |  22 +-
 .../impala/service/CatalogOpExecutor.java       |  15 +
 .../org/apache/impala/service/JniCatalog.java   |  11 +-
 tests/statestore/test_statestore.py             |   8 +-
 21 files changed, 477 insertions(+), 438 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 38a64d1..b4745fe 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -228,13 +228,10 @@ void CatalogServer::UpdateCatalogTopicCallback(
 
   const TTopicDelta& delta = topic->second;
 
-  // If this is not a delta update, clear all catalog objects and request an update
-  // from version 0 from the local catalog. There is an optimization that checks if
-  // pending_topic_updates_ was just reloaded from version 0, if they have then skip this
-  // step and use that data.
-  if (delta.from_version == 0 && delta.to_version == 0 &&
-      catalog_objects_min_version_ != 0) {
-    catalog_topic_entry_keys_.clear();
+  // If not generating a delta update and 'pending_topic_updates_' doesn't already contain
+  // the full catalog (beginning with version 0), then force GatherCatalogUpdatesThread()
+  // to reload the full catalog.
+  if (delta.from_version == 0 && catalog_objects_min_version_ != 0) {
     last_sent_catalog_version_ = 0L;
   } else {
     // Process the pending topic update.
@@ -284,14 +281,17 @@ void CatalogServer::UpdateCatalogTopicCallback(
     } else if (current_catalog_version != last_sent_catalog_version_) {
       // If there has been a change since the last time the catalog was queried,
       // call into the Catalog to find out what has changed.
-      TGetAllCatalogObjectsResponse catalog_objects;
-      status = catalog_->GetAllCatalogObjects(last_sent_catalog_version_,
-          &catalog_objects);
+      TGetCatalogDeltaResponse catalog_objects;
+      status = catalog_->GetCatalogDelta(last_sent_catalog_version_, &catalog_objects);
       if (!status.ok()) {
         LOG(ERROR) << status.GetDetail();
       } else {
-        // Use the catalog objects to build a topic update list.
-        BuildTopicUpdates(catalog_objects.objects);
+        // Use the catalog objects to build a topic update list. These include
+        // objects added to the catalog, 'updated_objects', and objects deleted
+        // from the catalog, 'deleted_objects'. The order in which we process
+        // these two disjoint sets of catalog objects does not matter.
+        BuildTopicUpdates(catalog_objects.updated_objects, false);
+        BuildTopicUpdates(catalog_objects.deleted_objects, true);
         catalog_objects_min_version_ = last_sent_catalog_version_;
         catalog_objects_max_version_ = catalog_objects.max_catalog_version;
       }
@@ -302,31 +302,19 @@ void CatalogServer::UpdateCatalogTopicCallback(
   }
 }
 
-void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects) {
-  unordered_set<string> current_entry_keys;
-  // Add any new/updated catalog objects to the topic.
+void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects,
+    bool topic_deletions) {
   for (const TCatalogObject& catalog_object: catalog_objects) {
+    DCHECK_GT(catalog_object.catalog_version, last_sent_catalog_version_);
     const string& entry_key = TCatalogObjectToEntryKey(catalog_object);
     if (entry_key.empty()) {
       LOG_EVERY_N(WARNING, 60) << "Unable to build topic entry key for TCatalogObject: "
                                << ThriftDebugString(catalog_object);
     }
-
-    current_entry_keys.insert(entry_key);
-    // Remove this entry from catalog_topic_entry_keys_. At the end of this loop, we will
-    // be left with the set of keys that were in the last update, but not in this
-    // update, indicating which objects have been removed/dropped.
-    catalog_topic_entry_keys_.erase(entry_key);
-
-    // This isn't a new or an updated item, skip it.
-    if (catalog_object.catalog_version <= last_sent_catalog_version_) continue;
-
-    VLOG(1) << "Publishing update: " << entry_key << "@"
-            << catalog_object.catalog_version;
-
     pending_topic_updates_.push_back(TTopicItem());
     TTopicItem& item = pending_topic_updates_.back();
     item.key = entry_key;
+    item.deleted = topic_deletions;
     Status status = thrift_serializer_.Serialize(&catalog_object, &item.value);
     if (!status.ok()) {
       LOG(ERROR) << "Error serializing topic value: " << status.GetDetail();
@@ -340,18 +328,9 @@ void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_obje
         pending_topic_updates_.pop_back();
       }
     }
+    VLOG(1) << "Publishing " << (topic_deletions ? "deletion " : "update ")
+        << ": " << entry_key << "@" << catalog_object.catalog_version;
   }
-
-  // Any remaining items in catalog_topic_entry_keys_ indicate the object was removed
-  // since the last update.
-  for (const string& key: catalog_topic_entry_keys_) {
-    pending_topic_updates_.push_back(TTopicItem());
-    TTopicItem& item = pending_topic_updates_.back();
-    item.key = key;
-    VLOG(1) << "Publishing deletion: " << key;
-    // Don't set a value to mark this item as deleted.
-  }
-  catalog_topic_entry_keys_.swap(current_entry_keys);
 }
 
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 9d33591..452a9b9 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -87,12 +87,6 @@ class CatalogServer {
   /// Thread that polls the catalog for any updates.
   std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
-  /// Tracks the set of catalog objects that exist via their topic entry key.
-  /// During each IMPALA_CATALOG_TOPIC heartbeat, stores the set of known catalog objects
-  /// that exist by their topic entry key. Used to track objects that have been removed
-  /// since the last heartbeat.
-  boost::unordered_set<std::string> catalog_topic_entry_keys_;
-
   /// Protects catalog_update_cv_, pending_topic_updates_,
   /// catalog_objects_to/from_version_, and last_sent_catalog_version.
   boost::mutex catalog_lock_;
@@ -134,14 +128,10 @@ class CatalogServer {
   /// finds all catalog objects that have a catalog version greater than the last update
   /// sent by calling into the JniCatalog. The topic is updated with any catalog objects
   /// that are new or have been modified since the last heartbeat (by comparing the
-  /// catalog version of the object with last_sent_catalog_version_). Also determines any
-  /// deletions of catalog objects by looking at the
-  /// difference of the last set of topic entry keys that were sent and the current set
-  /// of topic entry keys. At the end of execution it notifies the
-  /// catalog_update_gathering_thread_ to fetch the next set of updates from the
-  /// JniCatalog.
-  /// All updates are added to the subscriber_topic_updates list and sent back to the
-  /// Statestore.
+  /// catalog version of the object with last_sent_catalog_version_). At the end of
+  /// execution it notifies the catalog_update_gathering_thread_ to fetch the next set of
+  /// updates from the JniCatalog. All updates are added to the subscriber_topic_updates
+  /// list and sent back to the Statestore.
   void UpdateCatalogTopicCallback(
       const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
@@ -152,20 +142,19 @@ class CatalogServer {
   /// Also, explicitly releases free memory back to the OS after each complete iteration.
   [[noreturn]] void GatherCatalogUpdatesThread();
 
-  /// This function determines what items have been added/removed from the catalog
-  /// since the last heartbeat and builds the next topic update to send. To do this, it
-  /// enumerates the given catalog objects returned looking for the objects that have a
-  /// catalog version that is > the catalog version sent with the last heartbeat. To
-  /// determine items that have been deleted, it saves the set of topic entry keys sent
-  /// with the last update and looks at the difference between it and the current set of
-  /// topic entry keys.
+  /// Builds the next topic update to send based on what items
+  /// have been added/changed/removed from the catalog since the last heartbeat. To do
+  /// this, it enumerates the given catalog objects, looking for the objects that
+  /// have a catalog version that is > the catalog version sent with the last heartbeat.
+  /// 'topic_deletions' is true if 'catalog_objects' contains deleted catalog
+  /// objects.
+  ///
   /// The key for each entry is a string composed of:
   /// "TCatalogObjectType:<unique object name>". So for table foo.bar, the key would be
   /// "TABLE:foo.bar". Encoding the object type information in the key ensures the keys
-  /// are unique, as well as helps to determine what object type was removed in a state
-  /// store delta update (since the state store only sends key names for deleted items).
-  /// Must hold catalog_lock_ when calling this function.
-  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects);
+  /// are unique. Must hold catalog_lock_ when calling this function.
+  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects,
+      bool topic_deletions);
 
   /// Example output:
   /// "databases": [

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/catalog/catalog-util.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index de4f2fd..7b115b0 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -55,21 +55,6 @@ TCatalogObjectType::type TCatalogObjectTypeFromName(const string& name) {
   return TCatalogObjectType::UNKNOWN;
 }
 
-Status TCatalogObjectFromEntryKey(const string& key,
-    TCatalogObject* catalog_object) {
-  // Reconstruct the object based only on the key.
-  size_t pos = key.find(":");
-  if (pos == string::npos || pos >= key.size() - 1) {
-    stringstream error_msg;
-    error_msg << "Invalid topic entry key format: " << key;
-    return Status(error_msg.str());
-  }
-
-  TCatalogObjectType::type object_type = TCatalogObjectTypeFromName(key.substr(0, pos));
-  string object_name = key.substr(pos + 1);
-  return TCatalogObjectFromObjectName(object_type, object_name, catalog_object);
-}
-
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,
     const string& object_name, TCatalogObject* catalog_object) {
   switch (object_type) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/catalog/catalog-util.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.h b/be/src/catalog/catalog-util.h
index ddc2c21..e98cd38 100644
--- a/be/src/catalog/catalog-util.h
+++ b/be/src/catalog/catalog-util.h
@@ -32,15 +32,6 @@ class Status;
 /// TCatalogObjectType::UNKNOWN if no match was found.
 TCatalogObjectType::type TCatalogObjectTypeFromName(const std::string& name);
 
-/// Parses the given IMPALA_CATALOG_TOPIC topic entry key to determine the
-/// TCatalogObjectType and unique object name. Populates catalog_object with the result.
-/// This is used to reconstruct type information when an item is deleted from the
-/// topic. At that time the only context available about the object being deleted is its
-/// its topic entry key which contains only the type and object name. The resulting
-/// TCatalogObject can then be used to removing a matching item from the catalog.
-Status TCatalogObjectFromEntryKey(const std::string& key,
-    TCatalogObject* catalog_object);
-
 /// Populates a TCatalogObject based on the given object type (TABLE, DATABASE, etc) and
 /// object name string.
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index be7b93f..fd5f940 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -55,7 +55,7 @@ Catalog::Catalog() {
     {"getFunctions", "([B)[B", &get_functions_id_},
     {"checkUserSentryAdmin", "([B)V", &sentry_admin_check_id_},
     {"getCatalogObject", "([B)[B", &get_catalog_object_id_},
-    {"getCatalogObjects", "(J)[B", &get_catalog_objects_id_},
+    {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_}};
 
@@ -90,19 +90,10 @@ Status Catalog::GetCatalogVersion(long* version) {
   return Status::OK();
 }
 
-Status Catalog::GetAllCatalogObjects(long from_version,
-    TGetAllCatalogObjectsResponse* resp) {
-  JNIEnv* jni_env = getJNIEnv();
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(jni_env));
-  jvalue requested_from_version;
-  requested_from_version.j = from_version;
-  jbyteArray result_bytes = static_cast<jbyteArray>(
-      jni_env->CallObjectMethod(catalog_, get_catalog_objects_id_,
-      requested_from_version));
-  RETURN_ERROR_IF_EXC(jni_env);
-  RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, resp));
-  return Status::OK();
+Status Catalog::GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp) {
+  TGetCatalogDeltaRequest request;
+  request.__set_from_version(from_version);
+  return JniUtil::CallJniMethod(catalog_, get_catalog_delta_id_, request, resp);
 }
 
 Status Catalog::ExecDdl(const TDdlExecRequest& req, TDdlExecResponse* resp) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index ab6a2a3..3119d60 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -56,12 +56,10 @@ class Catalog {
   /// Status object with information on the error will be returned.
   Status GetCatalogVersion(long* version);
 
-  /// Gets all Catalog objects and the metadata that is applicable for the given request.
-  /// Always returns all object names that exist in the Catalog, but allows for extended
-  /// metadata for objects that were modified after the specified version.
-  /// Returns OK if the operation was successful, otherwise a Status object with
-  /// information on the error will be returned.
-  Status GetAllCatalogObjects(long from_version, TGetAllCatalogObjectsResponse* resp);
+  /// Retrieves the catalog objects that were added/modified/deleted since version
+  /// 'from_version'. Returns OK if the operation was successful, otherwise a Status
+  /// object with information on the error will be returned.
+  Status GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp);
 
   /// Gets the Thrift representation of a Catalog object. The request is a TCatalogObject
   /// which has the desired TCatalogObjectType and name properly set.
@@ -74,7 +72,7 @@ class Catalog {
   /// match the pattern string. Patterns are "p1|p2|p3" where | denotes choice,
   /// and each pN may contain wildcards denoted by '*' which match all strings.
   /// TODO: GetDbs() and GetTableNames() can probably be scrapped in favor of
-  /// GetAllCatalogObjects(). Consider removing them and moving everything to use
+  /// GetCatalogDelta(). Consider removing them and moving everything to use
   /// that.
   Status GetDbs(const std::string* pattern, TGetDbsResult* dbs);
 
@@ -109,7 +107,7 @@ class Catalog {
   jmethodID exec_ddl_id_;  // JniCatalog.execDdl()
   jmethodID reset_metadata_id_;  // JniCatalog.resetMetdata()
   jmethodID get_catalog_object_id_;  // JniCatalog.getCatalogObject()
-  jmethodID get_catalog_objects_id_;  // JniCatalog.getCatalogObjects()
+  jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
   jmethodID get_dbs_id_; // JniCatalog.getDbs()
   jmethodID get_table_names_id_; // JniCatalog.getTableNames()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index c23f4be..c29020c 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -646,7 +646,6 @@ void AdmissionController::UpdatePoolStats(
         }
       }
       HandleTopicUpdates(delta.topic_entries);
-      HandleTopicDeletions(delta.topic_deletions);
     }
     UpdateClusterAggregates();
   }
@@ -684,6 +683,10 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
     // The topic entry from this subscriber is handled specially; the stats coming
     // from the statestore are likely already outdated.
     if (topic_backend_id == host_id_) continue;
+    if (item.deleted) {
+      GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
+      continue;
+    }
     TPoolStats remote_update;
     uint32_t len = item.value.size();
     Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -696,18 +699,7 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
   }
 }
 
-void AdmissionController::HandleTopicDeletions(const vector<string>& topic_deletions) {
-  for (const string& topic_key: topic_deletions) {
-    string pool_name;
-    string topic_backend_id;
-    if (!ParsePoolTopicKey(topic_key, &pool_name, &topic_backend_id)) continue;
-    if (topic_backend_id == host_id_) continue;
-    GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
-  }
-}
-
-void AdmissionController::PoolStats::UpdateAggregates(
-    HostMemMap* host_mem_reserved) {
+void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reserved) {
   const string& coord_id = parent_->host_id_;
   int64_t num_running = 0;
   int64_t num_queued = 0;
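
To make the new control flow concrete, here is a minimal, self-contained C++ sketch of the pattern HandleTopicUpdates() now follows; PoolTracker, PoolStats, and TopicItem are illustrative stand-ins for the real classes, and passing nullptr to UpdateRemoteStats() signals removal, as in the patch:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for TPoolStats / TTopicItem (C++14 for the
// aggregate initialization used in main()).
struct PoolStats { int64_t num_running = 0; };
struct TopicItem { std::string key; bool deleted = false; PoolStats stats; };

class PoolTracker {
 public:
  // nullptr means "remove the remote entry", mirroring
  // UpdateRemoteStats(topic_backend_id, nullptr) in the patch.
  void UpdateRemoteStats(const std::string& backend_id, const PoolStats* stats) {
    if (stats == nullptr) {
      remote_stats_.erase(backend_id);
    } else {
      remote_stats_[backend_id] = *stats;
    }
  }

  // One loop handles additions, modifications, and deletions.
  void HandleTopicUpdates(const std::vector<TopicItem>& updates) {
    for (const TopicItem& item : updates) {
      if (item.deleted) {
        UpdateRemoteStats(item.key, nullptr);
        continue;
      }
      UpdateRemoteStats(item.key, &item.stats);
    }
  }

  size_t num_remote() const { return remote_stats_.size(); }

 private:
  std::map<std::string, PoolStats> remote_stats_;
};

int main() {
  PoolTracker tracker;
  tracker.HandleTopicUpdates({{"host1", false, {3}}, {"host2", false, {1}}});
  tracker.HandleTopicUpdates({{"host1", true, {}}});  // deletion arrives inline
  std::cout << tracker.num_remote() << "\n";  // prints 1
  return 0;
}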

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 81b2968..3f18f6d 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -426,13 +426,10 @@ class AdmissionController {
   void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
 
   /// Updates the remote stats with per-host topic_updates coming from the statestore.
-  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
+  /// Removes remote stats identified by topic deletions coming from the
+  /// statestore. Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
   void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);
 
-  /// Removes remote stats identified by the topic_deletions coming from the statestore.
-  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
-  void HandleTopicDeletions(const std::vector<std::string>& topic_deletions);
-
   /// Re-computes the per-pool aggregate stats and the per-host aggregates in
   /// host_mem_reserved_ using each pool's remote_stats_ and local_stats_.
   /// Called by UpdatePoolStats() after handling updates and deletions.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 841bc24..2d85a9a 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -479,7 +479,10 @@ void SchedulerWrapper::RemoveBackend(const Host& host) {
   TTopicDelta delta;
   delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
-  delta.topic_deletions.push_back(host.ip);
+  TTopicItem item;
+  item.__set_deleted(true);
+  item.__set_key(host.ip);
+  delta.topic_entries.push_back(item);
   SendTopicDelta(delta);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index f6e323f..5c2f907 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -140,9 +140,7 @@ void Scheduler::UpdateMembership(
 
   // If the delta transmitted by the statestore is empty we can skip processing
   // altogether and avoid making a copy of executors_config_.
-  if (delta.is_delta && delta.topic_entries.empty() && delta.topic_deletions.empty()) {
-    return;
-  }
+  if (delta.is_delta && delta.topic_entries.empty()) return;
 
   // This function needs to handle both delta and non-delta updates. To minimize the
   // time needed to hold locks, all updates are applied to a copy of
@@ -159,10 +157,17 @@ void Scheduler::UpdateMembership(
     new_executors_config = std::make_shared<BackendConfig>(*executors_config_);
   }
 
-  // Process new entries to the topic. Update executors_config_ and
+  // Process new and removed entries to the topic. Update executors_config_ and
   // current_executors_ to match the set of executors given by the
   // subscriber_topic_updates.
   for (const TTopicItem& item : delta.topic_entries) {
+    if (item.deleted) {
+      if (current_executors_.find(item.key) != current_executors_.end()) {
+        new_executors_config->RemoveBackend(current_executors_[item.key]);
+        current_executors_.erase(item.key);
+      }
+      continue;
+    }
     TBackendDescriptor be_desc;
     // Benchmarks have suggested that this method can deserialize
     // ~10m messages per second, so no immediate need to consider optimization.
@@ -195,15 +200,6 @@ void Scheduler::UpdateMembership(
       current_executors_.insert(make_pair(item.key, be_desc));
     }
   }
-
-  // Process deletions from the topic
-  for (const string& backend_id : delta.topic_deletions) {
-    if (current_executors_.find(backend_id) != current_executors_.end()) {
-      new_executors_config->RemoveBackend(current_executors_[backend_id]);
-      current_executors_.erase(backend_id);
-    }
-  }
-
   SetExecutorsConfig(new_executors_config);
 
   if (metrics_ != nullptr) {
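
The comment above about minimizing lock hold time describes a copy-on-write scheme: the membership state is copied under the lock, mutated without it, and swapped back in. A minimal sketch of that scheme, with a std::set of backend ids standing in for BackendConfig and hypothetical names throughout:

#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for TTopicItem.
struct TopicItem { std::string key; bool deleted = false; };

class MembershipTracker {
 public:
  void UpdateMembership(const std::vector<TopicItem>& entries, bool is_delta) {
    // Nothing to do for an empty delta (deletions now arrive in 'entries').
    if (is_delta && entries.empty()) return;

    // Copy-on-write: mutate a private copy so readers only ever see a
    // fully-formed config and the lock is held briefly.
    std::shared_ptr<std::set<std::string>> new_config;
    {
      std::lock_guard<std::mutex> l(lock_);
      new_config = std::make_shared<std::set<std::string>>(*config_);
    }
    if (!is_delta) new_config->clear();  // a full update rebuilds from scratch
    for (const TopicItem& item : entries) {
      if (item.deleted) {
        new_config->erase(item.key);
      } else {
        new_config->insert(item.key);
      }
    }
    std::lock_guard<std::mutex> l(lock_);
    config_ = std::move(new_config);
  }

  size_t size() {
    std::lock_guard<std::mutex> l(lock_);
    return config_->size();
  }

 private:
  std::mutex lock_;
  std::shared_ptr<std::set<std::string>> config_ =
      std::make_shared<std::set<std::string>>();
};

int main() {
  MembershipTracker m;
  m.UpdateMembership({{"10.0.0.1"}, {"10.0.0.2"}}, /*is_delta=*/false);
  m.UpdateMembership({{"10.0.0.1", true}}, /*is_delta=*/true);  // backend removed
  std::cout << m.size() << "\n";  // prints 1
  return 0;
}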

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 11de619..bde2288 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1304,10 +1304,10 @@ void ImpalaServer::CatalogUpdateCallback(
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;
 
-
   // Update catalog cache in frontend. An update is split into batches of size
   // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. IMPALA-3499
-  if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0)  {
+  if (delta.topic_entries.size() != 0)  {
+    vector<TCatalogObject> dropped_objects;
     vector<TUpdateCatalogCacheRequest> update_reqs;
     update_reqs.push_back(TUpdateCatalogCacheRequest());
     TUpdateCatalogCacheRequest* incremental_request = &update_reqs.back();
@@ -1317,7 +1317,6 @@ void ImpalaServer::CatalogUpdateCallback(
     int64_t new_catalog_version = catalog_update_info_.catalog_version;
     uint64_t batch_size_bytes = 0;
     for (const TTopicItem& item: delta.topic_entries) {
-      TCatalogObject catalog_object;
       Status status;
       vector<uint8_t> data_buffer;
       const uint8_t* data_buffer_ptr = nullptr;
@@ -1335,82 +1334,70 @@ void ImpalaServer::CatalogUpdateCallback(
         data_buffer_ptr = reinterpret_cast<const uint8_t*>(item.value.data());
         len = item.value.size();
       }
+      if (len > 100 * 1024 * 1024 /* 100MB */) {
+        LOG(INFO) << "Received large catalog object(>100mb): "
+            << item.key << " is "
+            << PrettyPrinter::Print(len, TUnit::BYTES);
+      }
+      TCatalogObject catalog_object;
       status = DeserializeThriftMsg(data_buffer_ptr, &len, FLAGS_compact_catalog_topic,
           &catalog_object);
       if (!status.ok()) {
         LOG(ERROR) << "Error deserializing item " << item.key
-                   << ": " << status.GetDetail();
+            << ": " << status.GetDetail();
         continue;
       }
-      if (len > 100 * 1024 * 1024 /* 100MB */) {
-        LOG(INFO) << "Received large catalog update(>100mb): "
-                     << item.key << " is "
-                     << PrettyPrinter::Print(len, TUnit::BYTES);
-      }
-      if (catalog_object.type == TCatalogObjectType::CATALOG) {
-        incremental_request->__set_catalog_service_id(
-            catalog_object.catalog.catalog_service_id);
-        new_catalog_version = catalog_object.catalog_version;
-      }
-
-      // Refresh the lib cache entries of any added functions and data sources
-      // TODO: if frontend returns the list of functions and data sources, we do not
-      // need to deserialize these in backend.
-      if (catalog_object.type == TCatalogObjectType::FUNCTION) {
-        DCHECK(catalog_object.__isset.fn);
-        LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
-      }
-      if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-        DCHECK(catalog_object.__isset.data_source);
-        LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
-      }
 
       if (batch_size_bytes + len > MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES) {
+        // Initialize a new batch of catalog updates.
         update_reqs.push_back(TUpdateCatalogCacheRequest());
         incremental_request = &update_reqs.back();
         batch_size_bytes = 0;
       }
-      incremental_request->updated_objects.push_back(catalog_object);
-      batch_size_bytes += len;
-    }
-    update_reqs.push_back(TUpdateCatalogCacheRequest());
-    TUpdateCatalogCacheRequest* deletion_request = &update_reqs.back();
-
-    // We need to look up the dropped functions and data sources and remove them
-    // from the library cache. The data sent from the catalog service does not
-    // contain all the function metadata so we'll ask our local frontend for it. We
-    // need to do this before updating the catalog.
-    vector<TCatalogObject> dropped_objects;
 
-    // Process all Catalog deletions (dropped objects). We only know the keys (object
-    // names) so must parse each key to determine the TCatalogObject.
-    for (const string& key: delta.topic_deletions) {
-      LOG(INFO) << "Catalog topic entry deletion: " << key;
-      TCatalogObject catalog_object;
-      Status status = TCatalogObjectFromEntryKey(key, &catalog_object);
-      if (!status.ok()) {
-        LOG(ERROR) << "Error parsing catalog topic entry deletion key: " << key << " "
-                   << "Error: " << status.GetDetail();
-        continue;
+      if (catalog_object.type == TCatalogObjectType::CATALOG) {
+        incremental_request->__set_catalog_service_id(
+            catalog_object.catalog.catalog_service_id);
+        new_catalog_version = catalog_object.catalog_version;
       }
-      deletion_request->removed_objects.push_back(catalog_object);
-      if (catalog_object.type == TCatalogObjectType::FUNCTION ||
-          catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-        TCatalogObject dropped_object;
-        if (exec_env_->frontend()->GetCatalogObject(
-                catalog_object, &dropped_object).ok()) {
-          // This object may have been dropped and re-created. To avoid removing the
-          // re-created object's entry from the cache verify the existing object has a
-          // catalog version <= the catalog version included in this statestore heartbeat.
-          if (dropped_object.catalog_version <= new_catalog_version) {
-            if (catalog_object.type == TCatalogObjectType::FUNCTION ||
-                catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-              dropped_objects.push_back(dropped_object);
+      VLOG(3) << (item.deleted ? "Deleted " : "Added ") << "item: " << item.key
+          << " version: " << catalog_object.catalog_version << " of size: " << len;
+
+      if (!item.deleted) {
+        // Refresh the lib cache entries of any added functions and data sources
+        // TODO: if frontend returns the list of functions and data sources, we do not
+        // need to deserialize these in backend.
+        if (catalog_object.type == TCatalogObjectType::FUNCTION) {
+          DCHECK(catalog_object.__isset.fn);
+          LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
+        }
+        if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+          DCHECK(catalog_object.__isset.data_source);
+          LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
+        }
+        incremental_request->updated_objects.push_back(catalog_object);
+      } else {
+        // We need to look up any dropped functions and data sources and remove
+        // them from the library cache.
+        if (catalog_object.type == TCatalogObjectType::FUNCTION ||
+            catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+          TCatalogObject existing_object;
+          if (exec_env_->frontend()->GetCatalogObject(
+              catalog_object, &existing_object).ok()) {
+            // If the object exists in the catalog it may have been dropped and
+            // re-created. To avoid removing the re-created object's entry from
+            // the cache, verify that the existing object's version < the
+            // version of the dropped object included in this statestore
+            // heartbeat.
+            DCHECK_NE(existing_object.catalog_version, catalog_object.catalog_version);
+            if (existing_object.catalog_version < catalog_object.catalog_version) {
+              dropped_objects.push_back(existing_object);
             }
           }
         }
-        // Nothing to do in error case.
+        incremental_request->removed_objects.push_back(catalog_object);
       }
+      batch_size_bytes += len;
     }
 
     // Call the FE to apply the changes to the Impalad Catalog.
@@ -1531,6 +1518,10 @@ void ImpalaServer::MembershipCallback(
 
     // Process membership additions.
     for (const TTopicItem& item: delta.topic_entries) {
+      if (item.deleted) {
+        known_backends_.erase(item.key);
+        continue;
+      }
       uint32_t len = item.value.size();
       TBackendDescriptor backend_descriptor;
       Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -1546,18 +1537,12 @@ void ImpalaServer::MembershipCallback(
     // Register the local backend in the statestore and update the list of known backends.
     AddLocalBackendToStatestore(subscriber_topic_updates);
 
-    // Process membership deletions.
-    for (const string& backend_id: delta.topic_deletions) {
-      known_backends_.erase(backend_id);
-    }
-
     // Create a set of known backend network addresses. Used to test for cluster
     // membership by network address.
     set<TNetworkAddress> current_membership;
     // Also reflect changes to the frontend. Initialized only if any_changes is true.
     TUpdateMembershipRequest update_req;
-    bool any_changes = !delta.topic_entries.empty() || !delta.topic_deletions.empty() ||
-        !delta.is_delta;
+    bool any_changes = !delta.topic_entries.empty() || !delta.is_delta;
     for (const BackendDescriptorMap::value_type& backend: known_backends_) {
       current_membership.insert(backend.second.address);
       if (any_changes) {
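
The batching logic above caps each TUpdateCatalogCacheRequest at MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES, starting a fresh batch whenever the next item would overflow the budget. A small sketch of that size-capped batching, with illustrative names and strings standing in for serialized catalog objects:

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Splits a stream of serialized items into batches so that no batch exceeds
// 'max_batch_bytes' (an oversized single item still gets a batch of its own).
std::vector<std::vector<std::string>> BatchBySize(
    const std::vector<std::string>& items, uint64_t max_batch_bytes) {
  std::vector<std::vector<std::string>> batches(1);
  uint64_t batch_size_bytes = 0;
  for (const std::string& item : items) {
    uint64_t len = item.size();
    if (batch_size_bytes + len > max_batch_bytes && !batches.back().empty()) {
      batches.emplace_back();  // initialize a new batch
      batch_size_bytes = 0;
    }
    batches.back().push_back(item);
    batch_size_bytes += len;
  }
  return batches;
}

int main() {
  auto batches = BatchBySize({"aaaa", "bb", "cccc", "d"}, /*max_batch_bytes=*/6);
  std::cout << batches.size() << "\n";  // prints 2: {aaaa,bb} and {cccc,d}
  return 0;
}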

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 75ba5c7..6d5880f 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -85,8 +85,6 @@ const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-by
 const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 
-const Statestore::TopicEntry::Value Statestore::TopicEntry::NULL_VALUE = "";
-
 // Initial version for each Topic registered by a Subscriber. Generally, the Topic will
 // have a Version that is the MAX() of all entries in the Topic, but this initial
 // value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
@@ -124,13 +122,13 @@ class StatestoreThriftIf : public StatestoreServiceIf {
 
 void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes,
     TopicEntry::Version version) {
-  DCHECK(bytes == Statestore::TopicEntry::NULL_VALUE || bytes.size() > 0);
+  DCHECK_GT(bytes.size(), 0);
   value_ = bytes;
   version_ = version;
 }
 
 Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
-    const Statestore::TopicEntry::Value& bytes) {
+    const Statestore::TopicEntry::Value& bytes, bool is_deleted) {
   TopicEntryMap::iterator entry_it = entries_.find(key);
   int64_t key_size_delta = 0;
   int64_t value_size_delta = 0;
@@ -147,6 +145,7 @@ Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
   value_size_delta += bytes.size();
 
   entry_it->second.SetValue(bytes, ++last_version_);
+  entry_it->second.SetDeleted(is_deleted);
   topic_update_log_.insert(make_pair(entry_it->second.version(), key));
 
   total_key_size_bytes_ += key_size_delta;
@@ -168,12 +167,10 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
     // entry
     topic_update_log_.erase(version);
     topic_update_log_.insert(make_pair(++last_version_, key));
-    total_value_size_bytes_ -= entry_it->second.value().size();
-    DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
-
     value_size_metric_->Increment(entry_it->second.value().size());
     topic_size_metric_->Increment(entry_it->second.value().size());
-    entry_it->second.SetValue(Statestore::TopicEntry::NULL_VALUE, last_version_);
+    entry_it->second.SetDeleted(true);
+    entry_it->second.SetVersion(last_version_);
   }
 }
 
@@ -454,11 +451,9 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
   // At this point the updates are assumed to have been successfully processed by the
   // subscriber. Update the subscriber's max version of each topic.
-  map<TopicEntryKey, TTopicDelta>::const_iterator topic_delta =
-      update_state_request.topic_deltas.begin();
-  for (; topic_delta != update_state_request.topic_deltas.end(); ++topic_delta) {
-    subscriber->SetLastTopicVersionProcessed(topic_delta->first,
-        topic_delta->second.to_version);
+  for (const auto& topic_delta: update_state_request.topic_deltas) {
+    subscriber->SetLastTopicVersionProcessed(topic_delta.first,
+        topic_delta.second.to_version);
   }
 
   // Thirdly: perform any / all updates returned by the subscriber
@@ -487,14 +482,8 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
       Topic* topic = &topic_it->second;
       for (const TTopicItem& item: update.topic_entries) {
-        TopicEntry::Version version = topic->Put(item.key, item.value);
-        subscriber->AddTransientUpdate(update.topic_name, item.key, version);
-      }
-
-      for (const string& key: update.topic_deletions) {
-        TopicEntry::Version version =
-            topic->Put(key, Statestore::TopicEntry::NULL_VALUE);
-        subscriber->AddTransientUpdate(update.topic_name, key, version);
+        subscriber->AddTransientUpdate(update.topic_name, item.key,
+            topic->Put(item.key, item.value, item.deleted));
       }
     }
   }
@@ -528,30 +517,25 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
       TopicUpdateLog::const_iterator next_update =
           topic.topic_update_log().upper_bound(last_processed_version);
 
-      int64_t deleted_key_size_bytes = 0;
+      uint64_t topic_size = 0;
       for (; next_update != topic.topic_update_log().end(); ++next_update) {
         TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second);
         DCHECK(itr != topic.entries().end());
         const TopicEntry& topic_entry = itr->second;
-        if (topic_entry.value() == Statestore::TopicEntry::NULL_VALUE) {
-          if (!topic_delta.is_delta) {
-            deleted_key_size_bytes += itr->first.size();
-            continue;
-          }
-          topic_delta.topic_deletions.push_back(itr->first);
-        } else {
-          topic_delta.topic_entries.push_back(TTopicItem());
-          TTopicItem& topic_item = topic_delta.topic_entries.back();
-          topic_item.key = itr->first;
-          // TODO: Does this do a needless copy?
-          topic_item.value = topic_entry.value();
+        // Don't send deleted entries for non-delta updates.
+        if (!topic_delta.is_delta && topic_entry.is_deleted()) {
+          continue;
         }
+        topic_delta.topic_entries.push_back(TTopicItem());
+        TTopicItem& topic_item = topic_delta.topic_entries.back();
+        topic_item.key = itr->first;
+        topic_item.value = topic_entry.value();
+        topic_item.deleted = topic_entry.is_deleted();
+        topic_size += topic_item.key.size() + topic_item.value.size();
       }
 
       if (!topic_delta.is_delta &&
           topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
-        int64_t topic_size = topic.total_key_size_bytes() - deleted_key_size_bytes
-            + topic.total_value_size_bytes();
         VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
                    << " topic update for " << subscriber.id() << ". Size = "
                    << PrettyPrinter::Print(topic_size, TUnit::BYTES);
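
Putting the pieces of Topic::Put() and GatherTopicUpdates() together: every write, including a deletion, bumps a monotonically increasing version and is recorded in an update log keyed by version, so a delta is simply the log suffix after the subscriber's last processed version. A compilable sketch under those assumptions, with deliberately simplified types:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct Entry {
  std::string value;
  int64_t version = 0;
  bool deleted = false;
};

class Topic {
 public:
  // Every Put(), including a deletion, bumps the version and records it in
  // the update log so deltas can be gathered later.
  int64_t Put(const std::string& key, const std::string& value, bool is_deleted) {
    Entry& e = entries_[key];
    update_log_.erase(e.version);  // drop the stale log record for this key
    e.value = value;
    e.version = ++last_version_;
    e.deleted = is_deleted;
    update_log_.emplace(e.version, key);
    return e.version;
  }

  // Returns the entries changed after 'last_processed'. Deleted entries are
  // kept in the topic but skipped entirely for non-delta (full) updates.
  std::vector<Entry> GatherDelta(int64_t last_processed, bool is_delta) const {
    std::vector<Entry> out;
    for (auto it = update_log_.upper_bound(last_processed);
         it != update_log_.end(); ++it) {
      const Entry& e = entries_.at(it->second);
      if (!is_delta && e.deleted) continue;
      out.push_back(e);
    }
    return out;
  }

 private:
  int64_t last_version_ = 0;
  std::map<std::string, Entry> entries_;
  std::map<int64_t, std::string> update_log_;  // version -> entry key
};

int main() {
  Topic t;
  t.Put("a", "1", false);
  int64_t seen = t.Put("b", "2", false);
  t.Put("a", "", true);  // deletion: the entry is retained, marked deleted
  std::cout << t.GatherDelta(seen, /*is_delta=*/true).size() << "\n";  // 1
  std::cout << t.GatherDelta(0, /*is_delta=*/false).size() << "\n";    // 1
  return 0;
}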

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 05feac3..26aa836 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -131,8 +131,7 @@ class Statestore : public CacheLineAligned {
 
  private:
   /// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
-  /// pair. If the byte string is NULL, the entry has been deleted, but may be retained to
-  /// track changes to send to subscribers.
+  /// pair.
   class TopicEntry {
    public:
     /// A Value is a string of bytes, for which std::string is a convenient representation.
@@ -146,30 +145,38 @@ class Statestore : public CacheLineAligned {
     /// The Version value used to initialize a new TopicEntry.
     static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L;
 
-    /// Representation of an empty Value. Must have size() == 0.
-    static const Value NULL_VALUE;
-
-    /// Sets the value of this entry to the byte / length pair. NULL_VALUE implies this
-    /// entry has been deleted.  The caller is responsible for ensuring, if required, that
-    /// the version parameter is larger than the current version() TODO: Consider enforcing
-    /// version monotonicity here.
+    /// Sets the value of this entry to the byte / length pair. The caller is responsible
+    /// for ensuring, if required, that the version parameter is larger than the
+    /// current version(). TODO: Consider enforcing version monotonicity here.
     void SetValue(const Value& bytes, Version version);
 
-    TopicEntry() : value_(NULL_VALUE), version_(TOPIC_ENTRY_INITIAL_VERSION) { }
+    /// Sets a new version for this entry.
+    void SetVersion(Version version) { version_ = version; }
+
+    /// Sets the is_deleted_ flag for this entry.
+    void SetDeleted(bool is_deleted) { is_deleted_ = is_deleted; }
+
+    TopicEntry() : version_(TOPIC_ENTRY_INITIAL_VERSION),
+        is_deleted_(false) { }
 
     const Value& value() const { return value_; }
     uint64_t version() const { return version_; }
     uint32_t length() const { return value_.size(); }
+    bool is_deleted() const { return is_deleted_; }
 
    private:
-    /// Byte string value, owned by this TopicEntry. The value is opaque to the statestore,
-    /// and is interpreted only by subscribers.
+    /// Byte string value, owned by this TopicEntry. The value is opaque to the
+    /// statestore, and is interpreted only by subscribers.
     Value value_;
 
     /// The version of this entry. Every update is assigned a monotonically increasing
     /// version number so that only the minimal set of changes can be sent from the
     /// statestore to a subscriber.
     Version version_;
+
+    /// Indicates if the entry has been deleted. If true, the entry will still be
+    /// retained to track changes to send to subscribers.
+    bool is_deleted_;
   };
 
   /// Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
@@ -192,19 +199,21 @@ class Statestore : public CacheLineAligned {
           total_value_size_bytes_(0L), key_size_metric_(key_size_metric),
           value_size_metric_(value_size_metric), topic_size_metric_(topic_size_metric) { }
 
-    /// Adds an entry with the given key. If bytes == NULL_VALUE, the entry is considered
-    /// deleted, and may be garbage collected in the future. The entry is assigned a new
-    /// version number by the Topic, and that version number is returned.
+    /// Adds an entry with the given key and value (bytes). If is_deleted is
+    /// true the entry is considered deleted, and may be garbage collected in the future.
+    /// The entry is assigned a new version number by the Topic, and that version number
+    /// is returned.
     //
     /// Must be called holding the topic lock
-    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes);
+    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes,
+        bool is_deleted);
 
     /// Utility method to support removing transient entries. We track the version numbers
     /// of entries added by subscribers, and remove entries with the same version number
     /// when that subscriber fails (the same entry may exist, but may have been updated by
     /// another subscriber giving it a new version number)
     //
-    /// Deletion means setting the entry's value to NULL and incrementing its version
+    /// Deletion means marking the entry as deleted and incrementing its version
     /// number.
     //
     /// Must be called holding the topic lock

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/common/thrift/CatalogInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogInternalService.thrift b/common/thrift/CatalogInternalService.thrift
index 5b68408..5170298 100644
--- a/common/thrift/CatalogInternalService.thrift
+++ b/common/thrift/CatalogInternalService.thrift
@@ -22,15 +22,25 @@ include "CatalogObjects.thrift"
 
 // Contains structures used internally by the Catalog Server.
 
-// Response from a call to GetAllCatalogObjects. Contains all known Catalog objects
-// (databases, tables/views, and functions) from the CatalogService's cache.
-// What metadata is included for each object is based on the parameters used in
-// the request.
-struct TGetAllCatalogObjectsResponse {
+// Arguments to a GetCatalogDelta call.
+struct TGetCatalogDeltaRequest {
+  // The base catalog version from which the delta is computed.
+  1: required i64 from_version
+}
+
+// Response from a call to GetCatalogDelta. Contains a delta of catalog objects
+// (databases, tables/views, and functions) from the CatalogService's cache with
+// versions larger (>) than the catalog version specified in
+// TGetCatalogDeltaRequest.from_version.
+struct TGetCatalogDeltaResponse {
   // The maximum catalog version of all objects in this response or 0 if the Catalog
   // contained no objects.
   1: required i64 max_catalog_version
 
-  // List of catalog objects (empty list if no objects detected in the Catalog).
-  2: required list<CatalogObjects.TCatalogObject> objects
+  // List of updated (new and modified) catalog objects for which the catalog version is
+  // larger than TGetCatalogDeltaRequest.from_version.
+  2: required list<CatalogObjects.TCatalogObject> updated_objects
+
+  // List of deleted catalog objects for which the catalog version is larger than
+  // TGetCatalogDeltaRequest.from_version.
+  3: required list<CatalogObjects.TCatalogObject> deleted_objects
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 60a0d0d..f04650e 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -79,8 +79,15 @@ struct TTopicItem {
   1: required string key;
 
   // Byte-string value for this topic entry. May not be null-terminated (in that it may
-  // contain null bytes)
+  // contain null bytes). It can be non-empty when deleted is true. This is needed when
+  // subscribers need additional information in order to process the deleted topics that
+  // is not included in the topic key (e.g. catalog version of deleted catalog objects).
   2: required string value;
+
+  // If true, this item was deleted. Deleted items need not be included in non-delta
+  // TTopicDeltas, since a non-delta update already carries the latest version of every
+  // still-present topic entry.
+  3: required bool deleted = false;
 }
 
 // Set of changes to a single topic, sent from the statestore to a subscriber as well as
@@ -89,15 +96,14 @@ struct TTopicDelta {
   // Name of the topic this delta applies to
   1: required string topic_name;
 
-  // List of changes to topic entries
+  // When is_delta=true, a list of changes to topic entries, including deletions, within
+  // [from_version, to_version].
+  // When is_delta=false, this is the list of all non-deleted topic entries for
+  // [0, to_version], which can be used to reconstruct the topic from scratch.
   2: required list<TTopicItem> topic_entries;
 
-  // List of topic item keys whose entries have been deleted
-  3: required list<string> topic_deletions;
-
-  // True if entries / deletions are to be applied to in-memory state,
-  // otherwise topic_entries contains entire topic state.
-  4: required bool is_delta;
+  // True if entries / deletions are relative to the topic at versions [0, from_version].
+  3: required bool is_delta;
 
   // The Topic version range this delta covers. If there have been changes to the topic,
   // the update will include all changes in the range: [from_version, to_version).
@@ -105,16 +111,17 @@ struct TTopicDelta {
   // to_version. The from_version will always be 0 for non-delta updates.
   // If this is an update being sent from a subscriber to the statestore, the from_version
   // is set only when recovering from an inconsistent state, to the last version of the
-  // topic the subscriber successfully processed.
-  5: optional i64 from_version
-  6: optional i64 to_version
+  // topic the subscriber successfully processed. The value of to_version doesn't depend
+  // on whether the update is delta or not.
+  4: optional i64 from_version
+  5: optional i64 to_version
 
   // The minimum topic version of all subscribers to the topic. This can be used to
   // determine when all subscribers have successfully processed a specific update.
   // This is guaranteed because no subscriber will ever be sent a topic containing
   // keys with a version < min_subscriber_topic_version. Only used when sending an update
   // from the statestore to a subscriber.
-  7: optional i64 min_subscriber_topic_version
+  6: optional i64 min_subscriber_topic_version
 }
 
 // Description of a topic to subscribe to as part of a RegisterSubscriber call
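
A subscriber applies these semantics as follows: a non-delta update carries every live entry, so local state is rebuilt from scratch, while a delta update carries additions, modifications, and deletions in the single topic_entries list. A minimal sketch, where Item and Delta are illustrative mirrors of TTopicItem and TTopicDelta:

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical mirrors of TTopicItem / TTopicDelta, for illustration only.
struct Item { std::string key; std::string value; bool deleted = false; };
struct Delta { std::vector<Item> entries; bool is_delta = true; };

// Applies a delta to local state. A non-delta update carries every live entry
// for [0, to_version], so local state is rebuilt from scratch; a delta update
// applies additions, modifications, and deletions from one entry list.
void Apply(const Delta& d, std::map<std::string, std::string>* state) {
  if (!d.is_delta) state->clear();
  for (const Item& item : d.entries) {
    if (item.deleted) {
      state->erase(item.key);
    } else {
      (*state)[item.key] = item.value;
    }
  }
}

int main() {
  std::map<std::string, std::string> state;
  Apply({{{"k1", "v1"}, {"k2", "v2"}}, /*is_delta=*/false}, &state);
  Apply({{{"k1", "", true}}, /*is_delta=*/true}, &state);  // k1 deleted
  std::cout << state.size() << "\n";  // prints 1
  return 0;
}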

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
index 27839b3..c00c460 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog;
 
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -24,24 +25,37 @@ import java.util.TreeMap;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TTable;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 /**
- * The impalad catalog cache can be modified by either a state store update or by a
- * direct ("fast") update that applies the result of a catalog operation to the cache
- * out-of-band of a state store update. This thread safe log tracks the divergence
- * (due to direct updates to the cache) of this impalad's cache from the last state
- * store update. This log is needed to ensure work is never undone. For example,
- * consider the following sequence of events:
- * t1: [Direct Update] - Add item A - (Catalog Version 9)
- * t2: [Direct Update] - Drop item A - (Catalog Version 10)
- * t3: [StateStore Update] - (From Catalog Version 9)
- * This log is used to ensure the state store update in t3 does not undo the drop in t2.
+ * Represents a log of deleted catalog objects.
  *
- * Currently this only tracks objects that were dropped, since the catalog cache can be
- * queried to check if an object was added. TODO: Also track object additions from async
- * operations. This could be used to to "replay" the log in the case of a catalog reset
- * ("invalidate metadata"). Currently, the catalog may briefly go back in time if
- * "invalidate metadata" is run concurrently with async catalog operations.
+ * There are currently two use cases for this log:
+ *
+ * a) Processing catalog updates in the impalads
+ *   The impalad catalog cache can be modified by either a state store update or by a
+ *   direct ("fast") update that applies the result of a catalog operation to the cache
+ *   out-of-band of a state store update. This thread safe log tracks the divergence
+ *   (due to direct updates to the cache) of this impalad's cache from the last state
+ *   store update. This log is needed to ensure work is never undone. For example,
+ *   consider the following sequence of events:
+ *   t1: [Direct Update] - Add item A - (Catalog Version 9)
+ *   t2: [Direct Update] - Drop item A - (Catalog Version 10)
+ *   t3: [StateStore Update] - (From Catalog Version 9)
+ *   This log is used to ensure the state store update in t3 does not undo the drop in t2.
+ *   Currently this only tracks objects that were dropped, since the catalog cache can be
+ *   queried to check if an object was added. TODO: Also track object additions from async
+ *   operations. This could be used to "replay" the log in the case of a catalog reset
+ *   ("invalidate metadata"). Currently, the catalog may briefly go back in time if
+ *   "invalidate metadata" is run concurrently with async catalog operations.
+ *
+ * b) Building catalog topic updates in the catalogd
+ *   The catalogd uses this log to identify deleted catalog objects. Deleted
+ *   catalog objects are added to this log by the corresponding operations that delete
+ *   them (e.g. dropTable()). While constructing a catalog update topic, we use the log to
+ *   determine which catalog objects were deleted since the last catalog topic update.
+ *   Once the catalog topic update is constructed, the old deleted catalog objects are
+ *   garbage collected to prevent the log from growing indefinitely.
  */
 public class CatalogDeltaLog {
   // Map of the catalog version an object was removed from the catalog
@@ -58,6 +72,15 @@ public class CatalogDeltaLog {
   }
 
   /**
+   * Retrieves all the removed catalog objects with version > 'fromVersion'.
+   */
+  public synchronized List<TCatalogObject> retrieveObjects(long fromVersion) {
+    SortedMap<Long, TCatalogObject> objects =
+        removedCatalogObjects_.tailMap(fromVersion + 1);
+    return ImmutableList.<TCatalogObject>copyOf(objects.values());
+  }
+
+  /**
    * Given the current catalog version, removes all items with catalogVersion <
    * currentCatalogVersion. Such objects do not need to be tracked in the delta
    * log anymore because they are consistent with the state store's view of the
@@ -91,30 +114,45 @@ public class CatalogDeltaLog {
   }
 
   /**
-   * Returns true if the two objects have the same object type and name.
-   * TODO: Use global object IDs everywhere instead of tracking catalog objects by name.
+   * Returns true if the two objects have the same object type and key (generated using
+   * toCatalogObjectKey()).
+   * TODO: Use global object IDs everywhere instead of tracking catalog objects by
+   * generated keys.
+   */
+  private static boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) {
+    return toCatalogObjectKey(first).equals(toCatalogObjectKey(second));
+  }
+
+  /**
+   * Returns a unique string key of a catalog object.
    */
-  private boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) {
-    if (first.getType() != second.getType()) return false;
-    switch (first.getType()) {
+  public static String toCatalogObjectKey(TCatalogObject catalogObject)
+      throws IllegalStateException {
+    switch (catalogObject.getType()) {
       case DATABASE:
-        return first.getDb().getDb_name().equalsIgnoreCase(second.getDb().getDb_name());
+        return "DATABASE:" + catalogObject.getDb().getDb_name().toLowerCase();
       case TABLE:
       case VIEW:
-        TTable firstTbl = first.getTable();
-        return firstTbl.getDb_name().equalsIgnoreCase(second.getTable().getDb_name()) &&
-            firstTbl.getTbl_name().equalsIgnoreCase(second.getTable().getTbl_name());
+        TTable tbl = catalogObject.getTable();
+        return "TABLE:" + tbl.getDb_name().toLowerCase() + "." +
+            tbl.getTbl_name().toLowerCase();
       case FUNCTION:
-        return first.getFn().getSignature().equals(second.getFn().getSignature()) &&
-            first.getFn().getName().equals(second.getFn().getName());
+        return "FUNCTION:" + catalogObject.getFn().getName() + "(" +
+            catalogObject.getFn().getSignature() + ")";
       case ROLE:
-        return first.getRole().getRole_name().equalsIgnoreCase(
-            second.getRole().getRole_name());
+        return "ROLE:" + catalogObject.getRole().getRole_name().toLowerCase();
       case PRIVILEGE:
-        return first.getPrivilege().getPrivilege_name().equalsIgnoreCase(
-            second.getPrivilege().getPrivilege_name()) &&
-            first.getPrivilege().getRole_id() == second.getPrivilege().getRole_id();
-      default: return false;
+        return "PRIVILEGE:" +
+            catalogObject.getPrivilege().getPrivilege_name().toLowerCase() + "." +
+            Integer.toString(catalogObject.getPrivilege().getRole_id());
+      case HDFS_CACHE_POOL:
+        return "HDFS_CACHE_POOL:" +
+            catalogObject.getCache_pool().getPool_name().toLowerCase();
+      case DATA_SOURCE:
+        return "DATA_SOURCE:" + catalogObject.getData_source().getName().toLowerCase();
+      default:
+        throw new IllegalStateException(
+            "Unsupported catalog object type: " + catalogObject.getType());
     }
   }
-}
\ No newline at end of file
+}
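
The two operations the catalogd relies on here, retrieveObjects() and garbageCollect(), both exploit the version-sorted map. A C++ sketch of the same idea, using std::map in place of Java's TreeMap and hypothetical names:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

class DeleteLog {
 public:
  void AddRemovedObject(int64_t version, const std::string& key) {
    removed_[version] = key;
  }

  // All removed objects with version > from_version; the equivalent of
  // removedCatalogObjects_.tailMap(fromVersion + 1).
  std::vector<std::string> RetrieveObjects(int64_t from_version) const {
    std::vector<std::string> out;
    for (auto it = removed_.upper_bound(from_version); it != removed_.end(); ++it) {
      out.push_back(it->second);
    }
    return out;
  }

  // Drops entries with version < current_catalog_version; they are already
  // reflected in every topic update from here on, so tracking them further
  // would only grow the log.
  void GarbageCollect(int64_t current_catalog_version) {
    removed_.erase(removed_.begin(), removed_.lower_bound(current_catalog_version));
  }

 private:
  std::map<int64_t, std::string> removed_;  // version -> removed object key
};

int main() {
  DeleteLog log;
  log.AddRemovedObject(9, "TABLE:db.a");
  log.AddRemovedObject(12, "TABLE:db.b");
  std::cout << log.RetrieveObjects(9).size() << "\n";  // 1 (only version 12)
  log.GarbageCollect(13);
  std::cout << log.RetrieveObjects(0).size() << "\n";  // 0
  return 0;
}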

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index d2a0a82..a4f8608 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
@@ -59,7 +58,7 @@ import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TTable;
@@ -150,6 +149,9 @@ public class CatalogServiceCatalog extends Catalog {
   // Local temporary directory to copy UDF Jars.
   private static String localLibraryPath_;
 
+  // Log of deleted catalog objects.
+  private final CatalogDeltaLog deleteLog_;
+
   /**
    * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
    * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
@@ -180,6 +182,7 @@ public class CatalogServiceCatalog extends Catalog {
       sentryProxy_ = null;
     }
     localLibraryPath_ = new String("file://" + localLibraryPath);
+    deleteLog_ = new CatalogDeltaLog();
   }
 
   // Timeout for acquiring a table lock
@@ -266,8 +269,15 @@ public class CatalogServiceCatalog extends Catalog {
         }
         // Remove dropped cache pools.
         for (String cachePoolName: droppedCachePoolNames) {
-          hdfsCachePools_.remove(cachePoolName);
-          CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
+          HdfsCachePool cachePool = hdfsCachePools_.remove(cachePoolName);
+          if (cachePool != null) {
+            cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
+            TCatalogObject removedObject =
+                new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
+                    cachePool.getCatalogVersion());
+            removedObject.setCache_pool(cachePool.toThrift());
+            deleteLog_.addRemovedObject(removedObject);
+          }
         }
       } finally {
         catalogLock_.writeLock().unlock();
@@ -297,117 +307,140 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Returns all known objects in the Catalog (Tables, Views, Databases, and
-   * Functions). Some metadata may be skipped for objects that have a catalog
-   * version < the specified "fromVersion". Takes a lock on the catalog to ensure this
-   * update contains a consistent snapshot of all items in the catalog. While holding the
-   * catalog lock, it locks each accessed table to protect against concurrent
-   * modifications.
+   * Computes and returns a delta of catalog objects relative to 'fromVersion'. Takes a
+   * lock on the catalog to ensure this update contains a consistent snapshot of the
+   * catalog.
    */
-  public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
-    TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
-    resp.setObjects(new ArrayList<TCatalogObject>());
-    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
+  public TGetCatalogDeltaResponse getCatalogDelta(long fromVersion) {
     catalogLock_.readLock().lock();
     try {
-      for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
-        TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
-            db.getCatalogVersion());
-        catalogDb.setDb(db.toThrift());
-        resp.addToObjects(catalogDb);
+      TGetCatalogDeltaResponse resp = getCatalogObjects(fromVersion);
+      // Each update should contain a single "TCatalog" object which is used to
+      // pass overall state on the catalog, such as the current version and the
+      // catalog service id.
+      TCatalogObject catalog = new TCatalogObject();
+      catalog.setType(TCatalogObjectType.CATALOG);
+      // By setting the catalog version to the latest catalog version at this point,
+      // it ensures impalads will always bump their versions, even in the case where
+      // an object has been dropped.
+      long currentCatalogVersion = getCatalogVersion();
+      catalog.setCatalog_version(currentCatalogVersion);
+      catalog.setCatalog(new TCatalog(catalogServiceId_));
+      resp.addToUpdated_objects(catalog);
 
-        for (String tblName: db.getAllTableNames()) {
-          TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
-              Catalog.INITIAL_CATALOG_VERSION);
+      // The max version is the max catalog version of all items in the update.
+      resp.setMax_catalog_version(currentCatalogVersion);
+      deleteLog_.garbageCollect(currentCatalogVersion);
+      return resp;
+    } finally {
+      catalogLock_.readLock().unlock();
+    }
+  }
 
-          Table tbl = db.getTable(tblName);
-          if (tbl == null) {
-            LOG.error("Table: " + tblName + " was expected to be in the catalog " +
-                "cache. Skipping table for this update.");
-            continue;
-          }
+  /**
+   * Identifies and returns the catalog objects that were added/modified/deleted in the
+   * catalog with versions > 'fromVersion'. The caller of this function must hold the
+   * catalog read lock to prevent concurrent modifications of the catalog.
+   */
+  private TGetCatalogDeltaResponse getCatalogObjects(long fromVersion) {
+    TGetCatalogDeltaResponse resp = new TGetCatalogDeltaResponse();
+    resp.setUpdated_objects(new ArrayList<TCatalogObject>());
+    resp.setDeleted_objects(new ArrayList<TCatalogObject>());
+    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
 
-          // Protect the table from concurrent modifications.
-          tbl.getLock().lock();
-          try {
-            // Only add the extended metadata if this table's version is >=
-            // the fromVersion.
-            if (tbl.getCatalogVersion() >= fromVersion) {
-              try {
-                catalogTbl.setTable(tbl.toThrift());
-              } catch (Exception e) {
-                if (LOG.isTraceEnabled()) {
-                  LOG.trace(String.format("Error calling toThrift() on table %s.%s: %s",
-                      db.getName(), tblName, e.getMessage()), e);
-                }
-                continue;
+    // process databases
+    for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
+      if (db.getCatalogVersion() > fromVersion) {
+        TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
+            db.getCatalogVersion());
+        catalogDb.setDb(db.toThrift());
+        resp.addToUpdated_objects(catalogDb);
+      }
+      // process tables
+      for (Table tbl: db.getTables()) {
+        TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
+            Catalog.INITIAL_CATALOG_VERSION);
+        // Protect the table from concurrent modifications.
+        tbl.getLock().lock();
+        try {
+          // Only add the extended metadata if this table's version is > fromVersion.
+          if (tbl.getCatalogVersion() > fromVersion) {
+            try {
+              catalogTbl.setTable(tbl.toThrift());
+            } catch (Exception e) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format("Error calling toThrift() on table %s: %s",
+                    tbl.getFullName(), e.getMessage()), e);
               }
-              catalogTbl.setCatalog_version(tbl.getCatalogVersion());
-            } else {
-              catalogTbl.setTable(new TTable(db.getName(), tblName));
+              continue;
             }
-          } finally {
-            tbl.getLock().unlock();
+            catalogTbl.setCatalog_version(tbl.getCatalogVersion());
+            resp.addToUpdated_objects(catalogTbl);
           }
-          resp.addToObjects(catalogTbl);
-        }
-
-        for (Function fn: db.getFunctions(null, new PatternMatcher())) {
-          TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
-              fn.getCatalogVersion());
-          function.setFn(fn.toThrift());
-          resp.addToObjects(function);
+        } finally {
+          tbl.getLock().unlock();
         }
       }
-
-      for (DataSource dataSource: getDataSources()) {
-        TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
-            dataSource.getCatalogVersion());
-        catalogObj.setData_source(dataSource.toThrift());
-        resp.addToObjects(catalogObj);
-      }
-      for (HdfsCachePool cachePool: hdfsCachePools_) {
-        TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
-            cachePool.getCatalogVersion());
-        pool.setCache_pool(cachePool.toThrift());
-        resp.addToObjects(pool);
+      // process functions
+      for (Function fn: db.getFunctions(null, new PatternMatcher())) {
+        if (fn.getCatalogVersion() <= fromVersion) continue;
+        TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
+            fn.getCatalogVersion());
+        function.setFn(fn.toThrift());
+        resp.addToUpdated_objects(function);
       }
-
-      // Get all roles
-      for (Role role: authPolicy_.getAllRoles()) {
+    }
+    // process data sources
+    for (DataSource dataSource: getDataSources()) {
+      if (dataSource.getCatalogVersion() <= fromVersion) continue;
+      TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
+          dataSource.getCatalogVersion());
+      catalogObj.setData_source(dataSource.toThrift());
+      resp.addToUpdated_objects(catalogObj);
+    }
+    // process cache pools
+    for (HdfsCachePool cachePool: hdfsCachePools_) {
+      if (cachePool.getCatalogVersion() <= fromVersion) continue;
+      TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
+          cachePool.getCatalogVersion());
+      pool.setCache_pool(cachePool.toThrift());
+      resp.addToUpdated_objects(pool);
+    }
+    // process roles and privileges
+    for (Role role: authPolicy_.getAllRoles()) {
+      if (role.getCatalogVersion() > fromVersion) {
         TCatalogObject thriftRole = new TCatalogObject();
         thriftRole.setRole(role.toThrift());
         thriftRole.setCatalog_version(role.getCatalogVersion());
         thriftRole.setType(role.getCatalogObjectType());
-        resp.addToObjects(thriftRole);
-
-        for (RolePrivilege p: role.getPrivileges()) {
-          TCatalogObject privilege = new TCatalogObject();
-          privilege.setPrivilege(p.toThrift());
-          privilege.setCatalog_version(p.getCatalogVersion());
-          privilege.setType(p.getCatalogObjectType());
-          resp.addToObjects(privilege);
-        }
+        resp.addToUpdated_objects(thriftRole);
       }
 
-      // Each update should contain a single "TCatalog" object which is used to
-      // pass overall state on the catalog, such as the current version and the
-      // catalog service id.
-      TCatalogObject catalog = new TCatalogObject();
-      catalog.setType(TCatalogObjectType.CATALOG);
-      // By setting the catalog version to the latest catalog version at this point,
-      // it ensure impalads will always bump their versions, even in the case where
-      // an object has been dropped.
-      catalog.setCatalog_version(getCatalogVersion());
-      catalog.setCatalog(new TCatalog(catalogServiceId_));
-      resp.addToObjects(catalog);
+      for (RolePrivilege p: role.getPrivileges()) {
+        if (p.getCatalogVersion() <= fromVersion) continue;
+        TCatalogObject privilege = new TCatalogObject();
+        privilege.setPrivilege(p.toThrift());
+        privilege.setCatalog_version(p.getCatalogVersion());
+        privilege.setType(p.getCatalogObjectType());
+        resp.addToUpdated_objects(privilege);
+      }
+    }
 
-      // The max version is the max catalog version of all items in the update.
-      resp.setMax_catalog_version(getCatalogVersion());
-      return resp;
-    } finally {
-      catalogLock_.readLock().unlock();
+    Set<String> updatedCatalogObjects = Sets.newHashSet();
+    for (TCatalogObject catalogObj: resp.updated_objects) {
+      updatedCatalogObjects.add(CatalogDeltaLog.toCatalogObjectKey(catalogObj));
+    }
+
+    // Identify the catalog objects that were removed from the catalog for which the
+    // version is > 'fromVersion'. We need to make sure that we don't include "deleted"
+    // objects that were re-added to the catalog.
+    for (TCatalogObject removedObject: deleteLog_.retrieveObjects(fromVersion)) {
+      if (!updatedCatalogObjects.contains(CatalogDeltaLog.toCatalogObjectKey(
+          removedObject))) {
+        resp.addToDeleted_objects(removedObject);
+      }
     }
+    return resp;
   }
 
   /**
@@ -710,6 +743,40 @@ public class CatalogServiceCatalog extends Catalog {
           tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
         }
       }
+
+      if (existingDb != null) {
+        // Identify any removed functions and add them to the delta log.
+        for (Map.Entry<String, List<Function>> e:
+             existingDb.getAllFunctions().entrySet()) {
+          for (Function fn: e.getValue()) {
+            if (newDb.getFunction(fn,
+                Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
+              fn.setCatalogVersion(incrementAndGetCatalogVersion());
+              TCatalogObject removedObject =
+                  new TCatalogObject(TCatalogObjectType.FUNCTION, fn.getCatalogVersion());
+              removedObject.setFn(fn.toThrift());
+              deleteLog_.addRemovedObject(removedObject);
+            }
+          }
+        }
+
+        // Identify any deleted tables and add them to the delta log
+        Set<String> oldTableNames = Sets.newHashSet(existingDb.getAllTableNames());
+        Set<String> newTableNames = Sets.newHashSet(newDb.getAllTableNames());
+        oldTableNames.removeAll(newTableNames);
+        for (String removedTableName: oldTableNames) {
+          Table removedTable = IncompleteTable.createUninitializedTable(existingDb,
+              removedTableName);
+          removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
+          TCatalogObject removedObject =
+              new TCatalogObject(TCatalogObjectType.TABLE,
+                  removedTable.getCatalogVersion());
+          removedObject.setTable(new TTable());
+          removedObject.getTable().setDb_name(existingDb.getName());
+          removedObject.getTable().setTbl_name(removedTableName);
+          deleteLog_.addRemovedObject(removedObject);
+        }
+      }
       return Pair.create(newDb, tblsToBackgroundLoad);
     } catch (Exception e) {
       LOG.warn("Encountered an exception while invalidating database: " + dbName +
@@ -757,6 +824,22 @@ public class CatalogServiceCatalog extends Catalog {
         }
       }
       dbCache_.set(newDbCache);
+
+      // Identify any deleted databases and add them to the delta log.
+      Set<String> oldDbNames = oldDbCache.keySet();
+      Set<String> newDbNames = newDbCache.keySet();
+      oldDbNames.removeAll(newDbNames);
+      for (String dbName: oldDbNames) {
+        Db removedDb = oldDbCache.get(dbName);
+        Preconditions.checkNotNull(removedDb);
+        removedDb.setCatalogVersion(
+            CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
+        TCatalogObject removedObject = new TCatalogObject(TCatalogObjectType.DATABASE,
+            removedDb.getCatalogVersion());
+        removedObject.setDb(removedDb.toThrift());
+        deleteLog_.addRemovedObject(removedObject);
+      }
+
       // Submit tables for background loading.
       for (TTableName tblName: tblsToBackgroundLoad) {
         tableLoadingMgr_.backgroundLoad(tblName);
@@ -1359,4 +1442,6 @@ public class CatalogServiceCatalog extends Catalog {
       tbl.getLock().unlock();
     }
   }
+
+  public CatalogDeltaLog getDeleteLog() { return deleteLog_; }
 }
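
The final step of getCatalogObjects() above reports a deletion only if the same object key was not re-added with a newer version inside this delta. A small sketch of that key-set subtraction (names illustrative):

#include <iostream>
#include <set>
#include <string>
#include <vector>

// Takes the keys of every updated object in the delta, then reports an entry
// from the delete log only when the same key was not re-added in the window.
std::vector<std::string> DeletedObjects(
    const std::vector<std::string>& updated_keys,
    const std::vector<std::string>& delete_log_keys) {
  std::set<std::string> updated(updated_keys.begin(), updated_keys.end());
  std::vector<std::string> deleted;
  for (const std::string& key : delete_log_keys) {
    if (updated.count(key) == 0) deleted.push_back(key);
  }
  return deleted;
}

int main() {
  // "db.t1" was dropped and re-created within the window, so it is not deleted.
  auto deleted = DeletedObjects({"TABLE:db.t1", "TABLE:db.t2"},
                                {"TABLE:db.t1", "TABLE:db.t3"});
  for (const auto& k : deleted) std::cout << k << "\n";  // TABLE:db.t3
  return 0;
}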

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 4c959b2..70c9a61 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -177,7 +177,7 @@ public class ImpaladCatalog extends Catalog {
     // its child tables/functions is fine. If that happens, the removal of the child
     // object will be a no-op.
     for (TCatalogObject catalogObject: req.getRemoved_objects()) {
-      removeCatalogObject(catalogObject, newCatalogVersion);
+      removeCatalogObject(catalogObject);
     }
     lastSyncedCatalogVersion_ = newCatalogVersion;
     // Cleanup old entries in the log.
@@ -319,24 +319,10 @@ public class ImpaladCatalog extends Catalog {
   /**
    *  Removes the matching TCatalogObject from the catalog, if one exists and its
    *  catalog version is < the catalog version of this drop operation.
-   *  Note that drop operations that come from statestore heartbeats always have a
-   *  version of 0. To determine the drop version for statestore updates,
-   *  the catalog version from the current update is used. This is okay because there
-   *  can never be a catalog update from the statestore that contains a drop
-   *  and an addition of the same object. For more details on how drop
-   *  versioning works, see CatalogServerCatalog.java
    */
-  private void removeCatalogObject(TCatalogObject catalogObject,
-      long currentCatalogUpdateVersion) {
-    // The TCatalogObject associated with a drop operation from a state store
-    // heartbeat will always have a version of zero. Because no update from
-    // the state store can contain both a drop and an addition of the same object,
-    // we can assume the drop version is the current catalog version of this update.
-    // If the TCatalogObject contains a version that != 0, it indicates the drop
-    // came from a direct update.
-    long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
-        currentCatalogUpdateVersion : catalogObject.getCatalog_version();
-
+  private void removeCatalogObject(TCatalogObject catalogObject) {
+    Preconditions.checkState(catalogObject.getCatalog_version() != 0);
+    long dropCatalogVersion = catalogObject.getCatalog_version();
     switch(catalogObject.getType()) {
       case DATABASE:
         removeDb(catalogObject.getDb(), dropCatalogVersion);
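
With the statestore special case gone, every removal carries its own nonzero
version, and the apply side reduces to a plain comparison. A minimal sketch of
that version gate (types simplified; catalogObjects_ is a stand-in for the
impalad's real object maps, not an actual Impala field):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import com.google.common.base.Preconditions;

    // Illustrative version gate: a drop only wins if it was assigned a
    // strictly newer catalog version than the object the impalad holds.
    class RemovalGateSketch {
      static class CatalogObject { long catalogVersion; }

      private final Map<String, CatalogObject> catalogObjects_ = new ConcurrentHashMap<>();

      void removeCatalogObject(String key, long dropCatalogVersion) {
        Preconditions.checkState(dropCatalogVersion != 0);  // mirrors the new invariant
        CatalogObject existing = catalogObjects_.get(key);
        // A stale drop (e.g. one reordered behind a re-add) loses here.
        if (existing != null && existing.catalogVersion < dropCatalogVersion) {
          catalogObjects_.remove(key);
        }
      }
    }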

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index edba72c..da2c931 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1084,6 +1084,7 @@ public class CatalogOpExecutor {
     removedObject.setCatalog_version(dataSource.getCatalogVersion());
     resp.result.addToRemoved_catalog_objects(removedObject);
     resp.result.setVersion(dataSource.getCatalogVersion());
+    catalog_.getDeleteLog().addRemovedObject(removedObject);
   }
 
   /**
@@ -1266,6 +1267,7 @@ public class CatalogOpExecutor {
     removedObject.getDb().setDb_name(params.getDb());
     resp.result.setVersion(removedObject.getCatalog_version());
     resp.result.addToRemoved_catalog_objects(removedObject);
+    catalog_.getDeleteLog().addRemovedObject(removedObject);
   }
 
   /**
@@ -1387,6 +1389,7 @@ public class CatalogOpExecutor {
     removedObject.getTable().setTbl_name(tableName.getTbl());
     removedObject.getTable().setDb_name(tableName.getDb());
     removedObject.setCatalog_version(resp.result.getVersion());
+    catalog_.getDeleteLog().addRemovedObject(removedObject);
     resp.result.addToRemoved_catalog_objects(removedObject);
   }
 
@@ -1514,6 +1517,9 @@ public class CatalogOpExecutor {
 
       if (!removedFunctions.isEmpty()) {
         resp.result.setRemoved_catalog_objects(removedFunctions);
+        for (TCatalogObject removedFnObject: removedFunctions) {
+          catalog_.getDeleteLog().addRemovedObject(removedFnObject);
+        }
       }
       resp.result.setVersion(catalog_.getCatalogVersion());
     }
@@ -2213,6 +2219,7 @@ public class CatalogOpExecutor {
     removedObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
     removedObject.setCatalog_version(addedObject.getCatalog_version());
     response.result.addToRemoved_catalog_objects(removedObject);
+    catalog_.getDeleteLog().addRemovedObject(removedObject);
     response.result.addToUpdated_catalog_objects(addedObject);
     response.result.setVersion(addedObject.getCatalog_version());
   }
@@ -2813,6 +2820,7 @@ public class CatalogOpExecutor {
     catalogObject.setCatalog_version(role.getCatalogVersion());
     if (createDropRoleParams.isIs_drop()) {
       resp.result.addToRemoved_catalog_objects(catalogObject);
+      catalog_.getDeleteLog().addRemovedObject(catalogObject);
     } else {
       resp.result.addToUpdated_catalog_objects(catalogObject);
     }
@@ -2875,6 +2883,9 @@ public class CatalogOpExecutor {
       catalogObject.setPrivilege(rolePriv.toThrift());
       catalogObject.setCatalog_version(rolePriv.getCatalogVersion());
       updatedPrivs.add(catalogObject);
+      if (!grantRevokePrivParams.isIs_grant() && !privileges.get(0).isHas_grant_opt()) {
+        catalog_.getDeleteLog().addRemovedObject(catalogObject);
+      }
     }
 
     // TODO: Currently we only support sending back 1 catalog object in a "direct DDL"
@@ -3047,6 +3058,9 @@ public class CatalogOpExecutor {
           resp.result.setUpdated_catalog_objects(addedFuncs);
           resp.result.setRemoved_catalog_objects(removedFuncs);
           resp.result.setVersion(catalog_.getCatalogVersion());
+          for (TCatalogObject removedFn: removedFuncs) {
+            catalog_.getDeleteLog().addRemovedObject(removedFn);
+          }
         }
       }
     } else if (req.isSetTable_name()) {
@@ -3084,6 +3098,7 @@ public class CatalogOpExecutor {
         // processed as a direct DDL operation.
         if (tblWasRemoved.getRef()) {
           resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
+          catalog_.getDeleteLog().addRemovedObject(updatedThriftTable);
         } else {
           resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
         }
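
Every drop path in this file now records the removed object twice: once on the
DDL response, so the issuing impalad can apply the drop immediately, and once
in the delete log, so the next statestore delta carries it to every other
coordinator. A hypothetical helper capturing that repeated pattern (Response,
DeleteLog, and the stub struct are simplified stand-ins, not Impala classes):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative "respond and log" helper for the add/add pairs in the diff.
    class DropRecorderSketch {
      static class TCatalogObjectStub { String name; long catalogVersion; }
      static class Response { List<TCatalogObjectStub> removed = new ArrayList<>(); }
      static class DeleteLog {
        List<TCatalogObjectStub> entries = new ArrayList<>();
        void addRemovedObject(TCatalogObjectStub o) { entries.add(o); }
      }

      static void recordRemoval(Response resp, DeleteLog deleteLog,
          TCatalogObjectStub obj) {
        resp.removed.add(obj);            // direct DDL result: caller applies at once
        deleteLog.addRemovedObject(obj);  // topic delta: everyone else learns later
      }
    }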

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index b756230..f8f419a 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -38,7 +38,8 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaResponse;
+import org.apache.impala.thrift.TGetCatalogDeltaRequest;
 import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
@@ -117,9 +118,11 @@ public class JniCatalog {
   /**
    * Gets the catalog delta from the given version.
    */
-  public byte[] getCatalogObjects(long from_version) throws ImpalaException, TException {
-    TGetAllCatalogObjectsResponse resp =
-        catalog_.getCatalogObjects(from_version);
+  public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq)
+      throws ImpalaException, TException {
+    TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
+    JniUtil.deserializeThrift(protocolFactory_, params, thriftGetCatalogDeltaReq);
+    TGetCatalogDeltaResponse resp = catalog_.getCatalogDelta(params.getFrom_version());
     TSerializer serializer = new TSerializer(protocolFactory_);
     return serializer.serialize(resp);
   }
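
The JNI boundary only moves serialized bytes, so the renamed method follows
the usual pattern: deserialize the request struct, do the work, serialize the
response struct. A hedged sketch of that round-trip with Thrift's stock
(de)serializers (ThriftBoundarySketch is illustrative; the TBase arguments
stand in for generated structs such as TGetCatalogDeltaRequest/Response):

    import org.apache.thrift.TBase;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TBinaryProtocol;

    // Illustrative JNI-style Thrift round-trip.
    final class ThriftBoundarySketch {
      private static final TBinaryProtocol.Factory PROTOCOL = new TBinaryProtocol.Factory();

      // 'req' is a freshly constructed generated struct; this fills it in place.
      static void readRequest(TBase<?, ?> req, byte[] requestBytes) throws TException {
        new TDeserializer(PROTOCOL).deserialize(req, requestBytes);
      }

      // 'resp' is a populated generated struct; this flattens it for the native caller.
      static byte[] writeResponse(TBase<?, ?> resp) throws TException {
        return new TSerializer(PROTOCOL).serialize(resp);
      }
    }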

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd340b88/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index e2b1715..1003dc7 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -311,14 +311,12 @@ class StatestoreSubscriber(object):
 
 class TestStatestore():
   def make_topic_update(self, topic_name, key_template="foo", value_template="bar",
-                        num_updates=1, deletions=None):
+                        num_updates=1):
     topic_entries = [
       Subscriber.TTopicItem(key=key_template + str(x), value=value_template + str(x))
       for x in xrange(num_updates)]
-    if deletions is None: deletions = []
     return Subscriber.TTopicDelta(topic_name=topic_name,
                                   topic_entries=topic_entries,
-                                  topic_deletions=deletions,
                                   is_delta=False)
 
   def test_registration_ids_different(self):
@@ -349,11 +347,9 @@ class TestStatestore():
         assert len(args.topic_deltas) == 1
         assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
         assert args.topic_deltas[topic_name].topic_name == delta.topic_name
-        assert args.topic_deltas[topic_name].topic_deletions == delta.topic_deletions
       elif sub.update_count == 3:
         # After the content-bearing update was processed, the next delta should be empty
         assert len(args.topic_deltas[topic_name].topic_entries) == 0
-        assert len(args.topic_deltas[topic_name].topic_deletions) == 0
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
@@ -461,7 +457,7 @@ class TestStatestore():
         assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
         # Statestore should not send deletions when the update is not a delta, see
         # IMPALA-1891
-        assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0
+        assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),