You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/29 06:41:50 UTC

[1/7] impala git commit: IMPALA-5936: operator '%' overflows on large decimals

Repository: impala
Updated Branches:
  refs/heads/master d3afe3075 -> 63f17e9ce


IMPALA-5936: operator '%' overflows on large decimals

Suppose we have a large decimal number, which is greater
than INT_MAX. We want to calculate the modulo of this
number by 3:
BIG_DECIMAL % 3

The result of this calculation can be 0, 1, or 2.
This can fit into a decimal with precision 1.

The in-memory representation of such small decimals are
stored in int32_t in the backend. Let's call this int32_t
the result type. The backend had the invalid assumption
that it can do the calculation as well using the result type.
This assumption is true for multiplying or adding numbers,
but not for modulo.

Now the backend selects the biggest type of ['return type',
'1st operand type', '2nd operand type'] to do the calculation.

Change-Id: I2b06c8acd5aa490943e84013faf2eaac7c26ceb4
Reviewed-on: http://gerrit.cloudera.org:8080/8574
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4f11bed4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4f11bed4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4f11bed4

Branch: refs/heads/master
Commit: 4f11bed407e305a88ccb1b94d9fc2ce356830154
Parents: d3afe30
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Thu Nov 16 14:54:35 2017 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Nov 28 21:11:45 2017 +0000

----------------------------------------------------------------------
 be/src/exprs/decimal-operators-ir.cc                 | 10 +++++++++-
 .../queries/QueryTest/decimal-exprs.test             | 15 +++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4f11bed4/be/src/exprs/decimal-operators-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/decimal-operators-ir.cc b/be/src/exprs/decimal-operators-ir.cc
