You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/12 22:01:08 UTC
[3/6] impala git commit: IMPALA-6629: Clean up catalog update logging
IMPALA-6629: Clean up catalog update logging
IMPALA-5990 introduced some redundant and unclear logging in the process
of assembling and sending catalog updates. This patch removes the
duplication (each collected topic item is now logged once, in the
backend, together with its catalog version), rewords some logs, and adds
a log message when a catalog update is fully assembled.
Change-Id: Iaa096b8c84304f28b37ac5e6794d688ba0a949a7
Reviewed-on: http://gerrit.cloudera.org:8080/9566
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4e12ba6b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4e12ba6b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4e12ba6b
Branch: refs/heads/master
Commit: 4e12ba6ba563510addad3e2766aa32188b1e5ea9
Parents: e096233
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Thu Mar 8 16:38:57 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Mar 10 00:42:36 2018 +0000
----------------------------------------------------------------------
be/src/catalog/catalog-server.cc | 21 +++++++++++---------
be/src/catalog/catalog-server.h | 4 ++--
be/src/service/fe-support.cc | 10 +++++-----
.../java/org/apache/impala/catalog/Catalog.java | 4 +++-
.../impala/catalog/CatalogServiceCatalog.java | 8 ++------
.../org/apache/impala/service/FeSupport.java | 2 +-
6 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/4e12ba6b/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 2f4bcbe..8a91c25 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -233,16 +233,18 @@ void CatalogServer::UpdateCatalogTopicCallback(
// to reload the full catalog.
if (delta.from_version == 0 && catalog_objects_min_version_ != 0) {
last_sent_catalog_version_ = 0L;
- } else {
+ } else if (!pending_topic_updates_.empty()) {
// Process the pending topic update.
- LOG_EVERY_N(INFO, 300) << "Catalog Version: " << catalog_objects_max_version_
- << " Last Catalog Version: " << last_sent_catalog_version_;
-
subscriber_topic_updates->emplace_back();
TTopicDelta& update = subscriber_topic_updates->back();
update.topic_name = IMPALA_CATALOG_TOPIC;
update.topic_entries = std::move(pending_topic_updates_);
+ VLOG(1) << "A catalog update with " << update.topic_entries.size()
+ << " entries is assembled. Catalog version: "
+ << catalog_objects_max_version_ << " Last sent catalog version: "
+ << last_sent_catalog_version_;
+
// Update the new catalog version and the set of known catalog objects.
last_sent_catalog_version_ = catalog_objects_max_version_;
}
@@ -457,8 +459,8 @@ void CatalogServer::TableMetricsUrlCallback(const Webserver::ArgumentMap& args,
}
}
-bool CatalogServer::AddPendingTopicItem(std::string key, const uint8_t* item_data,
- uint32_t size, bool deleted) {
+bool CatalogServer::AddPendingTopicItem(std::string key, int64_t version,
+ const uint8_t* item_data, uint32_t size, bool deleted) {
pending_topic_updates_.emplace_back();
TTopicItem& item = pending_topic_updates_.back();
if (FLAGS_compact_catalog_topic) {
@@ -474,8 +476,9 @@ bool CatalogServer::AddPendingTopicItem(std::string key, const uint8_t* item_dat
}
item.key = std::move(key);
item.deleted = deleted;
- VLOG(1) << "Publishing " << (deleted ? "deletion: " : "update: ") << item.key <<
- " original size: " << size << (FLAGS_compact_catalog_topic ?
- Substitute(" compressed size: $0", item.value.size()) : string());
+ VLOG(1) << "Collected " << (deleted ? "deletion: " : "update: ") << item.key
+ << ", version=" << version << ", original size=" << size
+ << (FLAGS_compact_catalog_topic ?
+ Substitute(", compressed size=$0", item.value.size()) : string());
return true;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/4e12ba6b/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index a6a0c3f..2fa8ce7 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -76,8 +76,8 @@ class CatalogServer {
/// Add a topic item to pending_topic_updates_. Caller must hold catalog_lock_.
/// The return value is true if the operation succeeds and false otherwise.
- bool AddPendingTopicItem(std::string key, const uint8_t* item_data, uint32_t size,
- bool deleted);
+ bool AddPendingTopicItem(std::string key, int64_t version, const uint8_t* item_data,
+ uint32_t size, bool deleted);
private:
/// Thrift API implementation which proxies requests onto this CatalogService.
http://git-wip-us.apache.org/repos/asf/impala/blob/4e12ba6b/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 2d48d73..9d59883 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -428,7 +428,7 @@ Java_org_apache_impala_service_FeSupport_NativeLookupSymbol(
extern "C"
JNIEXPORT jboolean JNICALL
Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
- jclass caller_class, jlong native_catalog_server_ptr, jstring key,
+ jclass caller_class, jlong native_catalog_server_ptr, jstring key, jlong version,
jbyteArray serialized_object, jboolean deleted) {
std::string key_string;
{
@@ -442,9 +442,9 @@ Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) {
return static_cast<jboolean>(false);
}
- reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->AddPendingTopicItem(
- std::move(key_string), obj_buf.get(), static_cast<uint32_t>(obj_buf.size()),
- deleted);
+ reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->
+ AddPendingTopicItem(std::move(key_string), version, obj_buf.get(),
+ static_cast<uint32_t>(obj_buf.size()), deleted);
return static_cast<jboolean>(true);
}
@@ -575,7 +575,7 @@ static JNINativeMethod native_methods[] = {
(void*)::Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions
},
{
- (char*)"NativeAddPendingTopicItem", (char*)"(JLjava/lang/String;[BZ)Z",
+ (char*)"NativeAddPendingTopicItem", (char*)"(JLjava/lang/String;J[BZ)Z",
(void*)::Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem
},
{
http://git-wip-us.apache.org/repos/asf/impala/blob/4e12ba6b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 0cd1eda..bee5be5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -35,6 +35,7 @@ import org.apache.impala.util.PatternMatcher;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.impala.util.TUniqueIdUtil;
/**
* Thread safe interface for reading and updating metadata stored in the Hive MetaStore.
@@ -574,7 +575,8 @@ public abstract class Catalog {
case DATA_SOURCE:
return "DATA_SOURCE:" + catalogObject.getData_source().getName().toLowerCase();
case CATALOG:
- return "CATALOG:" + catalogObject.getCatalog().catalog_service_id;
+ return "CATALOG:" +
+ TUniqueIdUtil.PrintId(catalogObject.getCatalog().catalog_service_id);
default:
throw new IllegalStateException(
"Unsupported catalog object type: " + catalogObject.getType());
http://git-wip-us.apache.org/repos/asf/impala/blob/4e12ba6b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 7ea821c..5bef242 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -422,12 +422,8 @@ public class CatalogServiceCatalog extends Catalog {
// TODO: TSerializer.serialize() returns a copy of the internal byte array, which
// could be elided.
byte[] data = serializer.serialize(obj);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Collected catalog " + (delete ? "deletion: " : "update: ") + key +
- " version: " + obj.catalog_version);
- }
- if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, key, data, delete))
- {
+ if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, key,
+ obj.catalog_version, data, delete)) {
LOG.error("NativeAddPendingTopicItem failed in BE. key=" + key + ", delete="
+ delete + ", data_size=" + data.length);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/4e12ba6b/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index d4311ff..16fdd22 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -86,7 +86,7 @@ public class FeSupport {
// 'serializationBuffer' is a serialized TCatalogObject.
// The return value is true if the operation succeeds and false otherwise.
public native static boolean NativeAddPendingTopicItem(long nativeCatalogServerPtr,
- String key, byte[] serializationBuffer, boolean deleted);
+ String key, long version, byte[] serializationBuffer, boolean deleted);
// Get a catalog object update from the backend. A pair of isDeletion flag and
// serialized TCatalogObject is returned.