You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/12 22:01:01 UTC

[2/4] impala git commit: IMPALA-6629: Clean up catalog update logging

IMPALA-6629: Clean up catalog update logging

IMPALA-5990 introduced some redundant and unclear logging in the process
of assembling and sending catalog updates. This patch removes the
duplication, rewords some logs, and adds a log message when a catalog
update is fully assembled.

Change-Id: Iaa096b8c84304f28b37ac5e6794d688ba0a949a7
Reviewed-on: http://gerrit.cloudera.org:8080/9566
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/84391071
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/84391071
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/84391071

Branch: refs/heads/2.x
Commit: 8439107125313ea36faad5eb99446c4f88e2a00f
Parents: 10fced4
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Thu Mar 8 16:38:57 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Mar 10 06:49:08 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                | 21 +++++++++++---------
 be/src/catalog/catalog-server.h                 |  4 ++--
 be/src/service/fe-support.cc                    | 10 +++++-----
 .../java/org/apache/impala/catalog/Catalog.java |  4 +++-
 .../impala/catalog/CatalogServiceCatalog.java   |  8 ++------
 .../org/apache/impala/service/FeSupport.java    |  2 +-
 6 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/84391071/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 2f4bcbe..8a91c25 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -233,16 +233,18 @@ void CatalogServer::UpdateCatalogTopicCallback(
   // to reload the full catalog.
   if (delta.from_version == 0 && catalog_objects_min_version_ != 0) {
     last_sent_catalog_version_ = 0L;
-  } else {
+  } else if (!pending_topic_updates_.empty()) {
     // Process the pending topic update.
-    LOG_EVERY_N(INFO, 300) << "Catalog Version: " << catalog_objects_max_version_
-                           << " Last Catalog Version: " << last_sent_catalog_version_;
-
     subscriber_topic_updates->emplace_back();
     TTopicDelta& update = subscriber_topic_updates->back();
     update.topic_name = IMPALA_CATALOG_TOPIC;
     update.topic_entries = std::move(pending_topic_updates_);
 
+    VLOG(1) << "A catalog update with " << update.topic_entries.size()
+            << " entries is assembled. Catalog version: "
+            << catalog_objects_max_version_ << " Last sent catalog version: "
+            << last_sent_catalog_version_;
+
     // Update the new catalog version and the set of known catalog objects.
     last_sent_catalog_version_ = catalog_objects_max_version_;
   }
@@ -457,8 +459,8 @@ void CatalogServer::TableMetricsUrlCallback(const Webserver::ArgumentMap& args,
   }
 }
 