index 4af2366..0133317 100644
--- a/be/src/exprs/decimal-operators-ir.cc
+++ b/be/src/exprs/decimal-operators-ir.cc
@@ -673,6 +673,8 @@ BooleanVal DecimalOperators::CastToBooleanVal(
         ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2); \
     switch (ctx->impl()->GetConstFnAttr(FunctionContextImpl::RETURN_TYPE_SIZE)) { \
       case 4: { \
+        DCHECK_LE(x_size, 4); \
+        DCHECK_LE(y_size, 4); \
         Decimal4Value x_val = GetDecimal4Value(x, x_size, &overflow); \
         Decimal4Value y_val = GetDecimal4Value(y, y_size, &overflow); \
         Decimal4Value result = x_val.OP_FN<int32_t>(x_scale, y_val, y_scale, \
@@ -681,6 +683,8 @@ BooleanVal DecimalOperators::CastToBooleanVal(
         return DecimalVal(result.value()); \
       } \
       case 8: { \
+        DCHECK_LE(x_size, 8); \
+        DCHECK_LE(y_size, 8); \
         Decimal8Value x_val = GetDecimal8Value(x, x_size, &overflow); \
         Decimal8Value y_val = GetDecimal8Value(y, y_size, &overflow); \
         Decimal8Value result = x_val.OP_FN<int64_t>(x_scale, y_val, y_scale, \
@@ -712,13 +716,17 @@ BooleanVal DecimalOperators::CastToBooleanVal(
     int x_scale = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SCALE, 0); \
     int y_size = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 1); \
     int y_scale = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SCALE, 1); \
+    int return_size = \
+        ctx->impl()->GetConstFnAttr(FunctionContextImpl::RETURN_TYPE_SIZE); \
     int return_precision = \
         ctx->impl()->GetConstFnAttr(FunctionContextImpl::RETURN_TYPE_PRECISION); \
     int return_scale = \
         ctx->impl()->GetConstFnAttr(FunctionContextImpl::RETURN_TYPE_SCALE); \
     const bool decimal_v2 = \
         ctx->impl()->GetConstFnAttr(FunctionContextImpl::DECIMAL_V2); \
-    switch (ctx->impl()->GetConstFnAttr(FunctionContextImpl::RETURN_TYPE_SIZE)) { \
+    /* We need a type that is big enough for the result and the operands as well */ \
+    int max_size = ::max(::max(x_size, y_size), return_size); \
+    switch (max_size) { \
       case 4: { \
         Decimal4Value x_val = GetDecimal4Value(x, x_size, &overflow); \
         Decimal4Value y_val = GetDecimal4Value(y, y_size, &overflow); \

http://git-wip-us.apache.org/repos/asf/impala/blob/4f11bed4/testdata/workloads/functional-query/queries/QueryTest/decimal-exprs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/decimal-exprs.test b/testdata/workloads/functional-query/queries/QueryTest/decimal-exprs.test
index 9ffd78a..b34cb78 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/decimal-exprs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/decimal-exprs.test
@@ -298,3 +298,18 @@ select
 ---- TYPES
 DOUBLE,DOUBLE,DOUBLE
 ====
+---- QUERY
+# IMPALA-5936: big decimal numbers with % operator
+set decimal_v2=true;
+select
+cast(42607032167 as decimal(18, 0)) % 3,
+cast(42606774111 as decimal(18, 0)) % 3,
+cast(42363009429 as decimal(18, 0)) % 3,
+cast(42603003271 as decimal(18, 0)) % 3,
+cast(42606961501 as decimal(18, 0)) % 3,
+cast(42608445511 as decimal(18, 0)) % 3
+---- RESULTS
+2,0,0,1,1,1
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL,DECIMAL,DECIMAL,DECIMAL
+====


[3/7] impala git commit: Revert "IMPALA-5538: Use explicit catalog versions for deleted objects"

Posted by ta...@apache.org.
Revert "IMPALA-5538: Use explicit catalog versions for deleted objects"

This reverts commit dd340b8810ecd00ad2ffe79845ca137e941aefb7.
This commit caused a number of issues tracked in IMPALA-6001. The
issues were due to the lack of atomicity between the catalog version
change and the addition to the delete log of a catalog object.

Conflicts:
	be/src/service/impala-server.cc

Change-Id: I3a2cddee5d565384e9de0e61b3b7d0d9075e0dce
Reviewed-on: http://gerrit.cloudera.org:8080/8667
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a88c3b9c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a88c3b9c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a88c3b9c

Branch: refs/heads/master
Commit: a88c3b9c529b2215b232455598f8f8332c27f996
Parents: 0588309
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Tue Nov 28 09:58:25 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 29 02:19:50 2017 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                |  57 ++--
 be/src/catalog/catalog-server.h                 |  39 ++-
 be/src/catalog/catalog-util.cc                  |  15 +
 be/src/catalog/catalog-util.h                   |   9 +
 be/src/catalog/catalog.cc                       |  19 +-
 be/src/catalog/catalog.h                        |  14 +-
 be/src/scheduling/admission-controller.cc       |  18 +-
 be/src/scheduling/admission-controller.h        |   7 +-
 be/src/scheduling/scheduler-test-util.cc        |   5 +-
 be/src/scheduling/scheduler.cc                  |  22 +-
 be/src/service/impala-server.cc                 | 121 ++++----
 be/src/statestore/statestore.cc                 |  56 ++--
 be/src/statestore/statestore.h                  |  43 ++-
 common/thrift/CatalogInternalService.thrift     |  24 +-
 common/thrift/StatestoreService.thrift          |  31 +--
 .../apache/impala/catalog/CatalogDeltaLog.java  | 104 +++----
 .../impala/catalog/CatalogServiceCatalog.java   | 275 +++++++------------
 .../apache/impala/catalog/ImpaladCatalog.java   |  22 +-
 .../impala/service/CatalogOpExecutor.java       |  15 -
 .../org/apache/impala/service/JniCatalog.java   |  11 +-
 tests/statestore/test_statestore.py             |   8 +-
 21 files changed, 438 insertions(+), 477 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index b004b22..15685d0 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -228,10 +228,13 @@ void CatalogServer::UpdateCatalogTopicCallback(
 
   const TTopicDelta& delta = topic->second;
 
-  // If not generating a delta update and 'pending_topic_updates_' doesn't already contain
-  // the full catalog (beginning with version 0), then force GatherCatalogUpdatesThread()
-  // to reload the full catalog.
-  if (delta.from_version == 0 && catalog_objects_min_version_ != 0) {
+  // If this is not a delta update, clear all catalog objects and request an update
+  // from version 0 from the local catalog. There is an optimization that checks if
+  // pending_topic_updates_ was just reloaded from version 0, if they have then skip this
+  // step and use that data.
+  if (delta.from_version == 0 && delta.to_version == 0 &&
+      catalog_objects_min_version_ != 0) {
+    catalog_topic_entry_keys_.clear();
     last_sent_catalog_version_ = 0L;
   } else {
     // Process the pending topic update.
@@ -281,17 +284,14 @@ void CatalogServer::UpdateCatalogTopicCallback(
     } else if (current_catalog_version != last_sent_catalog_version_) {
       // If there has been a change since the last time the catalog was queried,
       // call into the Catalog to find out what has changed.
-      TGetCatalogDeltaResponse catalog_objects;
-      status = catalog_->GetCatalogDelta(last_sent_catalog_version_, &catalog_objects);
+      TGetAllCatalogObjectsResponse catalog_objects;
+      status = catalog_->GetAllCatalogObjects(last_sent_catalog_version_,
+          &catalog_objects);
       if (!status.ok()) {
         LOG(ERROR) << status.GetDetail();
       } else {
-        // Use the catalog objects to build a topic update list. These include
-        // objects added to the catalog, 'updated_objects', and objects deleted
-        // from the catalog, 'deleted_objects'. The order in which we process
-        // these two disjoint sets of catalog objects does not matter.
-        BuildTopicUpdates(catalog_objects.updated_objects, false);
-        BuildTopicUpdates(catalog_objects.deleted_objects, true);
+        // Use the catalog objects to build a topic update list.
+        BuildTopicUpdates(catalog_objects.objects);
         catalog_objects_min_version_ = last_sent_catalog_version_;
         catalog_objects_max_version_ = catalog_objects.max_catalog_version;
       }
@@ -302,19 +302,31 @@ void CatalogServer::UpdateCatalogTopicCallback(
   }
 }
 
-void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects,
-    bool topic_deletions) {
+void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects) {
+  unordered_set<string> current_entry_keys;
+  // Add any new/updated catalog objects to the topic.
   for (const TCatalogObject& catalog_object: catalog_objects) {
-    DCHECK_GT(catalog_object.catalog_version, last_sent_catalog_version_);
     const string& entry_key = TCatalogObjectToEntryKey(catalog_object);
     if (entry_key.empty()) {
       LOG_EVERY_N(WARNING, 60) << "Unable to build topic entry key for TCatalogObject: "
                                << ThriftDebugString(catalog_object);
     }
+
+    current_entry_keys.insert(entry_key);
+    // Remove this entry from catalog_topic_entry_keys_. At the end of this loop, we will
+    // be left with the set of keys that were in the last update, but not in this
+    // update, indicating which objects have been removed/dropped.
+    catalog_topic_entry_keys_.erase(entry_key);
+
+    // This isn't a new or an updated item, skip it.
+    if (catalog_object.catalog_version <= last_sent_catalog_version_) continue;
+
+    VLOG(1) << "Publishing update: " << entry_key << "@"
+            << catalog_object.catalog_version;
+
     pending_topic_updates_.push_back(TTopicItem());
     TTopicItem& item = pending_topic_updates_.back();
     item.key = entry_key;
-    item.deleted = topic_deletions;
     Status status = thrift_serializer_.Serialize(&catalog_object, &item.value);
     if (!status.ok()) {
       LOG(ERROR) << "Error serializing topic value: " << status.GetDetail();
@@ -328,9 +340,18 @@ void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_obje
         pending_topic_updates_.pop_back();
       }
     }
-    VLOG(1) << "Publishing " << (topic_deletions ? "deletion " : "update ")
-        << ": " << entry_key << "@" << catalog_object.catalog_version;
   }
+
+  // Any remaining items in catalog_topic_entry_keys_ indicate the object was removed
+  // since the last update.
+  for (const string& key: catalog_topic_entry_keys_) {
+    pending_topic_updates_.push_back(TTopicItem());
+    TTopicItem& item = pending_topic_updates_.back();
+    item.key = key;
+    VLOG(1) << "Publishing deletion: " << key;
+    // Don't set a value to mark this item as deleted.
+  }
+  catalog_topic_entry_keys_.swap(current_entry_keys);
 }
 
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 78a3f20..bf88e00 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -88,6 +88,12 @@ class CatalogServer {
   /// Thread that polls the catalog for any updates.
   std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
+  /// Tracks the set of catalog objects that exist via their topic entry key.
+  /// During each IMPALA_CATALOG_TOPIC heartbeat, stores the set of known catalog objects
+  /// that exist by their topic entry key. Used to track objects that have been removed
+  /// since the last heartbeat.
+  boost::unordered_set<std::string> catalog_topic_entry_keys_;
+
   /// Protects catalog_update_cv_, pending_topic_updates_,
   /// catalog_objects_to/from_version_, and last_sent_catalog_version.
   boost::mutex catalog_lock_;
@@ -129,10 +135,14 @@ class CatalogServer {
   /// finds all catalog objects that have a catalog version greater than the last update
   /// sent by calling into the JniCatalog. The topic is updated with any catalog objects
   /// that are new or have been modified since the last heartbeat (by comparing the
-  /// catalog version of the object with last_sent_catalog_version_). At the end of
-  /// execution it notifies the catalog_update_gathering_thread_ to fetch the next set of
-  /// updates from the JniCatalog. All updates are added to the subscriber_topic_updates
-  /// list and sent back to the Statestore.
+  /// catalog version of the object with last_sent_catalog_version_). Also determines any
+  /// deletions of catalog objects by looking at the
+  /// difference of the last set of topic entry keys that were sent and the current set
+  /// of topic entry keys. At the end of execution it notifies the
+  /// catalog_update_gathering_thread_ to fetch the next set of updates from the
+  /// JniCatalog.
+  /// All updates are added to the subscriber_topic_updates list and sent back to the
+  /// Statestore.
   void UpdateCatalogTopicCallback(
       const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
       std::vector<TTopicDelta>* subscriber_topic_updates);
@@ -143,19 +153,20 @@ class CatalogServer {
   /// Also, explicitly releases free memory back to the OS after each complete iteration.
   [[noreturn]] void GatherCatalogUpdatesThread();
 
-  /// Builds the next topic update to send based on what items
-  /// have been added/changed/removed from the catalog since the last hearbeat. To do
-  /// this, it enumerates the given catalog objects returned looking for the objects that
-  /// have a catalog version that is > the catalog version sent with the last heartbeat.
-  /// 'topic_deletions' is true if 'catalog_objects' contain deleted catalog
-  /// objects.
-  ///
+  /// This function determines what items have been added/removed from the catalog
+  /// since the last heartbeat and builds the next topic update to send. To do this, it
+  /// enumerates the given catalog objects returned looking for the objects that have a
+  /// catalog version that is > the catalog version sent with the last heartbeat. To
+  /// determine items that have been deleted, it saves the set of topic entry keys sent
+  /// with the last update and looks at the difference between it and the current set of
+  /// topic entry keys.
   /// The key for each entry is a string composed of:
   /// "TCatalogObjectType:<unique object name>". So for table foo.bar, the key would be
   /// "TABLE:foo.bar". Encoding the object type information in the key ensures the keys
-  /// are unique. Must hold catalog_lock_ when calling this function.
-  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects,
-      bool topic_deletions);
+  /// are unique, as well as helps to determine what object type was removed in a state
+  /// store delta update (since the state store only sends key names for deleted items).
+  /// Must hold catalog_lock_ when calling this function.
+  void BuildTopicUpdates(const std::vector<TCatalogObject>& catalog_objects);
 
   /// Example output:
   /// "databases": [

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog-util.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index 7b115b0..de4f2fd 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -55,6 +55,21 @@ TCatalogObjectType::type TCatalogObjectTypeFromName(const string& name) {
   return TCatalogObjectType::UNKNOWN;
 }
 
+Status TCatalogObjectFromEntryKey(const string& key,
+    TCatalogObject* catalog_object) {
+  // Reconstruct the object based only on the key.
+  size_t pos = key.find(":");
+  if (pos == string::npos || pos >= key.size() - 1) {
+    stringstream error_msg;
+    error_msg << "Invalid topic entry key format: " << key;
+    return Status(error_msg.str());
+  }
+
+  TCatalogObjectType::type object_type = TCatalogObjectTypeFromName(key.substr(0, pos));
+  string object_name = key.substr(pos + 1);
+  return TCatalogObjectFromObjectName(object_type, object_name, catalog_object);
+}
+
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,
     const string& object_name, TCatalogObject* catalog_object) {
   switch (object_type) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog-util.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.h b/be/src/catalog/catalog-util.h
index e98cd38..ddc2c21 100644
--- a/be/src/catalog/catalog-util.h
+++ b/be/src/catalog/catalog-util.h
@@ -32,6 +32,15 @@ class Status;
 /// TCatalogObjectType::UNKNOWN if no match was found.
 TCatalogObjectType::type TCatalogObjectTypeFromName(const std::string& name);
 
+/// Parses the given IMPALA_CATALOG_TOPIC topic entry key to determine the
+/// TCatalogObjectType and unique object name. Populates catalog_object with the result.
+/// This is used to reconstruct type information when an item is deleted from the
+/// topic. At that time the only context available about the object being deleted is its
+/// its topic entry key which contains only the type and object name. The resulting
+/// TCatalogObject can then be used to removing a matching item from the catalog.
+Status TCatalogObjectFromEntryKey(const std::string& key,
+    TCatalogObject* catalog_object);
+
 /// Populates a TCatalogObject based on the given object type (TABLE, DATABASE, etc) and
 /// object name string.
 Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index b6dd86a..e7e05da 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -62,7 +62,7 @@ Catalog::Catalog() {
     {"getFunctions", "([B)[B", &get_functions_id_},
     {"checkUserSentryAdmin", "([B)V", &sentry_admin_check_id_},
     {"getCatalogObject", "([B)[B", &get_catalog_object_id_},
-    {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
+    {"getCatalogObjects", "(J)[B", &get_catalog_objects_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_}};
 
@@ -97,10 +97,19 @@ Status Catalog::GetCatalogVersion(long* version) {
   return Status::OK();
 }
 
-Status Catalog::GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp) {
-  TGetCatalogDeltaRequest request;
-  request.__set_from_version(from_version);
-  return JniUtil::CallJniMethod(catalog_, get_catalog_delta_id_, request, resp);
+Status Catalog::GetAllCatalogObjects(long from_version,
+    TGetAllCatalogObjectsResponse* resp) {
+  JNIEnv* jni_env = getJNIEnv();
+  JniLocalFrame jni_frame;
+  RETURN_IF_ERROR(jni_frame.push(jni_env));
+  jvalue requested_from_version;
+  requested_from_version.j = from_version;
+  jbyteArray result_bytes = static_cast<jbyteArray>(
+      jni_env->CallObjectMethod(catalog_, get_catalog_objects_id_,
+      requested_from_version));
+  RETURN_ERROR_IF_EXC(jni_env);
+  RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, resp));
+  return Status::OK();
 }
 
 Status Catalog::ExecDdl(const TDdlExecRequest& req, TDdlExecResponse* resp) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 3119d60..ab6a2a3 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -56,10 +56,12 @@ class Catalog {
   /// Status object with information on the error will be returned.
   Status GetCatalogVersion(long* version);
 
-  /// Retrieves the catalog objects that were added/modified/deleted since version
-  /// 'from_version'. Returns OK if the operation was successful, otherwise a Status
-  /// object with information on the error will be returned.
-  Status GetCatalogDelta(long from_version, TGetCatalogDeltaResponse* resp);
+  /// Gets all Catalog objects and the metadata that is applicable for the given request.
+  /// Always returns all object names that exist in the Catalog, but allows for extended
+  /// metadata for objects that were modified after the specified version.
+  /// Returns OK if the operation was successful, otherwise a Status object with
+  /// information on the error will be returned.
+  Status GetAllCatalogObjects(long from_version, TGetAllCatalogObjectsResponse* resp);
 
   /// Gets the Thrift representation of a Catalog object. The request is a TCatalogObject
   /// which has the desired TCatalogObjectType and name properly set.
@@ -72,7 +74,7 @@ class Catalog {
   /// match the pattern string. Patterns are "p1|p2|p3" where | denotes choice,
   /// and each pN may contain wildcards denoted by '*' which match all strings.
   /// TODO: GetDbs() and GetTableNames() can probably be scrapped in favor of
-  /// GetCatalogDelta(). Consider removing them and moving everything to use
+  /// GetAllCatalogObjects(). Consider removing them and moving everything to use
   /// that.
   Status GetDbs(const std::string* pattern, TGetDbsResult* dbs);
 
@@ -107,7 +109,7 @@ class Catalog {
   jmethodID exec_ddl_id_;  // JniCatalog.execDdl()
   jmethodID reset_metadata_id_;  // JniCatalog.resetMetdata()
   jmethodID get_catalog_object_id_;  // JniCatalog.getCatalogObject()
-  jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
+  jmethodID get_catalog_objects_id_;  // JniCatalog.getCatalogObjects()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
   jmethodID get_dbs_id_; // JniCatalog.getDbs()
   jmethodID get_table_names_id_; // JniCatalog.getTableNames()

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 99f659a..ed4e7e2 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -645,6 +645,7 @@ void AdmissionController::UpdatePoolStats(
         }
       }
       HandleTopicUpdates(delta.topic_entries);
+      HandleTopicDeletions(delta.topic_deletions);
     }
     UpdateClusterAggregates();
   }
@@ -682,10 +683,6 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
     // The topic entry from this subscriber is handled specially; the stats coming
     // from the statestore are likely already outdated.
     if (topic_backend_id == host_id_) continue;
-    if (item.deleted) {
-      GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
-      continue;
-    }
     TPoolStats remote_update;
     uint32_t len = item.value.size();
     Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -698,7 +695,18 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd
   }
 }
 
-void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reserved) {
+void AdmissionController::HandleTopicDeletions(const vector<string>& topic_deletions) {
+  for (const string& topic_key: topic_deletions) {
+    string pool_name;
+    string topic_backend_id;
+    if (!ParsePoolTopicKey(topic_key, &pool_name, &topic_backend_id)) continue;
+    if (topic_backend_id == host_id_) continue;
+    GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
+  }
+}
+
+void AdmissionController::PoolStats::UpdateAggregates(
+    HostMemMap* host_mem_reserved) {
   const string& coord_id = parent_->host_id_;
   int64_t num_running = 0;
   int64_t num_queued = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 2830bee..71c9fa4 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -428,10 +428,13 @@ class AdmissionController {
   void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
 
   /// Updates the remote stats with per-host topic_updates coming from the statestore.
-  /// Removes remote stats identified by topic deletions coming from the
-  /// statestore. Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
+  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
   void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);
 
+  /// Removes remote stats identified by the topic_deletions coming from the statestore.
+  /// Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
+  void HandleTopicDeletions(const std::vector<std::string>& topic_deletions);
+
   /// Re-computes the per-pool aggregate stats and the per-host aggregates in
   /// host_mem_reserved_ using each pool's remote_stats_ and local_stats_.
   /// Called by UpdatePoolStats() after handling updates and deletions.

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 05cfc42..6fb2bba 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -478,10 +478,7 @@ void SchedulerWrapper::RemoveBackend(const Host& host) {
   TTopicDelta delta;
   delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
-  TTopicItem item;
-  item.__set_deleted(true);
-  item.__set_key(host.ip);
-  delta.topic_entries.push_back(item);
+  delta.topic_deletions.push_back(host.ip);
   SendTopicDelta(delta);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5cf0f01..2bd6c96 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -131,7 +131,9 @@ void Scheduler::UpdateMembership(
 
   // If the delta transmitted by the statestore is empty we can skip processing
   // altogether and avoid making a copy of executors_config_.
-  if (delta.is_delta && delta.topic_entries.empty()) return;
+  if (delta.is_delta && delta.topic_entries.empty() && delta.topic_deletions.empty()) {
+    return;
+  }
 
   // This function needs to handle both delta and non-delta updates. To minimize the
   // time needed to hold locks, all updates are applied to a copy of
@@ -148,17 +150,10 @@ void Scheduler::UpdateMembership(
     new_executors_config = std::make_shared<BackendConfig>(*executors_config_);
   }
 
-  // Process new and removed entries to the topic. Update executors_config_ and
+  // Process new entries to the topic. Update executors_config_ and
   // current_executors_ to match the set of executors given by the
   // subscriber_topic_updates.
   for (const TTopicItem& item : delta.topic_entries) {
-    if (item.deleted) {
-      if (current_executors_.find(item.key) != current_executors_.end()) {
-        new_executors_config->RemoveBackend(current_executors_[item.key]);
-        current_executors_.erase(item.key);
-      }
-      continue;
-    }
     TBackendDescriptor be_desc;
     // Benchmarks have suggested that this method can deserialize
     // ~10m messages per second, so no immediate need to consider optimization.
@@ -193,6 +188,15 @@ void Scheduler::UpdateMembership(
       current_executors_.insert(make_pair(item.key, be_desc));
     }
   }
+
+  // Process deletions from the topic
+  for (const string& backend_id : delta.topic_deletions) {
+    if (current_executors_.find(backend_id) != current_executors_.end()) {
+      new_executors_config->RemoveBackend(current_executors_[backend_id]);
+      current_executors_.erase(backend_id);
+    }
+  }
+
   SetExecutorsConfig(new_executors_config);
 
   if (metrics_ != nullptr) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c065282..2222802 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1316,10 +1316,10 @@ void ImpalaServer::CatalogUpdateCallback(
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;
 
+
   // Update catalog cache in frontend. An update is split into batches of size
   // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. IMPALA-3499
-  if (delta.topic_entries.size() != 0)  {
-    vector<TCatalogObject> dropped_objects;
+  if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0)  {
     vector<TUpdateCatalogCacheRequest> update_reqs;
     update_reqs.push_back(TUpdateCatalogCacheRequest());
     TUpdateCatalogCacheRequest* incremental_request = &update_reqs.back();
@@ -1329,6 +1329,7 @@ void ImpalaServer::CatalogUpdateCallback(
     int64_t new_catalog_version = catalog_update_info_.catalog_version;
     uint64_t batch_size_bytes = 0;
     for (const TTopicItem& item: delta.topic_entries) {
+      TCatalogObject catalog_object;
       Status status;
       vector<uint8_t> data_buffer;
       const uint8_t* data_buffer_ptr = nullptr;
@@ -1346,70 +1347,82 @@ void ImpalaServer::CatalogUpdateCallback(
         data_buffer_ptr = reinterpret_cast<const uint8_t*>(item.value.data());
         len = item.value.size();
       }
-      if (len > 100 * 1024 * 1024 /* 100MB */) {
-        LOG(INFO) << "Received large catalog object(>100mb): "
-            << item.key << " is "
-            << PrettyPrinter::Print(len, TUnit::BYTES);
-      }
-      TCatalogObject catalog_object;
       status = DeserializeThriftMsg(data_buffer_ptr, &len, FLAGS_compact_catalog_topic,
           &catalog_object);
       if (!status.ok()) {
         LOG(ERROR) << "Error deserializing item " << item.key
-            << ": " << status.GetDetail();
+                   << ": " << status.GetDetail();
         continue;
       }
+      if (len > 100 * 1024 * 1024 /* 100MB */) {
+        LOG(INFO) << "Received large catalog update(>100mb): "
+                     << item.key << " is "
+                     << PrettyPrinter::Print(len, TUnit::BYTES);
+      }
+      if (catalog_object.type == TCatalogObjectType::CATALOG) {
+        incremental_request->__set_catalog_service_id(
+            catalog_object.catalog.catalog_service_id);
+        new_catalog_version = catalog_object.catalog_version;
+      }
+
+      // Refresh the lib cache entries of any added functions and data sources
+      // TODO: if frontend returns the list of functions and data sources, we do not
+      // need to deserialize these in backend.
+      if (catalog_object.type == TCatalogObjectType::FUNCTION) {
+        DCHECK(catalog_object.__isset.fn);
+        LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
+      }
+      if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+        DCHECK(catalog_object.__isset.data_source);
+        LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
+      }
 
       if (batch_size_bytes + len > MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES) {
-        // Initialize a new batch of catalog updates.
         update_reqs.push_back(TUpdateCatalogCacheRequest());
         incremental_request = &update_reqs.back();
         batch_size_bytes = 0;
       }
+      incremental_request->updated_objects.push_back(catalog_object);
+      batch_size_bytes += len;
+    }
+    update_reqs.push_back(TUpdateCatalogCacheRequest());
+    TUpdateCatalogCacheRequest* deletion_request = &update_reqs.back();
 
-      if (catalog_object.type == TCatalogObjectType::CATALOG) {
-        incremental_request->__set_catalog_service_id(
-            catalog_object.catalog.catalog_service_id);
-        new_catalog_version = catalog_object.catalog_version;
+    // We need to look up the dropped functions and data sources and remove them
+    // from the library cache. The data sent from the catalog service does not
+    // contain all the function metadata so we'll ask our local frontend for it. We
+    // need to do this before updating the catalog.
+    vector<TCatalogObject> dropped_objects;
+
+    // Process all Catalog deletions (dropped objects). We only know the keys (object
+    // names) so must parse each key to determine the TCatalogObject.
+    for (const string& key: delta.topic_deletions) {
+      LOG(INFO) << "Catalog topic entry deletion: " << key;
+      TCatalogObject catalog_object;
+      Status status = TCatalogObjectFromEntryKey(key, &catalog_object);
+      if (!status.ok()) {
+        LOG(ERROR) << "Error parsing catalog topic entry deletion key: " << key << " "
+                   << "Error: " << status.GetDetail();
+        continue;
       }
-      VLOG(1) << (item.deleted ? "Deleted " : "Added ") << "item: " << item.key
-          << " version: " << catalog_object.catalog_version << " of size: " << len;
-
-      if (!item.deleted) {
-        // Refresh the lib cache entries of any added functions and data sources
-        // TODO: if frontend returns the list of functions and data sources, we do not
-        // need to deserialize these in backend.
-        if (catalog_object.type == TCatalogObjectType::FUNCTION) {
-          DCHECK(catalog_object.__isset.fn);
-          LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
-        }
-        if (catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-          DCHECK(catalog_object.__isset.data_source);
-          LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
-        }
-        incremental_request->updated_objects.push_back(catalog_object);
-      } else {
-        // We need to look up any dropped functions and data sources and remove
-        // them from the library cache.
-        if (catalog_object.type == TCatalogObjectType::FUNCTION ||
-            catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
-          TCatalogObject existing_object;
-          if (exec_env_->frontend()->GetCatalogObject(
-              catalog_object, &existing_object).ok()) {
-            // If the object exists in the catalog it may have been dropped and
-            // re-created. To avoid removing the re-created object's entry from
-            // the cache verify that the existing object's version <= the
-            // version of the dropped object included in this statestore
-            // heartbeat.
-            DCHECK_NE(existing_object.catalog_version, catalog_object.catalog_version);
-            if (existing_object.catalog_version < catalog_object.catalog_version) {
-              dropped_objects.push_back(existing_object);
+      deletion_request->removed_objects.push_back(catalog_object);
+      if (catalog_object.type == TCatalogObjectType::FUNCTION ||
+          catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+        TCatalogObject dropped_object;
+        if (exec_env_->frontend()->GetCatalogObject(
+                catalog_object, &dropped_object).ok()) {
+          // This object may have been dropped and re-created. To avoid removing the
+          // re-created object's entry from the cache verify the existing object has a
+          // catalog version <= the catalog version included in this statestore heartbeat.
+          if (dropped_object.catalog_version <= new_catalog_version) {
+            if (catalog_object.type == TCatalogObjectType::FUNCTION ||
+                catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
+              dropped_objects.push_back(dropped_object);
             }
           }
         }
-        incremental_request->removed_objects.push_back(catalog_object);
+        // Nothing to do in error case.
       }
-      batch_size_bytes += len;
     }
 
     // Call the FE to apply the changes to the Impalad Catalog.
@@ -1530,10 +1543,6 @@ void ImpalaServer::MembershipCallback(
 
     // Process membership additions.
     for (const TTopicItem& item: delta.topic_entries) {
-      if (item.deleted) {
-        known_backends_.erase(item.key);
-        continue;
-      }
       uint32_t len = item.value.size();
       TBackendDescriptor backend_descriptor;
       Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
@@ -1550,12 +1559,18 @@ void ImpalaServer::MembershipCallback(
     // Only register if all ports have been opened and are ready.
     if (services_started_.load()) AddLocalBackendToStatestore(subscriber_topic_updates);
 
+    // Process membership deletions.
+    for (const string& backend_id: delta.topic_deletions) {
+      known_backends_.erase(backend_id);
+    }
+
     // Create a set of known backend network addresses. Used to test for cluster
     // membership by network address.
     set<TNetworkAddress> current_membership;
     // Also reflect changes to the frontend. Initialized only if any_changes is true.
     TUpdateMembershipRequest update_req;
-    bool any_changes = !delta.topic_entries.empty() || !delta.is_delta;
+    bool any_changes = !delta.topic_entries.empty() || !delta.topic_deletions.empty() ||
+        !delta.is_delta;
     for (const BackendDescriptorMap::value_type& backend: known_backends_) {
       current_membership.insert(backend.second.address);
       if (any_changes) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index d0a4851..2d93c5f 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -85,6 +85,8 @@ const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-by
 const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 
+const Statestore::TopicEntry::Value Statestore::TopicEntry::NULL_VALUE = "";
+
 // Initial version for each Topic registered by a Subscriber. Generally, the Topic will
 // have a Version that is the MAX() of all entries in the Topic, but this initial
 // value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
@@ -122,13 +124,13 @@ class StatestoreThriftIf : public StatestoreServiceIf {
 
 void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes,
     TopicEntry::Version version) {
-  DCHECK_GT(bytes.size(), 0);
+  DCHECK(bytes == Statestore::TopicEntry::NULL_VALUE || bytes.size() > 0);
   value_ = bytes;
   version_ = version;
 }
 
 Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
-    const Statestore::TopicEntry::Value& bytes, bool is_deleted) {
+    const Statestore::TopicEntry::Value& bytes) {
   TopicEntryMap::iterator entry_it = entries_.find(key);
   int64_t key_size_delta = 0;
   int64_t value_size_delta = 0;
@@ -145,7 +147,6 @@ Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
   value_size_delta += bytes.size();
 
   entry_it->second.SetValue(bytes, ++last_version_);
-  entry_it->second.SetDeleted(is_deleted);
   topic_update_log_.insert(make_pair(entry_it->second.version(), key));
 
   total_key_size_bytes_ += key_size_delta;
@@ -167,10 +168,12 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
     // entry
     topic_update_log_.erase(version);
     topic_update_log_.insert(make_pair(++last_version_, key));
+    total_value_size_bytes_ -= entry_it->second.value().size();
+    DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
+
     value_size_metric_->Increment(entry_it->second.value().size());
     topic_size_metric_->Increment(entry_it->second.value().size());
-    entry_it->second.SetDeleted(true);
-    entry_it->second.SetVersion(last_version_);
+    entry_it->second.SetValue(Statestore::TopicEntry::NULL_VALUE, last_version_);
   }
 }
 
@@ -464,9 +467,11 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
   // At this point the updates are assumed to have been successfully processed by the
   // subscriber. Update the subscriber's max version of each topic.
-  for (const auto& topic_delta: update_state_request.topic_deltas) {
-    subscriber->SetLastTopicVersionProcessed(topic_delta.first,
-        topic_delta.second.to_version);
+  map<TopicEntryKey, TTopicDelta>::const_iterator topic_delta =
+      update_state_request.topic_deltas.begin();
+  for (; topic_delta != update_state_request.topic_deltas.end(); ++topic_delta) {
+    subscriber->SetLastTopicVersionProcessed(topic_delta->first,
+        topic_delta->second.to_version);
   }
 
   // Thirdly: perform any / all updates returned by the subscriber
@@ -495,8 +500,14 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
       Topic* topic = &topic_it->second;
       for (const TTopicItem& item: update.topic_entries) {
-        subscriber->AddTransientUpdate(update.topic_name, item.key,
-            topic->Put(item.key, item.value, item.deleted));
+        TopicEntry::Version version = topic->Put(item.key, item.value);
+        subscriber->AddTransientUpdate(update.topic_name, item.key, version);
+      }
+
+      for (const string& key: update.topic_deletions) {
+        TopicEntry::Version version =
+            topic->Put(key, Statestore::TopicEntry::NULL_VALUE);
+        subscriber->AddTransientUpdate(update.topic_name, key, version);
       }
     }
   }
@@ -530,25 +541,30 @@ void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
       TopicUpdateLog::const_iterator next_update =
           topic.topic_update_log().upper_bound(last_processed_version);
 
-      uint64_t topic_size = 0;
+      int64_t deleted_key_size_bytes = 0;
       for (; next_update != topic.topic_update_log().end(); ++next_update) {
         TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second);
         DCHECK(itr != topic.entries().end());
         const TopicEntry& topic_entry = itr->second;
-        // Don't send deleted entries for non-delta updates.
-        if (!topic_delta.is_delta && topic_entry.is_deleted()) {
-          continue;
+        if (topic_entry.value() == Statestore::TopicEntry::NULL_VALUE) {
+          if (!topic_delta.is_delta) {
+            deleted_key_size_bytes += itr->first.size();
+            continue;
+          }
+          topic_delta.topic_deletions.push_back(itr->first);
+        } else {
+          topic_delta.topic_entries.push_back(TTopicItem());
+          TTopicItem& topic_item = topic_delta.topic_entries.back();
+          topic_item.key = itr->first;
+          // TODO: Does this do a needless copy?
+          topic_item.value = topic_entry.value();
         }
-        topic_delta.topic_entries.push_back(TTopicItem());
-        TTopicItem& topic_item = topic_delta.topic_entries.back();
-        topic_item.key = itr->first;
-        topic_item.value = topic_entry.value();
-        topic_item.deleted = topic_entry.is_deleted();
-        topic_size += topic_item.key.size() + topic_item.value.size();
       }
 
       if (!topic_delta.is_delta &&
           topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
+        int64_t topic_size = topic.total_key_size_bytes() - deleted_key_size_bytes
+            + topic.total_value_size_bytes();
         VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
                    << " topic update for " << subscriber.id() << ". Size = "
                    << PrettyPrinter::Print(topic_size, TUnit::BYTES);

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 38b8361..1488f7e 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -131,7 +131,8 @@ class Statestore : public CacheLineAligned {
 
  private:
   /// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
-  /// pair.
+  /// pair. If the byte string is NULL, the entry has been deleted, but may be retained to
+  /// track changes to send to subscribers.
   class TopicEntry {
    public:
     /// A Value is a string of bytes, for which std::string is a convenient representation.
@@ -145,38 +146,30 @@ class Statestore : public CacheLineAligned {
     /// The Version value used to initialize a new TopicEntry.
     static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L;
 
-    /// Sets the value of this entry to the byte / length pair. The caller is responsible
-    /// for ensuring, if required, that the version parameter is larger than the
-    /// current version() TODO: Consider enforcing version monotonicity here.
-    void SetValue(const Value& bytes, Version version);
-
-    /// Sets a new version for this entry.
-    void SetVersion(Version version) { version_ = version; }
+    /// Representation of an empty Value. Must have size() == 0.
+    static const Value NULL_VALUE;
 
-    /// Sets the is_deleted_ flag for this entry.
-    void SetDeleted(bool is_deleted) { is_deleted_ = is_deleted; }
+    /// Sets the value of this entry to the byte / length pair. NULL_VALUE implies this
+    /// entry has been deleted.  The caller is responsible for ensuring, if required, that
+    /// the version parameter is larger than the current version() TODO: Consider enforcing
+    /// version monotonicity here.
+    void SetValue(const Value& bytes, Version version);
 
-    TopicEntry() : version_(TOPIC_ENTRY_INITIAL_VERSION),
-        is_deleted_(false) { }
+    TopicEntry() : value_(NULL_VALUE), version_(TOPIC_ENTRY_INITIAL_VERSION) { }
 
     const Value& value() const { return value_; }
     uint64_t version() const { return version_; }
     uint32_t length() const { return value_.size(); }
-    bool is_deleted() const { return is_deleted_; }
 
    private:
-    /// Byte string value, owned by this TopicEntry. The value is opaque to the
-    /// statestore, and is interpreted only by subscribers.
+    /// Byte string value, owned by this TopicEntry. The value is opaque to the statestore,
+    /// and is interpreted only by subscribers.
     Value value_;
 
     /// The version of this entry. Every update is assigned a monotonically increasing
     /// version number so that only the minimal set of changes can be sent from the
     /// statestore to a subscriber.
     Version version_;
-
-    /// Indicates if the entry has been deleted. If true, the entry will still be
-    /// retained to track changes to send to subscribers.
-    bool is_deleted_;
   };
 
   /// Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
@@ -199,21 +192,19 @@ class Statestore : public CacheLineAligned {
           total_value_size_bytes_(0L), key_size_metric_(key_size_metric),
           value_size_metric_(value_size_metric), topic_size_metric_(topic_size_metric) { }
 
-    /// Adds an entry with the given key and value (bytes). If is_deleted is
-    /// true the entry is considered deleted, and may be garbage collected in the future.
-    /// The entry is assigned a new version number by the Topic, and that version number
-    /// is returned.
+    /// Adds an entry with the given key. If bytes == NULL_VALUE, the entry is considered
+    /// deleted, and may be garbage collected in the future. The entry is assigned a new
+    /// version number by the Topic, and that version number is returned.
     //
     /// Must be called holding the topic lock
-    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes,
-        bool is_deleted);
+    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes);
 
     /// Utility method to support removing transient entries. We track the version numbers
     /// of entries added by subscribers, and remove entries with the same version number
     /// when that subscriber fails (the same entry may exist, but may have been updated by
     /// another subscriber giving it a new version number)
     //
-    /// Deletion means marking the entry as deleted and incrementing its version
+    /// Deletion means setting the entry's value to NULL and incrementing its version
     /// number.
     //
     /// Must be called holding the topic lock

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/common/thrift/CatalogInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogInternalService.thrift b/common/thrift/CatalogInternalService.thrift
index 5170298..5b68408 100644
--- a/common/thrift/CatalogInternalService.thrift
+++ b/common/thrift/CatalogInternalService.thrift
@@ -22,25 +22,15 @@ include "CatalogObjects.thrift"
 
 // Contains structures used internally by the Catalog Server.
 
-// Arguments to a GetCatalogDelta call.
-struct TGetCatalogDeltaRequest {
-  // The base catalog version from which the delta is computed.
-  1: required i64 from_version
-}
-
-// Response from a call to GetCatalogDelta. Contains a delta of catalog objects
-// (databases, tables/views, and functions) from the CatalogService's cache relative (>)
-// to the catalog version specified in TGetCatalogDelta.from_version.
-struct TGetCatalogDeltaResponse {
+// Response from a call to GetAllCatalogObjects. Contains all known Catalog objects
+// (databases, tables/views, and functions) from the CatalogService's cache.
+// What metadata is included for each object is based on the parameters used in
+// the request.
+struct TGetAllCatalogObjectsResponse {
   // The maximum catalog version of all objects in this response or 0 if the Catalog
   // contained no objects.
   1: required i64 max_catalog_version
 
-  // List of updated (new and modified) catalog objects for which the catalog verion is
-  // larger than TGetCatalotDeltaRequest.from_version.
-  2: required list<CatalogObjects.TCatalogObject> updated_objects
-
-  // List of deleted catalog objects for which the catalog version is larger than
-  // TGetCatalogDelta.from_version.
-  3: required list<CatalogObjects.TCatalogObject> deleted_objects
+  // List of catalog objects (empty list if no objects detected in the Catalog).
+  2: required list<CatalogObjects.TCatalogObject> objects
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index f04650e..60a0d0d 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -79,15 +79,8 @@ struct TTopicItem {
   1: required string key;
 
   // Byte-string value for this topic entry. May not be null-terminated (in that it may
-  // contain null bytes). It can be non-empty when deleted is true. This is needed when
-  // subscribers need additional information in order to process the deleted topics that
-  // is not included in the topic key (e.g. catalog version of deleted catalog objects).
+  // contain null bytes)
   2: required string value;
-
-  // If true, this item was deleted. When false, this TTopicItem need not be included in
-  // non-delta TTopicDelta's (since the latest version of every still-present topic will
-  // be included).
-  3: required bool deleted = false;
 }
 
 // Set of changes to a single topic, sent from the statestore to a subscriber as well as
@@ -96,14 +89,15 @@ struct TTopicDelta {
   // Name of the topic this delta applies to
   1: required string topic_name;
 
-  // When is_delta=true, a list of changes to topic entries, including deletions, within
-  // [from_version, to_version].
-  // When is_delta=false, this is the list of all non-delete topic entries for
-  // [0, to_version], which can be used to reconstruct the topic from scratch.
+  // List of changes to topic entries
   2: required list<TTopicItem> topic_entries;
 
-  // True if entries / deletions are relative to the topic at versions [0, from_version].
-  3: required bool is_delta;
+  // List of topic item keys whose entries have been deleted
+  3: required list<string> topic_deletions;
+
+  // True if entries / deletions are to be applied to in-memory state,
+  // otherwise topic_entries contains entire topic state.
+  4: required bool is_delta;
 
   // The Topic version range this delta covers. If there have been changes to the topic,
   // the update will include all changes in the range: [from_version, to_version).
@@ -111,17 +105,16 @@ struct TTopicDelta {
   // to_version. The from_version will always be 0 for non-delta updates.
   // If this is an update being sent from a subscriber to the statestore, the from_version
   // is set only when recovering from an inconsistent state, to the last version of the
-  // topic the subscriber successfully processed. The value of to_version doesn't depend
-  // on whether the update is delta or not.
-  4: optional i64 from_version
-  5: optional i64 to_version
+  // topic the subscriber successfully processed.
+  5: optional i64 from_version
+  6: optional i64 to_version
 
   // The minimum topic version of all subscribers to the topic. This can be used to
   // determine when all subscribers have successfully processed a specific update.
   // This is guaranteed because no subscriber will ever be sent a topic containing
   // keys with a version < min_subscriber_topic_version. Only used when sending an update
   // from the statestore to a subscriber.
-  6: optional i64 min_subscriber_topic_version
+  7: optional i64 min_subscriber_topic_version
 }
 
 // Description of a topic to subscribe to as part of a RegisterSubscriber call

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
index c00c460..27839b3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogDeltaLog.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.catalog;
 
-import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -25,37 +24,24 @@ import java.util.TreeMap;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TTable;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
 /**
- * Represents a log of deleted catalog objects.
+ * The impalad catalog cache can be modified by either a state store update or by a
+ * direct ("fast") update that applies the result of a catalog operation to the cache
+ * out-of-band of a state store update. This thread safe log tracks the divergence
+ * (due to direct updates to the cache) of this impalad's cache from the last state
+ * store update. This log is needed to ensure work is never undone. For example,
+ * consider the following sequence of events:
+ * t1: [Direct Update] - Add item A - (Catalog Version 9)
+ * t2: [Direct Update] - Drop item A - (Catalog Version 10)
+ * t3: [StateStore Update] - (From Catalog Version 9)
+ * This log is used to ensure the state store update in t3 does not undo the drop in t2.
  *
- * There are currently two use cases for this log:
- *
- * a) Processing catalog updates in the impalads
- *   The impalad catalog cache can be modified by either a state store update or by a
- *   direct ("fast") update that applies the result of a catalog operation to the cache
- *   out-of-band of a state store update. This thread safe log tracks the divergence
- *   (due to direct updates to the cache) of this impalad's cache from the last state
- *   store update. This log is needed to ensure work is never undone. For example,
- *   consider the following sequence of events:
- *   t1: [Direct Update] - Add item A - (Catalog Version 9)
- *   t2: [Direct Update] - Drop item A - (Catalog Version 10)
- *   t3: [StateStore Update] - (From Catalog Version 9)
- *   This log is used to ensure the state store update in t3 does not undo the drop in t2.
- *   Currently this only tracks objects that were dropped, since the catalog cache can be
- *   queried to check if an object was added. TODO: Also track object additions from async
- *   operations. This could be used to to "replay" the log in the case of a catalog reset
- *   ("invalidate metadata"). Currently, the catalog may briefly go back in time if
- *   "invalidate metadata" is run concurrently with async catalog operations.
- *
- * b) Building catalog topic updates in the catalogd
- *   The catalogd uses this log to identify deleted catalog objects. Deleted
- *   catalog objects are added to this log by the corresponding operations that delete
- *   them (e.g. dropTable()). While constructing a catalog update topic, we use the log to
- *   determine which catalog objects were deleted since the last catalog topic update.
- *   Once the catalog topic update is constructed, the old deleted catalog objects are
- *   garbage collected to prevent the log from growing indefinitely.
+ * Currently this only tracks objects that were dropped, since the catalog cache can be
+ * queried to check if an object was added. TODO: Also track object additions from async
+ * operations. This could be used to "replay" the log in the case of a catalog reset
+ * ("invalidate metadata"). Currently, the catalog may briefly go back in time if
+ * "invalidate metadata" is run concurrently with async catalog operations.
  */
 public class CatalogDeltaLog {
   // Map of the catalog version an object was removed from the catalog
@@ -72,15 +58,6 @@ public class CatalogDeltaLog {
   }
 
   /**
-   * Retrieve all the removed catalog objects with version > 'fromVersion'.
-   */
-  public synchronized List<TCatalogObject> retrieveObjects(long fromVersion) {
-    SortedMap<Long, TCatalogObject> objects =
-        removedCatalogObjects_.tailMap(fromVersion + 1);
-    return ImmutableList.<TCatalogObject>copyOf(objects.values());
-  }
-
-  /**
    * Given the current catalog version, removes all items with catalogVersion <
    * currectCatalogVersion. Such objects do not need to be tracked in the delta
    * log anymore because they are consistent with the state store's view of the
@@ -114,45 +91,30 @@ public class CatalogDeltaLog {
   }
 
   /**
-   * Returns true if the two objects have the same object type and key (generated using
-   * toCatalogObjectKey()).
-   * TODO: Use global object IDs everywhere instead of tracking catalog objects by
-   * generated keys.
-   */
-  private static boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) {
-    return toCatalogObjectKey(first).equals(toCatalogObjectKey(second));
-  }
-
-  /**
-   * Returns a unique string key of a catalog object.
+   * Returns true if the two objects have the same object type and name.
+   * TODO: Use global object IDs everywhere instead of tracking catalog objects by name.
    */
-  public static String toCatalogObjectKey(TCatalogObject catalogObject)
-      throws IllegalStateException {
-    switch (catalogObject.getType()) {
+  private boolean objectNamesMatch(TCatalogObject first, TCatalogObject second) {
+    if (first.getType() != second.getType()) return false;
+    switch (first.getType()) {
       case DATABASE:
-        return "DATABASE:" + catalogObject.getDb().getDb_name().toLowerCase();
+        return first.getDb().getDb_name().equalsIgnoreCase(second.getDb().getDb_name());
       case TABLE:
       case VIEW:
-        TTable tbl = catalogObject.getTable();
-        return "TABLE:" + tbl.getDb_name().toLowerCase() + "." +
-            tbl.getTbl_name().toLowerCase();
+        TTable firstTbl = first.getTable();
+        return firstTbl.getDb_name().equalsIgnoreCase(second.getTable().getDb_name()) &&
+            firstTbl.getTbl_name().equalsIgnoreCase(second.getTable().getTbl_name());
       case FUNCTION:
-        return "FUNCTION:" + catalogObject.getFn().getName() + "(" +
-            catalogObject.getFn().getSignature() + ")";
+        return first.getFn().getSignature().equals(second.getFn().getSignature()) &&
+            first.getFn().getName().equals(second.getFn().getName());
       case ROLE:
-        return "ROLE:" + catalogObject.getRole().getRole_name().toLowerCase();
+        return first.getRole().getRole_name().equalsIgnoreCase(
+            second.getRole().getRole_name());
       case PRIVILEGE:
-        return "PRIVILEGE:" +
-            catalogObject.getPrivilege().getPrivilege_name().toLowerCase() + "." +
-            Integer.toString(catalogObject.getPrivilege().getRole_id());
-      case HDFS_CACHE_POOL:
-        return "HDFS_CACHE_POOL:" +
-            catalogObject.getCache_pool().getPool_name().toLowerCase();
-      case DATA_SOURCE:
-        return "DATA_SOURCE:" + catalogObject.getData_source().getName().toLowerCase();
-      default:
-        throw new IllegalStateException(
-            "Unsupported catalog object type: " + catalogObject.getType());
+        return first.getPrivilege().getPrivilege_name().equalsIgnoreCase(
+            second.getPrivilege().getPrivilege_name()) &&
+            first.getPrivilege().getRole_id() == second.getPrivilege().getRole_id();
+      default: return false;
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index a4f8608..d2a0a82 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
@@ -58,7 +59,7 @@ import org.apache.impala.thrift.TCatalog;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetCatalogDeltaResponse;
+import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TTable;
@@ -149,9 +150,6 @@ public class CatalogServiceCatalog extends Catalog {
   // Local temporary directory to copy UDF Jars.
   private static String localLibraryPath_;
 
-  // Log of deleted catalog objects.
-  private final CatalogDeltaLog deleteLog_;
-
   /**
    * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
    * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
@@ -182,7 +180,6 @@ public class CatalogServiceCatalog extends Catalog {
       sentryProxy_ = null;
     }
     localLibraryPath_ = new String("file://" + localLibraryPath);
-    deleteLog_ = new CatalogDeltaLog();
   }
 
   // Timeout for acquiring a table lock
@@ -269,15 +266,8 @@ public class CatalogServiceCatalog extends Catalog {
         }
         // Remove dropped cache pools.
         for (String cachePoolName: droppedCachePoolNames) {
-          HdfsCachePool cachePool = hdfsCachePools_.remove(cachePoolName);
-          if (cachePool != null) {
-            cachePool.setCatalogVersion(incrementAndGetCatalogVersion());
-            TCatalogObject removedObject =
-                new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
-                    cachePool.getCatalogVersion());
-            removedObject.setCache_pool(cachePool.toThrift());
-            deleteLog_.addRemovedObject(removedObject);
-          }
+          hdfsCachePools_.remove(cachePoolName);
+          CatalogServiceCatalog.this.incrementAndGetCatalogVersion();
         }
       } finally {
         catalogLock_.writeLock().unlock();
@@ -307,140 +297,117 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Computes and returns a delta of catalog objects relative to 'fromVersion'. Takes a
-   * lock on the catalog to ensure this update contains a consistent snapshot of the
-   * catalog.
+   * Returns all known objects in the Catalog (Tables, Views, Databases, and
+   * Functions). Some metadata may be skipped for objects that have a catalog
+   * version < the specified "fromVersion". Takes a lock on the catalog to ensure this
+   * update contains a consistent snapshot of all items in the catalog. While holding the
+   * catalog lock, it locks each accessed table to protect against concurrent
+   * modifications.
    */
-  public TGetCatalogDeltaResponse getCatalogDelta(long fromVersion) {
+  public TGetAllCatalogObjectsResponse getCatalogObjects(long fromVersion) {
+    TGetAllCatalogObjectsResponse resp = new TGetAllCatalogObjectsResponse();
+    resp.setObjects(new ArrayList<TCatalogObject>());
+    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
     catalogLock_.readLock().lock();
     try {
-      TGetCatalogDeltaResponse resp = getCatalogObjects(fromVersion);
-      // Each update should contain a single "TCatalog" object which is used to
-      // pass overall state on the catalog, such as the current version and the
-      // catalog service id.
-      TCatalogObject catalog = new TCatalogObject();
-      catalog.setType(TCatalogObjectType.CATALOG);
-      // By setting the catalog version to the latest catalog version at this point,
-      // it ensure impalads will always bump their versions, even in the case where
-      // an object has been dropped.
-      long currentCatalogVersion = getCatalogVersion();
-      catalog.setCatalog_version(currentCatalogVersion);
-      catalog.setCatalog(new TCatalog(catalogServiceId_));
-      resp.addToUpdated_objects(catalog);
-
-      // The max version is the max catalog version of all items in the update.
-      resp.setMax_catalog_version(currentCatalogVersion);
-      deleteLog_.garbageCollect(currentCatalogVersion);
-      return resp;
-    } finally {
-      catalogLock_.readLock().unlock();
-    }
-  }
-
-  /**
-   * Identify and return the catalog objects that were added/modified/deleted in the
-   * catalog with versions > 'fromVersion'. The caller of this function must hold the
-   * catalog read lock to prevent concurrent modifications of the catalog.
-   */
-  private TGetCatalogDeltaResponse getCatalogObjects(long fromVersion) {
-    TGetCatalogDeltaResponse resp = new TGetCatalogDeltaResponse();
-    resp.setUpdated_objects(new ArrayList<TCatalogObject>());
-    resp.setDeleted_objects(new ArrayList<TCatalogObject>());
-    resp.setMax_catalog_version(Catalog.INITIAL_CATALOG_VERSION);
-
-    // process databases
-    for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
-      if (db.getCatalogVersion() > fromVersion) {
+      for (Db db: getDbs(PatternMatcher.MATCHER_MATCH_ALL)) {
         TCatalogObject catalogDb = new TCatalogObject(TCatalogObjectType.DATABASE,
             db.getCatalogVersion());
         catalogDb.setDb(db.toThrift());
-        resp.addToUpdated_objects(catalogDb);
-      }
-      // process tables
-      for (Table tbl: db.getTables()) {
-        TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
-            Catalog.INITIAL_CATALOG_VERSION);
-        // Protect the table from concurrent modifications.
-        tbl.getLock().lock();
-        try {
-          // Only add the extended metadata if this table's version is > fromVersion.
-          if (tbl.getCatalogVersion() > fromVersion) {
-            try {
-              catalogTbl.setTable(tbl.toThrift());
-            } catch (Exception e) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug(String.format("Error calling toThrift() on table %s: %s",
-                    tbl.getFullName(), e.getMessage()), e);
+        resp.addToObjects(catalogDb);
+
+        for (String tblName: db.getAllTableNames()) {
+          TCatalogObject catalogTbl = new TCatalogObject(TCatalogObjectType.TABLE,
+              Catalog.INITIAL_CATALOG_VERSION);
+
+          Table tbl = db.getTable(tblName);
+          if (tbl == null) {
+            LOG.error("Table: " + tblName + " was expected to be in the catalog " +
+                "cache. Skipping table for this update.");
+            continue;
+          }
+
+          // Protect the table from concurrent modifications.
+          tbl.getLock().lock();
+          try {
+            // Only add the extended metadata if this table's version is >=
+            // the fromVersion.
+            if (tbl.getCatalogVersion() >= fromVersion) {
+              try {
+                catalogTbl.setTable(tbl.toThrift());
+              } catch (Exception e) {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace(String.format("Error calling toThrift() on table %s.%s: %s",
+                      db.getName(), tblName, e.getMessage()), e);
+                }
+                continue;
               }
-              continue;
+              catalogTbl.setCatalog_version(tbl.getCatalogVersion());
+            } else {
+              catalogTbl.setTable(new TTable(db.getName(), tblName));
             }
-            catalogTbl.setCatalog_version(tbl.getCatalogVersion());
-            resp.addToUpdated_objects(catalogTbl);
+          } finally {
+            tbl.getLock().unlock();
           }
-        } finally {
-          tbl.getLock().unlock();
+          resp.addToObjects(catalogTbl);
+        }
+
+        for (Function fn: db.getFunctions(null, new PatternMatcher())) {
+          TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
+              fn.getCatalogVersion());
+          function.setFn(fn.toThrift());
+          resp.addToObjects(function);
         }
       }
-      // process functions
-      for (Function fn: db.getFunctions(null, new PatternMatcher())) {
-        if (fn.getCatalogVersion() <= fromVersion) continue;
-        TCatalogObject function = new TCatalogObject(TCatalogObjectType.FUNCTION,
-            fn.getCatalogVersion());
-        function.setFn(fn.toThrift());
-        resp.addToUpdated_objects(function);
+
+      for (DataSource dataSource: getDataSources()) {
+        TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
+            dataSource.getCatalogVersion());
+        catalogObj.setData_source(dataSource.toThrift());
+        resp.addToObjects(catalogObj);
       }
-    }
-    // process data sources
-    for (DataSource dataSource: getDataSources()) {
-      if (dataSource.getCatalogVersion() <= fromVersion) continue;
-      TCatalogObject catalogObj = new TCatalogObject(TCatalogObjectType.DATA_SOURCE,
-          dataSource.getCatalogVersion());
-      catalogObj.setData_source(dataSource.toThrift());
-      resp.addToUpdated_objects(catalogObj);
-    }
-    // process cache pools
-    for (HdfsCachePool cachePool: hdfsCachePools_) {
-      if (cachePool.getCatalogVersion() <= fromVersion) continue;
-      TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
-          cachePool.getCatalogVersion());
-      pool.setCache_pool(cachePool.toThrift());
-      resp.addToUpdated_objects(pool);
-    }
-    // process roles and privileges
-    for (Role role: authPolicy_.getAllRoles()) {
-      if (role.getCatalogVersion() > fromVersion) {
+      for (HdfsCachePool cachePool: hdfsCachePools_) {
+        TCatalogObject pool = new TCatalogObject(TCatalogObjectType.HDFS_CACHE_POOL,
+            cachePool.getCatalogVersion());
+        pool.setCache_pool(cachePool.toThrift());
+        resp.addToObjects(pool);
+      }
+
+      // Get all roles
+      for (Role role: authPolicy_.getAllRoles()) {
         TCatalogObject thriftRole = new TCatalogObject();
         thriftRole.setRole(role.toThrift());
         thriftRole.setCatalog_version(role.getCatalogVersion());
         thriftRole.setType(role.getCatalogObjectType());
-        resp.addToUpdated_objects(thriftRole);
-      }
-
-      for (RolePrivilege p: role.getPrivileges()) {
-        if (p.getCatalogVersion() <= fromVersion) continue;
-        TCatalogObject privilege = new TCatalogObject();
-        privilege.setPrivilege(p.toThrift());
-        privilege.setCatalog_version(p.getCatalogVersion());
-        privilege.setType(p.getCatalogObjectType());
-        resp.addToUpdated_objects(privilege);
+        resp.addToObjects(thriftRole);
+
+        for (RolePrivilege p: role.getPrivileges()) {
+          TCatalogObject privilege = new TCatalogObject();
+          privilege.setPrivilege(p.toThrift());
+          privilege.setCatalog_version(p.getCatalogVersion());
+          privilege.setType(p.getCatalogObjectType());
+          resp.addToObjects(privilege);
+        }
       }
-    }
 
-    Set<String> updatedCatalogObjects = Sets.newHashSet();
-    for (TCatalogObject catalogObj: resp.updated_objects) {
-      updatedCatalogObjects.add(CatalogDeltaLog.toCatalogObjectKey(catalogObj));
-    }
+      // Each update should contain a single "TCatalog" object which is used to
+      // pass overall state on the catalog, such as the current version and the
+      // catalog service id.
+      TCatalogObject catalog = new TCatalogObject();
+      catalog.setType(TCatalogObjectType.CATALOG);
+      // By setting the catalog version to the latest catalog version at this point,
+      // it ensure impalads will always bump their versions, even in the case where
+      // an object has been dropped.
+      catalog.setCatalog_version(getCatalogVersion());
+      catalog.setCatalog(new TCatalog(catalogServiceId_));
+      resp.addToObjects(catalog);
 
-    // Identify the catalog objects that were removed from the catalog for which the
-    // version is > 'fromVersion'. We need to make sure that we don't include "deleted"
-    // objects that were re-added to the catalog.
-    for (TCatalogObject removedObject: deleteLog_.retrieveObjects(fromVersion)) {
-      if (!updatedCatalogObjects.contains(CatalogDeltaLog.toCatalogObjectKey(
-          removedObject))) {
-        resp.addToDeleted_objects(removedObject);
-      }
+      // The max version is the max catalog version of all items in the update.
+      resp.setMax_catalog_version(getCatalogVersion());
+      return resp;
+    } finally {
+      catalogLock_.readLock().unlock();
     }
-    return resp;
   }
 
   /**
@@ -743,40 +710,6 @@ public class CatalogServiceCatalog extends Catalog {
           tblsToBackgroundLoad.add(new TTableName(dbName, tableName.toLowerCase()));
         }
       }
-
-      if (existingDb != null) {
-        // Identify any removed functions and add them to the delta log.
-        for (Map.Entry<String, List<Function>> e:
-             existingDb.getAllFunctions().entrySet()) {
-          for (Function fn: e.getValue()) {
-            if (newDb.getFunction(fn,
-                Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
-              fn.setCatalogVersion(incrementAndGetCatalogVersion());
-              TCatalogObject removedObject =
-                  new TCatalogObject(TCatalogObjectType.FUNCTION, fn.getCatalogVersion());
-              removedObject.setFn(fn.toThrift());
-              deleteLog_.addRemovedObject(removedObject);
-            }
-          }
-        }
-
-        // Identify any deleted tables and add them to the delta log
-        Set<String> oldTableNames = Sets.newHashSet(existingDb.getAllTableNames());
-        Set<String> newTableNames = Sets.newHashSet(newDb.getAllTableNames());
-        oldTableNames.removeAll(newTableNames);
-        for (String removedTableName: oldTableNames) {
-          Table removedTable = IncompleteTable.createUninitializedTable(existingDb,
-              removedTableName);
-          removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
-          TCatalogObject removedObject =
-              new TCatalogObject(TCatalogObjectType.TABLE,
-                  removedTable.getCatalogVersion());
-          removedObject.setTable(new TTable());
-          removedObject.getTable().setDb_name(existingDb.getName());
-          removedObject.getTable().setTbl_name(removedTableName);
-          deleteLog_.addRemovedObject(removedObject);
-        }
-      }
       return Pair.create(newDb, tblsToBackgroundLoad);
     } catch (Exception e) {
       LOG.warn("Encountered an exception while invalidating database: " + dbName +
@@ -824,22 +757,6 @@ public class CatalogServiceCatalog extends Catalog {
         }
       }
       dbCache_.set(newDbCache);
-
-      // Identify any deleted databases and add them to the delta log.
-      Set<String> oldDbNames = oldDbCache.keySet();
-      Set<String> newDbNames = newDbCache.keySet();
-      oldDbNames.removeAll(newDbNames);
-      for (String dbName: oldDbNames) {
-        Db removedDb = oldDbCache.get(dbName);
-        Preconditions.checkNotNull(removedDb);
-        removedDb.setCatalogVersion(
-            CatalogServiceCatalog.this.incrementAndGetCatalogVersion());
-        TCatalogObject removedObject = new TCatalogObject(TCatalogObjectType.DATABASE,
-            removedDb.getCatalogVersion());
-        removedObject.setDb(removedDb.toThrift());
-        deleteLog_.addRemovedObject(removedObject);
-      }
-
       // Submit tables for background loading.
       for (TTableName tblName: tblsToBackgroundLoad) {
         tableLoadingMgr_.backgroundLoad(tblName);
@@ -1442,6 +1359,4 @@ public class CatalogServiceCatalog extends Catalog {
       tbl.getLock().unlock();
     }
   }
-
-  public CatalogDeltaLog getDeleteLog() { return deleteLog_; }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 70c9a61..4c959b2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -177,7 +177,7 @@ public class ImpaladCatalog extends Catalog {
     // its child tables/functions is fine. If that happens, the removal of the child
     // object will be a no-op.
     for (TCatalogObject catalogObject: req.getRemoved_objects()) {
-      removeCatalogObject(catalogObject);
+      removeCatalogObject(catalogObject, newCatalogVersion);
     }
     lastSyncedCatalogVersion_ = newCatalogVersion;
     // Cleanup old entries in the log.
@@ -319,10 +319,24 @@ public class ImpaladCatalog extends Catalog {
   /**
    *  Removes the matching TCatalogObject from the catalog, if one exists and its
    *  catalog version is < the catalog version of this drop operation.
+   *  Note that drop operations that come from statestore heartbeats always have a
+   *  version of 0. To determine the drop version for statestore updates,
+   *  the catalog version from the current update is used. This is okay because there
+   *  can never be a catalog update from the statestore that contains a drop
+   *  and an addition of the same object. For more details on how drop
+   *  versioning works, see CatalogServerCatalog.java
    */
-  private void removeCatalogObject(TCatalogObject catalogObject) {
-    Preconditions.checkState(catalogObject.getCatalog_version() != 0);
-    long dropCatalogVersion = catalogObject.getCatalog_version();
+  private void removeCatalogObject(TCatalogObject catalogObject,
+      long currentCatalogUpdateVersion) {
+    // The TCatalogObject associated with a drop operation from a state store
+    // heartbeat will always have a version of zero. Because no update from
+    // the state store can contain both a drop and an addition of the same object,
+    // we can assume the drop version is the current catalog version of this update.
+    // If the TCatalogObject contains a version that != 0, it indicates the drop
+    // came from a direct update.
+    long dropCatalogVersion = catalogObject.getCatalog_version() == 0 ?
+        currentCatalogUpdateVersion : catalogObject.getCatalog_version();
+
     switch(catalogObject.getType()) {
       case DATABASE:
         removeDb(catalogObject.getDb(), dropCatalogVersion);

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1683cc0..dcec430 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1091,7 +1091,6 @@ public class CatalogOpExecutor {
     removedObject.setCatalog_version(dataSource.getCatalogVersion());
     resp.result.addToRemoved_catalog_objects(removedObject);
     resp.result.setVersion(dataSource.getCatalogVersion());
-    catalog_.getDeleteLog().addRemovedObject(removedObject);
   }
 
   /**
@@ -1274,7 +1273,6 @@ public class CatalogOpExecutor {
     removedObject.getDb().setDb_name(params.getDb());
     resp.result.setVersion(removedObject.getCatalog_version());
     resp.result.addToRemoved_catalog_objects(removedObject);
-    catalog_.getDeleteLog().addRemovedObject(removedObject);
   }
 
   /**
@@ -1396,7 +1394,6 @@ public class CatalogOpExecutor {
     removedObject.getTable().setTbl_name(tableName.getTbl());
     removedObject.getTable().setDb_name(tableName.getDb());
     removedObject.setCatalog_version(resp.result.getVersion());
-    catalog_.getDeleteLog().addRemovedObject(removedObject);
     resp.result.addToRemoved_catalog_objects(removedObject);
   }
 
@@ -1524,9 +1521,6 @@ public class CatalogOpExecutor {
 
       if (!removedFunctions.isEmpty()) {
         resp.result.setRemoved_catalog_objects(removedFunctions);
-        for (TCatalogObject removedFnObject: removedFunctions) {
-          catalog_.getDeleteLog().addRemovedObject(removedFnObject);
-        }
       }
       resp.result.setVersion(catalog_.getCatalogVersion());
     }
@@ -2223,7 +2217,6 @@ public class CatalogOpExecutor {
     removedObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
     removedObject.setCatalog_version(addedObject.getCatalog_version());
     response.result.addToRemoved_catalog_objects(removedObject);
-    catalog_.getDeleteLog().addRemovedObject(removedObject);
     response.result.addToUpdated_catalog_objects(addedObject);
     response.result.setVersion(addedObject.getCatalog_version());
   }
@@ -2816,7 +2809,6 @@ public class CatalogOpExecutor {
     catalogObject.setCatalog_version(role.getCatalogVersion());
     if (createDropRoleParams.isIs_drop()) {
       resp.result.addToRemoved_catalog_objects(catalogObject);
-      catalog_.getDeleteLog().addRemovedObject(catalogObject);
     } else {
       resp.result.addToUpdated_catalog_objects(catalogObject);
     }
@@ -2879,9 +2871,6 @@ public class CatalogOpExecutor {
       catalogObject.setPrivilege(rolePriv.toThrift());
       catalogObject.setCatalog_version(rolePriv.getCatalogVersion());
       updatedPrivs.add(catalogObject);
-      if (!grantRevokePrivParams.isIs_grant() && !privileges.get(0).isHas_grant_opt()) {
-        catalog_.getDeleteLog().addRemovedObject(catalogObject);
-      }
     }
 
     // TODO: Currently we only support sending back 1 catalog object in a "direct DDL"
@@ -3053,9 +3042,6 @@ public class CatalogOpExecutor {
           resp.result.setUpdated_catalog_objects(addedFuncs);
           resp.result.setRemoved_catalog_objects(removedFuncs);
           resp.result.setVersion(catalog_.getCatalogVersion());
-          for (TCatalogObject removedFn: removedFuncs) {
-            catalog_.getDeleteLog().addRemovedObject(removedFn);
-          }
         }
       }
     } else if (req.isSetTable_name()) {
@@ -3093,7 +3079,6 @@ public class CatalogOpExecutor {
         // processed as a direct DDL operation.
         if (tblWasRemoved.getRef()) {
           resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
-          catalog_.getDeleteLog().addRemovedObject(updatedThriftTable);
         } else {
           resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
         }

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index e945a3b..b56527b 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -38,8 +38,7 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TFunction;
-import org.apache.impala.thrift.TGetCatalogDeltaResponse;
-import org.apache.impala.thrift.TGetCatalogDeltaRequest;
+import org.apache.impala.thrift.TGetAllCatalogObjectsResponse;
 import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
@@ -120,11 +119,9 @@ public class JniCatalog {
   /**
    * Gets all catalog objects
    */
-  public byte[] getCatalogDelta(byte[] thriftGetCatalogDeltaReq)
-      throws ImpalaException, TException {
-    TGetCatalogDeltaRequest params = new TGetCatalogDeltaRequest();
-    JniUtil.deserializeThrift(protocolFactory_, params, thriftGetCatalogDeltaReq);
-    TGetCatalogDeltaResponse resp = catalog_.getCatalogDelta(params.getFrom_version());
+  public byte[] getCatalogObjects(long from_version) throws ImpalaException, TException {
+    TGetAllCatalogObjectsResponse resp =
+        catalog_.getCatalogObjects(from_version);
     TSerializer serializer = new TSerializer(protocolFactory_);
     return serializer.serialize(resp);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/a88c3b9c/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 1003dc7..e2b1715 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -311,12 +311,14 @@ class StatestoreSubscriber(object):
 
 class TestStatestore():
   def make_topic_update(self, topic_name, key_template="foo", value_template="bar",
-                        num_updates=1):
+                        num_updates=1, deletions=None):
     topic_entries = [
       Subscriber.TTopicItem(key=key_template + str(x), value=value_template + str(x))
       for x in xrange(num_updates)]
+    if deletions is None: deletions = []
     return Subscriber.TTopicDelta(topic_name=topic_name,
                                   topic_entries=topic_entries,
+                                  topic_deletions=deletions,
                                   is_delta=False)
 
   def test_registration_ids_different(self):
@@ -347,9 +349,11 @@ class TestStatestore():
         assert len(args.topic_deltas) == 1
         assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
         assert args.topic_deltas[topic_name].topic_name == delta.topic_name
+        assert args.topic_deltas[topic_name].topic_deletions == delta.topic_deletions
       elif sub.update_count == 3:
         # After the content-bearing update was processed, the next delta should be empty
         assert len(args.topic_deltas[topic_name].topic_entries) == 0
+        assert len(args.topic_deltas[topic_name].topic_deletions) == 0
 
       return DEFAULT_UPDATE_STATE_RESPONSE
 
@@ -457,7 +461,7 @@ class TestStatestore():
         assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
         # Statestore should not send deletions when the update is not a delta, see
         # IMPALA-1891
-        assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False
+        assert len(args.topic_deltas[transient_topic_name].topic_deletions) == 0
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),


[5/7] impala git commit: IMPALA-1144: Fix exception when cancelling query in Impala-shell with CTRL-C

Posted by ta...@apache.org.
IMPALA-1144: Fix exception when cancelling query in Impala-shell with CTRL-C

Issue 1: When query is cancelled via CTRL-C while being executed in Impala-shell
then an exception is thrown from Impala backend saying 'Invalid query handle'.
This is because one ImpalaClient was making RPC's while another ImpalaClient
cancelled the query on the backend. As a result RPC handlers in ImpalaServer
try to access a ClientRequestState that had been cleared from the backend. The
issue is consistently reproducible both in wait_to_finish and in fetch states of
the query.

As a solution the query cancellation is indicated to ImpalaClient via a bool
flag. Once a cancellation-originated exception reaches the Impala shell, this flag
is checked to decide whether to suppress the error or not.

Issue 2: Every time a query was cancelled a 'use db' command was issued
automatically. This happened for historical reasons but is not needed anymore
(see Jira for more details).

Change-Id: I6cefaf1dae78baae238289816a7cb9d210fb38e2
Reviewed-on: http://gerrit.cloudera.org:8080/8549
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/6d9da172
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/6d9da172
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/6d9da172

Branch: refs/heads/master
Commit: 6d9da172889cde75e041caf4fa024f4d9f223db5
Parents: dc1282f
Author: Gabor Kaszab <ga...@cloudera.com>
Authored: Wed Nov 15 01:01:45 2017 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 29 03:44:51 2017 +0000

----------------------------------------------------------------------
 shell/impala_client.py                | 25 +++++++++++++++++++++----
 shell/impala_shell.py                 |  7 +++++--
 tests/shell/test_shell_commandline.py | 30 ++++++++++++++++++++++++++++++
 3 files changed, 56 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/6d9da172/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 868d898..795768c 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -55,6 +55,8 @@ class DisconnectedException(Exception):
   def __str__(self):
       return self.value
 
+class QueryCancelledByShellException(Exception): pass
+
 class ImpalaClient(object):
 
   def __init__(self, impalad, use_kerberos=False, kerberos_service_name="impala",
@@ -74,6 +76,10 @@ class ImpalaClient(object):
     self.query_option_levels = {}
     self.query_state = QueryState._NAMES_TO_VALUES
     self.fetch_batch_size = 1024
+    # This is set from ImpalaShell's signal handler when a query is cancelled
+    # from command line via CTRL+C. It is used to suppress error messages of
+    # query cancellation.
+    self.is_query_cancelled = False
 
   def _options_to_string_list(self, set_query_options):
     return ["%s=%s" % (k, v) for (k, v) in set_query_options.iteritems()]
@@ -306,6 +312,7 @@ class ImpalaClient(object):
     return query
 
   def execute_query(self, query):
+    self.is_query_cancelled = False
     rpc_result = self._do_rpc(lambda: self.imp_service.query(query))
     last_query_handle, status = rpc_result
     if status != RpcStatus.OK:
@@ -381,7 +388,8 @@ class ImpalaClient(object):
     # co-ordinator, so we don't need to wait.
     if query_handle_closed:
       return True
-    rpc_result = self._do_rpc(lambda: self.imp_service.Cancel(last_query_handle))
+    rpc_result = self._do_rpc(lambda: self.imp_service.Cancel(last_query_handle),
+        False)
     _, status = rpc_result
     return status == RpcStatus.OK
 
@@ -409,7 +417,7 @@ class ImpalaClient(object):
       return summary
     return None
 
-  def _do_rpc(self, rpc):
+  def _do_rpc(self, rpc, suppress_error_on_cancel=True):
     """Executes the provided callable."""
 
     if not self.connected:
@@ -428,16 +436,25 @@ class ImpalaClient(object):
           status = RpcStatus.ERROR
       return ret, status
     except BeeswaxService.QueryNotFoundException:
+      if suppress_error_on_cancel and self.is_query_cancelled:
+        raise QueryCancelledByShellException()
       raise QueryStateException('Error: Stale query handle')
     # beeswaxException prints out the entire object, printing
     # just the message is far more readable/helpful.
     except BeeswaxService.BeeswaxException, b:
-        raise RPCException("ERROR: %s" % b.message)
+      # Suppress the errors from cancelling a query that is in fetch state
+      if suppress_error_on_cancel and self.is_query_cancelled:
+        raise QueryCancelledByShellException()
+      raise RPCException("ERROR: %s" % b.message)
     except TTransportException, e:
       # issue with the connection with the impalad
       raise DisconnectedException("Error communicating with impalad: %s" % e)
     except TApplicationException, t:
-        raise RPCException("Application Exception : %s" % t)
+      # Suppress the errors from cancelling a query that is in waiting_to_finish
+      # state
+      if suppress_error_on_cancel and self.is_query_cancelled:
+        raise QueryCancelledByShellException()
+      raise RPCException("Application Exception : %s" % t)
     return None, RpcStatus.ERROR
 
   def _get_sleep_interval(self, start_time):

http://git-wip-us.apache.org/repos/asf/impala/blob/6d9da172/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index a9a527a..ffe01e1 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -36,7 +36,8 @@ import textwrap
 import time
 
 from impala_client import (ImpalaClient, DisconnectedException, QueryStateException,
-                           RPCException, TApplicationException)
+                           RPCException, TApplicationException,
+                           QueryCancelledByShellException)
 from impala_shell_config_defaults import impala_shell_defaults
 from option_parser import get_option_parser, get_config_from_file
 from shell_output import DelimitedOutputFormatter, OutputStream, PrettyOutputFormatter
@@ -484,13 +485,13 @@ class ImpalaShell(object, cmd.Cmd):
     # Create a new connection to the impalad and cancel the query.
     for cancel_try in xrange(ImpalaShell.CANCELLATION_TRIES):
       try:
+        self.imp_client.is_query_cancelled = True
         self.query_handle_closed = True
         print_to_stderr(ImpalaShell.CANCELLATION_MESSAGE)
         new_imp_client = self._new_impala_client()
         new_imp_client.connect()
         new_imp_client.cancel_query(self.last_query_handle, False)
         self.imp_client.close_query(self.last_query_handle)
-        self._validate_database()
         break
       except Exception, e:
         # Suppress harmless errors.
@@ -1038,6 +1039,8 @@ class ImpalaShell(object, cmd.Cmd):
       except RPCException, e:
         if self.show_profiles: raise e
       return CmdStatus.SUCCESS
+    except QueryCancelledByShellException, e:
+      return CmdStatus.SUCCESS
     except RPCException, e:
       # could not complete the rpc successfully
       print_to_stderr(e)

http://git-wip-us.apache.org/repos/asf/impala/blob/6d9da172/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index 218224b..1ecdbd5 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -325,6 +325,36 @@ class TestImpalaShell(ImpalaTestSuite):
 
     assert "Cancelling Query" in result.stderr, result.stderr
 
+  def test_query_cancellation_during_fetch(self):
+    """IMPALA-1144: Test cancellation (CTRL+C) while results are being
+    fetched"""
+    # A select query where fetch takes several seconds
+    args = '-q "with v as (values (1 as x), (2), (3), (4)) ' + \
+        'select * from v, v v2, v v3, v v4, v v5, v v6, v v7, v v8, ' + \
+        'v v9, v v10, v v11;"'
+    # Kill happens when the results are being fetched
+    self.run_and_verify_query_cancellation_test(args)
+
+  def test_query_cancellation_during_wait_to_finish(self):
+    """IMPALA-1144: Test cancellation (CTRL+C) while the query is in the
+    wait_to_finish state"""
+    # A select where wait_to_finish takes several seconds
+    args = '-q "select * from tpch.customer c1, tpch.customer c2, ' + \
+           'tpch.customer c3 order by c1.c_name"'
+    # Kill happens in wait_to_finish state
+    self.run_and_verify_query_cancellation_test(args)
+
+  def run_and_verify_query_cancellation_test(self, args):
+    """Starts the execution of the received query, waits until the query
+    execution in fact starts and then cancels it. Expects the query
+    cancellation to succeed."""
+    p = ImpalaShell(args)
+    sleep(2.0)
+    os.kill(p.pid(), signal.SIGINT)
+    result = p.get_result()
+    assert "Cancelling Query" in result.stderr
+    assert "Invalid query handle" not in result.stderr
+
   def test_get_log_once(self, empty_table):
     """Test that get_log() is always called exactly once."""
     # Query with fetch


[7/7] impala git commit: IMPALA-6187: Fix missing conjuncts evaluation with empty projection

Posted by ta...@apache.org.
IMPALA-6187: Fix missing conjuncts evaluation with empty projection

Previously, scanners would assume that there are no conjuncts associated
with a scan node for queries with no materialized slots (e.g. count(*)).
This is not necessarily the case as one can write queries such as
select count(*) from tpch.lineitem where rand() * 10 < 0; or
select count(*) from tpch.lineitem where rand() > <a partition column>.
In which case, the conjuncts should still be evaluated once per row.

This change fixes the problem in the short-circuit handling logic for
count(*) to evaluate the conjuncts once per row and only commits a row
to the output row batch if the conjuncts evaluate to true.

Testing done: Added the example above to the scanner test

Change-Id: Ib530f1fdcd2c6de699977db163b3f6eb38481517
Reviewed-on: http://gerrit.cloudera.org:8080/8623
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Alex Behm <al...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/63f17e9c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/63f17e9c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/63f17e9c

Branch: refs/heads/master
Commit: 63f17e9ceaed92a28ea12567a36b746e54fffdb3
Parents: 2fba80e
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Nov 20 19:35:06 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 29 05:53:15 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             |  2 +-
 be/src/exec/hdfs-scanner.cc                     | 25 +++++++++++++++-----
 be/src/exec/hdfs-scanner.h                      |  8 +++++--
 be/src/exec/hdfs-sequence-scanner.cc            |  2 +-
 be/src/exec/kudu-scanner.cc                     | 13 +++++++++-
 be/src/exec/kudu-scanner.h                      |  2 ++
 .../queries/QueryTest/kudu-scan-node.test       |  9 +++++++
 .../queries/QueryTest/scanners.test             | 24 +++++++++++++++++++
 8 files changed, 74 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/63f17e9c/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index f407877..4e4abef 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -461,7 +461,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     Status status = CommitRows(row_batch, num_to_commit);
     assemble_rows_timer_.Stop();
     RETURN_IF_ERROR(status);
-    row_group_rows_read_ += num_to_commit;
+    row_group_rows_read_ += max_tuples;
     COUNTER_ADD(scan_node_->rows_read_counter(), row_group_rows_read_);
     return Status::OK();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/63f17e9c/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 0dbfc5f..f53b7d0 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -207,12 +207,25 @@ Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
 int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) {
   DCHECK_GE(num_tuples, 0);
   DCHECK_EQ(scan_node_->tuple_idx(), 0);
-  DCHECK_EQ(conjunct_evals_->size(), 0);
-  if (num_tuples == 0 || template_tuple_ == NULL) return num_tuples;
-
-  Tuple** row_tuple = reinterpret_cast<Tuple**>(row);
-  for (int i = 0; i < num_tuples; ++i) row_tuple[i] = template_tuple_;
-  return num_tuples;
+  DCHECK_EQ(scan_node_->materialized_slots().size(), 0);
+  int num_to_commit = 0;
+  if (LIKELY(conjunct_evals_->size() == 0)) {
+    num_to_commit = num_tuples;
+  } else {
+    TupleRow template_tuple_row;
+    template_tuple_row.SetTuple(0, template_tuple_);
+    // Evaluate any conjuncts which may reference the partition columns.
+    for (int i = 0; i < num_tuples; ++i) {
+      if (EvalConjuncts(&template_tuple_row)) ++num_to_commit;
+    }
+  }
+  if (template_tuple_ != nullptr) {
+    Tuple** row_tuple = reinterpret_cast<Tuple**>(row);
+    for (int i = 0; i < num_to_commit; ++i) row_tuple[i] = template_tuple_;
+  } else {
+    DCHECK_EQ(scan_node_->tuple_desc()->byte_size(), 0);
+  }
+  return num_to_commit;
 }
 
 bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,

http://git-wip-us.apache.org/repos/asf/impala/blob/63f17e9c/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index c603593..9b80d6c 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -328,8 +328,12 @@ class HdfsScanner {
     return ExecNode::EvalConjuncts(conjunct_evals_->data(), conjunct_evals_->size(), row);
   }
 
-  /// Sets 'num_tuples' template tuples in the batch that 'row' points to. Assumes the
-  /// 'tuple_row' only has a single tuple. Returns the number of tuples set.
+  /// Handles the case when there are no slots materialized (e.g. count(*)) by adding
+  /// up to 'num_tuples' rows to the row batch which 'row' points to. Assumes each tuple
+  /// row only has one tuple. Set the added tuples in the row batch with the template
+  /// tuple if it's not NULL. In the rare case when there are conjuncts, evaluate them
+  /// once for each row and only add a row when they evaluate to true. Returns the number
+  /// of tuple rows added.
   int WriteTemplateTuples(TupleRow* row, int num_tuples);
 
   /// Processes batches of fields and writes them out to tuple_row_mem.

http://git-wip-us.apache.org/repos/asf/impala/blob/63f17e9c/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 67f598c..346a18a 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -341,7 +341,7 @@ Status HdfsSequenceScanner::ProcessRange(RowBatch* row_batch) {
         }
       }
     } else {
-      add_row = WriteTemplateTuples(tuple_row_mem, 1);
+      add_row = WriteTemplateTuples(tuple_row_mem, 1) > 0;
     }
     num_rows_read++;
     if (add_row) RETURN_IF_ERROR(CommitRows(1, row_batch));

http://git-wip-us.apache.org/repos/asf/impala/blob/63f17e9c/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 7db8878..a9b56fe 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -244,8 +244,19 @@ Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch) {
   int num_rows_remaining = cur_kudu_batch_.NumRows() - cur_kudu_batch_num_read_;
   int rows_to_add = std::min(row_batch->capacity() - row_batch->num_rows(),
       num_rows_remaining);
+  int num_to_commit = 0;
+  if (LIKELY(conjunct_evals_.empty())) {
+    num_to_commit = rows_to_add;
+  } else {
+    for (int i = 0; i < rows_to_add; ++i) {
+      if (ExecNode::EvalConjuncts(conjunct_evals_.data(),
+              conjunct_evals_.size(), nullptr)) {
+        ++num_to_commit;
+      }
+    }
+  }
   cur_kudu_batch_num_read_ += rows_to_add;
-  row_batch->CommitRows(rows_to_add);
+  row_batch->CommitRows(num_to_commit);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/63f17e9c/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index e6d4ca9..5617847 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -64,6 +64,8 @@ class KuduScanner {
  private:
   /// Handles the case where the projection is empty (e.g. count(*)).
   /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one.
+  /// If in the rare case where there is any conjunct, evaluate them once for each row
+  /// and add a row to the row batch only when the conjuncts evaluate to true.
   Status HandleEmptyProjection(RowBatch* row_batch);
 
   /// Decodes rows previously fetched from kudu, now in 'cur_rows_' into a RowBatch.

http://git-wip-us.apache.org/repos/asf/impala/blob/63f17e9c/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
index 115affa..f32c8a1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
@@ -140,3 +140,12 @@ order by id;
 ---- TYPES
 INT, TIMESTAMP
 ====
+---- QUERY
+# Regression test for IMPALA-6187. Make sure count(*) queries with partition columns only
+# won't miss conjuncts evaluation. 'id' is the partition column here.
+select count(*) from functional_kudu.alltypes where rand() + id < 0.0;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/63f17e9c/testdata/workloads/functional-query/queries/QueryTest/scanners.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners.test b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
index 658e4cf..99ec5c5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
@@ -74,3 +74,27 @@ from nulltable where b = ''
 ---- TYPES
 STRING, STRING
 ====
+---- QUERY
+# The following 3 tests are regression tests for IMPALA-6187. Make sure the conjuncts are
+# evaluated when there are no materialized slots or only partition columns are accessed.
+select count(*) from alltypes where rand() * 10 >= 0.0;
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====
+---- QUERY
+select count(*) from alltypes where rand() * 10 < 0.0;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====
+---- QUERY
+# 'year' and 'month' are partition columns.
+select count(*) from alltypes where rand() - year > month;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====


[6/7] impala git commit: IMPALA-5146: Fix inconsistent results at FROM_UNIXTIME()

Posted by ta...@apache.org.
IMPALA-5146: Fix inconsistent results at FROM_UNIXTIME()

The FROM_UNIXTIME(epoch) and FROM_UNIXTIME(epoch, format) produce
different results when epoch is out of range of TimestampValue.
The former produces an empty string, while the latter gives NULL.

The fix is to harmonize the results to NULL.

Testing:
Add unit tests to ExprTest.TimestampFunctions.

Change-Id: Ie3a5e9a9cb39d32993fa2c7f725be44d8b9ce9f2
Reviewed-on: http://gerrit.cloudera.org:8080/8629
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2fba80ee
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2fba80ee
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2fba80ee

Branch: refs/heads/master
Commit: 2fba80ee5ecf14117d7a9bfbbb0f27c45044f412
Parents: 6d9da17
Author: Jinchul <ji...@gmail.com>
Authored: Wed Nov 22 11:46:41 2017 +0900
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 29 05:22:32 2017 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-test.cc                                       | 2 ++
 be/src/exprs/timestamp-functions-ir.cc                          | 5 +++--
 .../workloads/functional-query/queries/QueryTest/exprs.test     | 2 +-
 3 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2fba80ee/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 3277042..341a6e2 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -5637,6 +5637,8 @@ TEST_F(ExprTest, TimestampFunctions) {
   }
 
   TestIsNull("from_unixtime(NULL, 'yyyy-MM-dd')", TYPE_STRING);
+  TestIsNull("from_unixtime(999999999999999)", TYPE_STRING);
+  TestIsNull("from_unixtime(999999999999999, 'yyyy-MM-dd')", TYPE_STRING);
   TestStringValue("from_unixtime(unix_timestamp('1999-01-01 10:10:10'), \
       'yyyy-MM-dd')", "1999-01-01");
   TestStringValue("from_unixtime(unix_timestamp('1999-01-01 10:10:10'), \

http://git-wip-us.apache.org/repos/asf/impala/blob/2fba80ee/be/src/exprs/timestamp-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/timestamp-functions-ir.cc b/be/src/exprs/timestamp-functions-ir.cc
index 7f0cc2f..9104bcf 100644
--- a/be/src/exprs/timestamp-functions-ir.cc
+++ b/be/src/exprs/timestamp-functions-ir.cc
@@ -79,8 +79,9 @@ StringVal TimestampFunctions::StringValFromTimestamp(FunctionContext* context,
 template <class TIME>
 StringVal TimestampFunctions::FromUnix(FunctionContext* context, const TIME& intp) {
   if (intp.is_null) return StringVal::null();
-  return AnyValUtil::FromString(context,
-      TimestampValue::FromUnixTime(intp.val).ToString());
+  const TimestampValue tv = TimestampValue::FromUnixTime(intp.val);
+  if (!tv.HasDateAndTime()) return StringVal::null();
+  return AnyValUtil::FromString(context, tv.ToString());
 }
 
 template <class TIME>

http://git-wip-us.apache.org/repos/asf/impala/blob/2fba80ee/testdata/workloads/functional-query/queries/QueryTest/exprs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exprs.test b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
index b1abcbd..f0b9fac 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exprs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exprs.test
@@ -2515,7 +2515,7 @@ set EXEC_SINGLE_NODE_ROWS_THRESHOLD=0;
 select min(from_unixtime(ts))
 from (values (1429705108399870 as ts), (1429704907057354)) invalid_timestamps;
 ---- RESULTS
-''
+'NULL'
 ---- TYPES
 STRING
 ====


[4/7] impala git commit: IMPALA-6241: timeout in admission control test under ASAN

Posted by ta...@apache.org.
IMPALA-6241: timeout in admission control test under ASAN

The fix for IMPALA-6241 is to increase the timeout for all slow builds.

While testing that fix, I discovered that the ASAN build detection logic
was failing silently, resulting in it assuming that it was testing a
DEBUG build. The error was:

  Unexpected DW_AT_name in first CU:
  /data/jenkins/workspace/verify-impala-toolchain-package-build/label/ec2-package-ubuntu-16-04/toolchain/source/llvm/llvm-3.9.1.src/projects/compiler-rt/lib/asan/asan_preinit.cc;
  choosing DEBUG

The fix for that issue is to remove the build type detection heuristic
and instead just write a file with the build type as part of the build process.

Testing:
Before this change I was able to reproduce locally every 5-10 test
iterations. After this change I haven't seen it reproduce.

Change-Id: Ia4ed949cac99b9925f72e19e4adaa2ead370b536
Reviewed-on: http://gerrit.cloudera.org:8080/8652
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/dc1282fb
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/dc1282fb
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/dc1282fb

Branch: refs/heads/master
Commit: dc1282fbc927a0e6fd93eba04bca9eaa568b2f3e
Parents: a88c3b9
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Nov 27 11:31:21 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 29 03:28:22 2017 +0000

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 CMakeLists.txt                                  |   2 +
 bin/clean-cmake.sh                              |   1 +
 infra/python/deps/requirements.txt              |   2 -
 tests/common/environ.py                         | 187 ++++++-------------
 .../custom_cluster/test_admission_controller.py |   6 +-
 6 files changed, 64 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/dc1282fb/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index e757b27..8d8e4e4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,6 +29,7 @@ CMakeFiles
 cmake_install.cmake
 CTestTestfile.cmake
 !CMakeLists.txt
+.cmake_build_type
 .ninja_deps
 .ninja_log
 build.ninja

http://git-wip-us.apache.org/repos/asf/impala/blob/dc1282fb/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 916c249..4618c45 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -49,6 +49,8 @@ endif(NOT CMAKE_BUILD_TYPE)
 STRING (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
 
 message(STATUS "Build type is ${CMAKE_BUILD_TYPE}")
+# Write the build type to a file so that tests can determine the current build type.
+file(WRITE "${CMAKE_SOURCE_DIR}/.cmake_build_type" ${CMAKE_BUILD_TYPE})
 
 set(ENABLE_CODE_COVERAGE false)
 if ("${CMAKE_BUILD_TYPE}" STREQUAL "CODE_COVERAGE_DEBUG")

http://git-wip-us.apache.org/repos/asf/impala/blob/dc1282fb/bin/clean-cmake.sh
----------------------------------------------------------------------
diff --git a/bin/clean-cmake.sh b/bin/clean-cmake.sh
index a708391..92415fc 100755
--- a/bin/clean-cmake.sh
+++ b/bin/clean-cmake.sh
@@ -34,3 +34,4 @@ for loc in "${ROOT_DIR}/ -maxdepth 1" "${ROOT_DIR}/be/" "${ROOT_DIR}/fe/" "${ROO
   find ${loc} \( -iname CMakeCache.txt -o -iname CMakeFiles \
        -o -iname CTestTestfile.cmake -o -iname cmake_install.cmake \) -exec rm -Rf {} +
 done
+rm -f ${IMPALA_HOME}/.cmake_build_type

http://git-wip-us.apache.org/repos/asf/impala/blob/dc1282fb/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index 1c265c4..4c65d5a 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -42,12 +42,10 @@ hdfs == 2.0.2
   docopt == 0.6.2
   execnet == 1.4.0
 kazoo == 2.2.1
-monkeypatch == 0.1rc3
 ordereddict == 1.1
 pexpect == 3.3
 pg8000 == 1.10.2
 prettytable == 0.7.2
-pyelftools == 0.23
 pyparsing == 2.0.3
 pytest == 2.9.2
   py == 1.4.32

http://git-wip-us.apache.org/repos/asf/impala/blob/dc1282fb/tests/common/environ.py
----------------------------------------------------------------------
diff --git a/tests/common/environ.py b/tests/common/environ.py
index 48c4cba..347ea44 100644
--- a/tests/common/environ.py
+++ b/tests/common/environ.py
@@ -19,30 +19,14 @@ import logging
 import os
 import re
 
-try:
-  from elftools.elf.elffile import ELFFile
-except ImportError as e:
-  # Handle pre-python2.7s' lack of collections.OrderedDict, which we include in
-  # impala-python as ordereddict.OrderedDict.
-  if 'cannot import name OrderedDict' == str(e):
-    import monkeypatch
-    from ordereddict import OrderedDict
-    monkeypatch.patch(OrderedDict, 'collections', 'OrderedDict')
-    from elftools.elf.elffile import ELFFile
-  else:
-    raise e
-
-
 LOG = logging.getLogger('tests.common.environ')
-
 test_start_cluster_args = os.environ.get("TEST_START_CLUSTER_ARGS", "")
+IMPALA_HOME = os.environ.get("IMPALA_HOME", "")
 
 # Find the likely BuildType of the running Impala. Assume it's found through the path
 # $IMPALA_HOME/be/build/latest as a fallback.
-impala_home = os.environ.get("IMPALA_HOME", "")
 build_type_arg_regex = re.compile(r'--build_type=(\w+)', re.I)
 build_type_arg_search_result = re.search(build_type_arg_regex, test_start_cluster_args)
-
 if build_type_arg_search_result is not None:
   build_type_dir = build_type_search_result.groups()[0].lower()
 else:
@@ -50,48 +34,64 @@ else:
 
 # Resolve any symlinks in the path.
 impalad_basedir = \
-    os.path.realpath(os.path.join(impala_home, 'be/build', build_type_dir)).rstrip('/')
-
-IMPALAD_PATH = os.path.join(impalad_basedir, 'service', 'impalad')
-
+    os.path.realpath(os.path.join(IMPALA_HOME, 'be/build', build_type_dir)).rstrip('/')
 
 class SpecificImpaladBuildTypes:
   """
-  Represent a specific build type. These specific build types are needed by Python test
-  code.
-
-  The specific build types and their *most distinguishing* compiler options are:
-
-  1. ADDRESS_SANITIZER (clang -fsanitize=address)
-  2. DEBUG (gcc -ggdb)
-  3. DEBUG_CODE_COVERAGE (gcc -ggdb -ftest-coverage)
-  4. RELEASE (gcc)
-  5. RELEASE_CODE_COVERAGE (gcc -ftest-coverage)
-  6. THREAD_SANITIZER (clang -fsanitize=thread)
+  Represents the possible CMAKE_BUILD_TYPE values. These specific build types are needed
+  by Python test code, e.g. to set different timeouts for different builds. All values
+  are lower-cased to enable case-insensitive comparison.
   """
   # ./buildall.sh -asan
   ADDRESS_SANITIZER = 'address_sanitizer'
   # ./buildall.sh
   DEBUG = 'debug'
-  # ./buildall.sh -codecoverage
-  DEBUG_CODE_COVERAGE = 'debug_code_coverage'
   # ./buildall.sh -release
   RELEASE = 'release'
+  # ./buildall.sh -codecoverage
+  CODE_COVERAGE_DEBUG = 'code_coverage_debug'
   # ./buildall.sh -release -codecoverage
-  RELEASE_CODE_COVERAGE = 'release_code_coverage'
+  CODE_COVERAGE_RELEASE = 'code_coverage_release'
   # ./buildall.sh -tsan
-  THREAD_SANITIZER = 'thread_sanitizer'
+  TSAN = 'tsan'
+  # ./buildall.sh -ubsan
+  UBSAN = 'ubsan'
+
+  VALID_BUILD_TYPES = [ADDRESS_SANITIZER, DEBUG, CODE_COVERAGE_DEBUG, RELEASE,
+      CODE_COVERAGE_RELEASE, TSAN, UBSAN]
+
+  @classmethod
+  def detect(cls, impala_build_root):
+    """
+    Determine the build type based on the .cmake_build_type file created by
+    ${IMPALA_HOME}/CMakeLists.txt. impala_build_root should be the path of the
+    Impala source checkout, i.e. ${IMPALA_HOME}.
+    """
+    build_type_path = os.path.join(impala_build_root, ".cmake_build_type")
+    try:
+      with open(build_type_path) as build_type_file:
+        build_type = build_type_file.read().strip().lower()
+    except IOError:
+      LOG.error("Could not open %s assuming DEBUG", build_type_path)
+      return cls.DEBUG
+
+    if build_type not in cls.VALID_BUILD_TYPES:
+      raise Exception("Unknown build type {0}".format(build_type))
+    LOG.debug("Build type detected: %s", build_type)
+    return build_type
+
 
 
 class ImpaladBuild(object):
   """
   Acquires and provides characteristics about the way the Impala under test was compiled
-  and its likely effects on its responsiveness to automated test timings.
+  and its likely effects on its responsiveness to automated test timings. Currently
+  assumes that the Impala daemon under test was built in our current source checkout.
+  TODO: we could get this information for remote cluster tests if we exposed the build
+  type via a metric or the Impalad web UI.
   """
-  def __init__(self, impalad_path):
-    self.impalad_path = impalad_path
-    die_name, die_producer = self._get_impalad_dwarf_info()
-    self._set_impalad_build_type(die_name, die_producer)
+  def __init__(self, impala_build_root):
+    self._specific_build_type = SpecificImpaladBuildTypes.detect(impala_build_root)
 
   @property
   def specific_build_type(self):
@@ -104,8 +104,8 @@ class ImpaladBuild(object):
     """
     Return whether the Impala under test was compiled with code coverage enabled.
     """
-    return self.specific_build_type in (SpecificImpaladBuildTypes.DEBUG_CODE_COVERAGE,
-                                        SpecificImpaladBuildTypes.RELEASE_CODE_COVERAGE)
+    return self.specific_build_type in (SpecificImpaladBuildTypes.CODE_COVERAGE_DEBUG,
+                                        SpecificImpaladBuildTypes.CODE_COVERAGE_RELEASE)
 
   def is_asan(self):
     """
@@ -117,7 +117,13 @@ class ImpaladBuild(object):
     """
     Return whether the Impala under test was compiled with TSAN.
     """
-    return self.specific_build_type == SpecificImpaladBuildTypes.THREAD_SANITIZER
+    return self.specific_build_type == SpecificImpaladBuildTypes.TSAN
+
+  def is_ubsan(self):
+    """
+    Return whether the Impala under test was compiled with UBSAN.
+    """
+    return self.specific_build_type == SpecificImpaladBuildTypes.UBSAN
 
   def is_dev(self):
     """
@@ -126,97 +132,18 @@ class ImpaladBuild(object):
     """
     return self.specific_build_type in (
         SpecificImpaladBuildTypes.ADDRESS_SANITIZER, SpecificImpaladBuildTypes.DEBUG,
-        SpecificImpaladBuildTypes.DEBUG_CODE_COVERAGE,
-        SpecificImpaladBuildTypes.THREAD_SANITIZER)
+        SpecificImpaladBuildTypes.CODE_COVERAGE_DEBUG,
+        SpecificImpaladBuildTypes.TSAN, SpecificImpaladBuildTypes.UBSAN)
 
   def runs_slowly(self):
     """
     Return whether the Impala under test "runs slowly". For our purposes this means
-    either compiled with code coverage enabled, ASAN or TSAN.
-    """
-    return self.has_code_coverage() or self.is_asan() or self.is_tsan()
-
-  def _get_impalad_dwarf_info(self):
-    """
-    Read the impalad_path ELF binary, which is supposed to contain DWARF, and read the
-    DWARF to understand the compiler options. Return a 2-tuple of the two useful DIE
-    attributes of the first compile unit: the DW_AT_name and DW_AT_producer. If
-    something goes wrong doing this, log a warning and return nothing.
-    """
-    # Some useful references:
-    # - be/CMakeLists.txt
-    # - gcc(1), especially -grecord-gcc-switches, -g, -ggdb, -gdwarf-2
-    # - readelf(1)
-    # - general reading about DWARF
-    # A useful command for exploration without having to wade through many bytes is:
-    # readelf --debug-dump=info --dwarf-depth=1 impalad
-    # The DWARF lines are long, raw, and nasty; I'm hesitant to paste them here, so
-    # curious readers are highly encouraged to try the above, or read IMPALA-3501.
-    die_name = None
-    die_producer = None
-    try:
-      with open(self.impalad_path, 'rb') as fh:
-        impalad_elf = ELFFile(fh)
-        if impalad_elf.has_dwarf_info():
-          dwarf_info = impalad_elf.get_dwarf_info()
-          # We only need the first CU, hence the unconventional use of the iterator
-          # protocol.
-          cu_iterator = dwarf_info.iter_CUs()
-          first_cu = next(cu_iterator)
-          top_die = first_cu.get_top_DIE()
-          die_name = top_die.attributes['DW_AT_name'].value
-          die_producer = top_die.attributes['DW_AT_producer'].value
-    except Exception as e:
-      LOG.warn('Failure to read DWARF info from {0}: {1}'.format(self.impalad_path,
-                                                                 str(e)))
-    return die_name, die_producer
-
-  def _set_impalad_build_type(self, die_name, die_producer):
-    """
-    Use a heuristic based on the DW_AT_producer and DW_AT_name of the first compile
-    unit, as returned by _get_impalad_dwarf_info(), to figure out which of 5 supported
-    builds of impalad we're dealing with. If the heuristic can't determine, fall back to
-    assuming a debug build and log a warning.
-    """
-    ASAN_CU_NAME = 'asan_preinit.cc'
-    TSAN_CU_NAME = 'tsan_clock.cc'
-    DEFAULT_CU_NAME = 'daemon-main.cc'
-    GDB_FLAG = '-ggdb'
-    CODE_COVERAGE_FLAG = '-ftest-coverage'
-
-    if die_name is None or die_producer is None:
-      LOG.warn('Not enough DWARF info in {0} to determine build type; choosing '
-               'DEBUG'.format(self.impalad_path))
-      self._specific_build_type = SpecificImpaladBuildTypes.DEBUG
-      return
-
-    is_debug = GDB_FLAG in die_producer
-    specific_build_type = SpecificImpaladBuildTypes.DEBUG
-
-    if die_name.endswith(ASAN_CU_NAME):
-      specific_build_type = SpecificImpaladBuildTypes.ADDRESS_SANITIZER
-    if die_name.endswith(TSAN_CU_NAME):
-      specific_build_type = SpecificImpaladBuildTypes.THREAD_SANITIZER
-    elif not die_name.endswith(DEFAULT_CU_NAME):
-      LOG.warn('Unexpected DW_AT_name in first CU: {0}; choosing '
-               'DEBUG'.format(die_name))
-      specific_build_type = SpecificImpaladBuildTypes.DEBUG
-    elif CODE_COVERAGE_FLAG in die_producer:
-      if is_debug:
-        specific_build_type = SpecificImpaladBuildTypes.DEBUG_CODE_COVERAGE
-      else:
-        specific_build_type = SpecificImpaladBuildTypes.RELEASE_CODE_COVERAGE
-    else:
-      if is_debug:
-        specific_build_type = SpecificImpaladBuildTypes.DEBUG
-      else:
-        specific_build_type = SpecificImpaladBuildTypes.RELEASE
-
-    self._specific_build_type = specific_build_type
-
-
-IMPALAD_BUILD = ImpaladBuild(IMPALAD_PATH)
+    either compiled with code coverage enabled or one of the sanitizers.
+    """
+    return self.has_code_coverage() or self.is_asan() or self.is_tsan() or self.is_ubsan()
+
 
+IMPALAD_BUILD = ImpaladBuild(IMPALA_HOME)
 
 def specific_build_type_timeout(
     default_timeout, slow_build_timeout=None, asan_build_timeout=None,

http://git-wip-us.apache.org/repos/asf/impala/blob/dc1282fb/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index a26923d..020bb67 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -107,9 +107,9 @@ ROUND_ROBIN_SUBMISSION = [True, False]
 # pool with the parameters below.
 POOL_NAME = "default-pool"
 
-# Stress test timeout (seconds). The timeout needs to be significantly higher in code
-# coverage builds (IMPALA-3790).
-STRESS_TIMEOUT = specific_build_type_timeout(60, code_coverage_build_timeout=600)
+# Stress test timeout (seconds). The timeout needs to be significantly higher for
+# slow builds like code coverage and ASAN (IMPALA-3790, IMPALA-6241).
+STRESS_TIMEOUT = specific_build_type_timeout(60, slow_build_timeout=600)
 
 # The number of queries that can execute concurrently in the pool POOL_NAME.
 MAX_NUM_CONCURRENT_QUERIES = 5


[2/7] impala git commit: IMPALA-6053: Fix exception when storageIds don't match hosts

Posted by ta...@apache.org.
IMPALA-6053: Fix exception when storageIds don't match hosts

This commit fixes an issue where an IllegalStateException is thrown if
there is a mismatch between the number of storageIDs and the number of
host locations of a file block, causing the metadata load of a table to
abort. With this fix, the storageIDs are ignored if they don't match the
number of hosts of a block, allowing table loading to proceed. That
change will also cause remote reads during table scans for the
blocks for which the mismatch was detected.

Testing:
No additional tests were added as this error was triggered on an EMC
Isilon system v8.0.

Change-Id: Ia3d685208dce7a1cbe94a33b8ac9aeb7c8a3f391
Reviewed-on: http://gerrit.cloudera.org:8080/8668
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0588309e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0588309e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0588309e

Branch: refs/heads/master
Commit: 0588309ed6938fd7c280a241be01d0b69b6357d9
Parents: 4f11bed
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Tue Nov 28 11:02:17 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Nov 28 23:40:38 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/catalog/HdfsPartition.java    | 27 ++++++++------------
 1 file changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/0588309e/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 35309c5..e78ce92 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -20,7 +20,6 @@ package org.apache.impala.catalog;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -68,7 +67,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
-import com.google.common.primitives.Shorts;
 import com.google.flatbuffers.FlatBufferBuilder;
 
 /**
@@ -308,10 +306,10 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
         fbb.addShort(replicaIdx);
       }
       int fbReplicaHostIdxOffset = fbb.endVector();
-
-      // disk ids
       short[] diskIds = createDiskIds(loc, numUnknownDiskIds);
-      Preconditions.checkState(diskIds.length != 0);
+      Preconditions.checkState(diskIds.length == loc.getNames().length,
+          "Mismatch detected between number of diskIDs and block locations for block: " +
+          loc.toString());
       int fbDiskIdsOffset = FbFileBlock.createDiskIdsVector(fbb, diskIds);
       FbFileBlock.startFbFileBlock(fbb);
       FbFileBlock.addOffset(fbb, loc.getOffset());
@@ -345,20 +343,17 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
      * disk ids and populates 'numUnknownDiskIds' with the number of unknown disk ids.
      */
     private static short[] createDiskIds(BlockLocation location,
-        Reference<Long> numUnknownDiskIds) {
+        Reference<Long> numUnknownDiskIds) throws IOException {
       long unknownDiskIdCount = 0;
       String[] storageIds = location.getStorageIds();
-      String[] hosts;
-      try {
-        hosts = location.getHosts();
-      } catch (IOException e) {
-        LOG.error("Couldn't get hosts for block: " + location.toString(), e);
-        return new short[0];
-      }
+      String[] hosts = location.getHosts();
       if (storageIds.length != hosts.length) {
-        LOG.error("Number of storage IDs and number of hosts for block: " + location
-            .toString() + " mismatch. Skipping disk ID loading for this block.");
-        return Shorts.toArray(Collections.<Short>emptyList());
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(String.format("Number of storage IDs and number of hosts for block " +
+              "%s mismatch (storageIDs:hosts) %d:%d. Skipping disk ID loading for this " +
+              "block.", location.toString(), storageIds.length, hosts.length));
+        }
+        storageIds = new String[hosts.length];
       }
       short[] diskIDs = new short[storageIds.length];
       for (int i = 0; i < storageIds.length; ++i) {