-bool CatalogServer::AddPendingTopicItem(std::string key, const uint8_t* item_data,
-    uint32_t size, bool deleted) {
+bool CatalogServer::AddPendingTopicItem(std::string key, int64_t version,
+    const uint8_t* item_data, uint32_t size, bool deleted) {
   pending_topic_updates_.emplace_back();
   TTopicItem& item = pending_topic_updates_.back();
   if (FLAGS_compact_catalog_topic) {
@@ -474,8 +476,9 @@ bool CatalogServer::AddPendingTopicItem(std::string key, const uint8_t* item_dat
   }
   item.key = std::move(key);
   item.deleted = deleted;
-  VLOG(1) << "Publishing " << (deleted ? "deletion: " : "update: ") << item.key <<
-      " original size: " << size << (FLAGS_compact_catalog_topic ?
-      Substitute(" compressed size: $0", item.value.size()) : string());
+  VLOG(1) << "Collected " << (deleted ? "deletion: " : "update: ") << item.key
+          << ", version=" << version << ", original size=" << size
+          << (FLAGS_compact_catalog_topic ?
+              Substitute(", compressed size=$0", item.value.size()) : string());
   return true;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/84391071/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index a6a0c3f..2fa8ce7 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -76,8 +76,8 @@ class CatalogServer {
 
   /// Add a topic item to pending_topic_updates_. Caller must hold catalog_lock_.
   /// The return value is true if the operation succeeds and false otherwise.
-  bool AddPendingTopicItem(std::string key, const uint8_t* item_data, uint32_t size,
-      bool deleted);
+  bool AddPendingTopicItem(std::string key, int64_t version, const uint8_t* item_data,
+      uint32_t size, bool deleted);
 
  private:
   /// Thrift API implementation which proxies requests onto this CatalogService.

http://git-wip-us.apache.org/repos/asf/impala/blob/84391071/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 2d48d73..9d59883 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -428,7 +428,7 @@ Java_org_apache_impala_service_FeSupport_NativeLookupSymbol(
 extern "C"
 JNIEXPORT jboolean JNICALL
 Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
-    jclass caller_class, jlong native_catalog_server_ptr, jstring key,
+    jclass caller_class, jlong native_catalog_server_ptr, jstring key, jlong version,
     jbyteArray serialized_object, jboolean deleted) {
   std::string key_string;
   {
@@ -442,9 +442,9 @@ Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
   if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) {
     return static_cast<jboolean>(false);
   }
-  reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->AddPendingTopicItem(
-      std::move(key_string), obj_buf.get(), static_cast<uint32_t>(obj_buf.size()),
-      deleted);
+  reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->
+      AddPendingTopicItem(std::move(key_string), version, obj_buf.get(),
+      static_cast<uint32_t>(obj_buf.size()), deleted);
   return static_cast<jboolean>(true);
 }
 
@@ -575,7 +575,7 @@ static JNINativeMethod native_methods[] = {
       (void*)::Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions
   },
   {
-      (char*)"NativeAddPendingTopicItem", (char*)"(JLjava/lang/String;[BZ)Z",
+      (char*)"NativeAddPendingTopicItem", (char*)"(JLjava/lang/String;J[BZ)Z",
       (void*)::Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem
   },
   {

http://git-wip-us.apache.org/repos/asf/impala/blob/84391071/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 0cd1eda..bee5be5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -35,6 +35,7 @@ import org.apache.impala.util.PatternMatcher;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.impala.util.TUniqueIdUtil;
 
 /**
  * Thread safe interface for reading and updating metadata stored in the Hive MetaStore.
@@ -574,7 +575,8 @@ public abstract class Catalog {
       case DATA_SOURCE:
         return "DATA_SOURCE:" + catalogObject.getData_source().getName().toLowerCase();
       case CATALOG:
-        return "CATALOG:" + catalogObject.getCatalog().catalog_service_id;
+        return "CATALOG:" +
+            TUniqueIdUtil.PrintId(catalogObject.getCatalog().catalog_service_id);
       default:
         throw new IllegalStateException(
             "Unsupported catalog object type: " + catalogObject.getType());

http://git-wip-us.apache.org/repos/asf/impala/blob/84391071/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 7ea821c..5bef242 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -422,12 +422,8 @@ public class CatalogServiceCatalog extends Catalog {
       // TODO: TSerializer.serialize() returns a copy of the internal byte array, which
       // could be elided.
       byte[] data = serializer.serialize(obj);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Collected catalog " + (delete ? "deletion: " : "update: ") + key +
-            " version: " + obj.catalog_version);
-      }
-      if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, key, data, delete))
-      {
+      if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, key,
+          obj.catalog_version, data, delete)) {
         LOG.error("NativeAddPendingTopicItem failed in BE. key=" + key + ", delete="
             + delete + ", data_size=" + data.length);
       }

http://git-wip-us.apache.org/repos/asf/impala/blob/84391071/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index d4311ff..16fdd22 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -86,7 +86,7 @@ public class FeSupport {
   // 'serializationBuffer' is a serialized TCatalogObject.
   // The return value is true if the operation succeeds and false otherwise.
   public native static boolean NativeAddPendingTopicItem(long nativeCatalogServerPtr,
-      String key, byte[] serializationBuffer, boolean deleted);
+      String key, long version, byte[] serializationBuffer, boolean deleted);
 
   // Get a catalog object update from the backend. A pair of isDeletion flag and
   // serialized TCatalogObject is returned.