You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2019/02/15 04:59:48 UTC

[impala] 06/06: IMPALA-7961: Avoid adding unmodified objects to DDL response

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5ed6c665d190dbe5303e241afbc50e0eacb0a6af
Author: Bharath Vissapragada <bh...@cloudera.com>
AuthorDate: Sun Feb 10 17:03:19 2019 -0800

    IMPALA-7961: Avoid adding unmodified objects to DDL response
    
    When a DDL is processed, we typically add the affected (added/removed)
    objects to the response TCatalogUpdateResult struct. This response
    is processed on the coordinator and the changes are applied locally.
    When SYNC_DDL is enabled, the Catalog server also includes a topic
    version number that should include all the affected objects so that the
    coordinator can wait for that minimum topic version to be applied on
    all other coordinators before returning the control back to the user.
    This covering topic version is calculated by looking at the topic
    update log, which contains all the in-flight updates (and to an extent
    past updates) that are periodically GC'ed.
    
    Bug: In certain cases like CREATE TBL IF NOT EXISTS, we could end up
    adding objects to the DDL response which haven't been modified in a
    while (> TOPIC_UPDATE_LOG_GC_FREQUENCY) and hence could be potentially
    GC'ed from the TopicUpdateLog. This means that the Catalog server
    wouldn't be able to find a covering topic update version and eventually
    gives up throwing an error as described in the jira.
    
    Fix: Bumps the version of any object that already exists when IF NOT EXISTS
    is used in conjunction with SYNC_DDL. This makes sure that the object is
    included in the upcoming topic updates and waitForSyncDdlVersion() can find
    a covering topic update that includes this object. This is a hack and could
    cause false-positive invalidations, but definitely better than breaking
    SYNC_DDL semantics.
    
    Also added some additional diagnostic logging that could've simplified
    debugging an issue like this.
    
    Testing: Since this is a racy bug, I could only repro it by forcing
    frequent topic update log GCs along with a specific sequence of
    actions. Couldn't reproduce it with the patch.
    
    Change-Id: If3e914b70ba796c9b224e9dea559b8c40aa25d83
    Reviewed-on: http://gerrit.cloudera.org:8080/12428
    Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      | 11 ++-
 .../org/apache/impala/catalog/FeCatalogUtils.java  | 15 ++++
 .../org/apache/impala/catalog/TopicUpdateLog.java  | 16 ++---
 .../apache/impala/service/CatalogOpExecutor.java   | 81 +++++++++++++++++-----
 4 files changed, 92 insertions(+), 31 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 274ca35..0904714 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -80,7 +80,7 @@ import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.SentryProxy;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -91,6 +91,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -171,7 +172,7 @@ import com.google.common.collect.Sets;
  * loading thread pool.
  */
 public class CatalogServiceCatalog extends Catalog {
-  public static final Logger LOG = Logger.getLogger(CatalogServiceCatalog.class);
+  public static final Logger LOG = LoggerFactory.getLogger(CatalogServiceCatalog.class);
 
   private static final int INITIAL_META_STORE_CLIENT_POOL_SIZE = 10;
   private static final int MAX_NUM_SKIPPED_TOPIC_UPDATES = 2;
@@ -1279,7 +1280,7 @@ public class CatalogServiceCatalog extends Catalog {
         tableLoadingMgr_.backgroundLoad(tblName);
       }
     } catch (Exception e) {
-      LOG.error(e);
+      LOG.error("Error initializing Catalog", e);
       throw new CatalogException("Error initializing Catalog. Catalog may be empty.", e);
     } finally {
       versionLock_.writeLock().unlock();
@@ -2237,6 +2238,10 @@ public class CatalogServiceCatalog extends Catalog {
         if (lastSentTopicUpdate != currentTopicUpdate) {
           ++numAttempts;
           if (numAttempts > maxNumAttempts) {
+            LOG.error(String.format("Couldn't retrieve the covering topic version for "
+                + "catalog objects. Updated objects: %s, deleted objects: %s",
+                FeCatalogUtils.debugString(result.updated_catalog_objects),
+                FeCatalogUtils.debugString(result.removed_catalog_objects)));
             throw new CatalogException("Couldn't retrieve the catalog topic version " +
                 "for the SYNC_DDL operation after " + maxNumAttempts + " attempts." +
                 "The operation has been successfully executed but its effects may have " +
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index d10dadc..7e55d73 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -40,6 +40,7 @@ import org.apache.impala.catalog.local.CatalogdMetaProvider;
 import org.apache.impala.catalog.local.LocalCatalog;
 import org.apache.impala.catalog.local.MetaProvider;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.TGetCatalogMetricsResult;
 import org.apache.impala.thrift.THdfsPartition;
@@ -387,4 +388,18 @@ public abstract class FeCatalogUtils {
   }
 
 
+  /**
+   * Returns a debug string for the given list of TCatalogObjects, formatted as
+   * "[<key> version: <version>,...]". Returns "[]" if the list is null or empty.
+   */
+  public static String debugString(List<TCatalogObject> objects) {
+    if (objects == null || objects.size() == 0) return "[]";
+    List<String> catalogObjs = new ArrayList<>();
+    for (TCatalogObject object: objects) {
+      catalogObjs.add(String.format("%s version: %d",
+          Catalog.toCatalogObjectKey(object), object.catalog_version));
+    }
+    return "[" + Joiner.on(",").join(catalogObjs) + "]";
+  }
+
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
index 779d8f7..be80b3b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
@@ -20,7 +20,8 @@ package org.apache.impala.catalog;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -37,7 +38,7 @@ import com.google.common.base.Strings;
 // by the catalog for at least TOPIC_UPDATE_LOG_GC_FREQUENCY updates to be removed from
 // the log.
 public class TopicUpdateLog {
-  private static final Logger LOG = Logger.getLogger(TopicUpdateLog.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TopicUpdateLog.class);
   // Frequency at which the entries of the topic update log are garbage collected.
   // An entry may survive for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.
   private final static int TOPIC_UPDATE_LOG_GC_FREQUENCY = 1000;
@@ -104,9 +105,8 @@ public class TopicUpdateLog {
       return;
     }
     if (numTopicUpdatesToGc_ == 0) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Topic update log GC started.");
-      }
+      LOG.info("Topic update log GC started. GC-ing topics with versions " +
+          "<= {}", oldestTopicUpdateToGc_);
       Preconditions.checkState(oldestTopicUpdateToGc_ > 0);
       int numEntriesRemoved = 0;
       for (Map.Entry<String, Entry> entry:
@@ -120,10 +120,8 @@ public class TopicUpdateLog {
       }
       numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
       oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Topic update log GC finished. Removed " + numEntriesRemoved +
-            " entries.");
-      }
+      LOG.info("Topic update log GC finished. Removed {} entries.",
+          numEntriesRemoved);
     } else {
       --numTopicUpdatesToGc_;
     }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 137e682..19021b0 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -169,7 +169,7 @@ import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.MetaStoreUtil;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
 import org.apache.thrift.TException;
 
 import com.codahale.metrics.Timer;
@@ -179,6 +179,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class used to execute Catalog Operations, including DDL and refresh/invalidate
@@ -249,7 +250,7 @@ import com.google.common.collect.Sets;
  * metastore out of this class.
  */
 public class CatalogOpExecutor {
-  private static final Logger LOG = Logger.getLogger(CatalogOpExecutor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogOpExecutor.class);
   // Format string for exceptions returned by Hive Metastore RPCs.
   private final static String HMS_RPC_ERROR_FORMAT_STR =
       "Error making '%s' RPC to Hive Metastore: ";
@@ -280,6 +281,7 @@ public class CatalogOpExecutor {
       requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
     }
 
+    boolean syncDdl = ddlRequest.isSync_ddl();
     switch (ddlRequest.ddl_type) {
       case ALTER_TABLE:
         alterTable(ddlRequest.getAlter_table_params(), response);
@@ -291,14 +293,14 @@ public class CatalogOpExecutor {
         createDatabase(ddlRequest.getCreate_db_params(), response);
         break;
       case CREATE_TABLE_AS_SELECT:
-        response.setNew_table_created(
-            createTable(ddlRequest.getCreate_table_params(), response));
+        response.setNew_table_created(createTable(
+            ddlRequest.getCreate_table_params(), response, syncDdl));
         break;
       case CREATE_TABLE:
-        createTable(ddlRequest.getCreate_table_params(), response);
+        createTable(ddlRequest.getCreate_table_params(), response, syncDdl);
         break;
       case CREATE_TABLE_LIKE:
-        createTableLike(ddlRequest.getCreate_table_like_params(), response);
+        createTableLike(ddlRequest.getCreate_table_like_params(), response, syncDdl);
         break;
       case CREATE_VIEW:
         createView(ddlRequest.getCreate_view_params(), response);
@@ -363,7 +365,7 @@ public class CatalogOpExecutor {
     // operation. The version of this catalog update is returned to the requesting
     // impalad which will wait until this catalog update has been broadcast to all the
     // coordinators.
-    if (ddlRequest.isSync_ddl()) {
+    if (syncDdl) {
       response.getResult().setVersion(
           catalog_.waitForSyncDdlVersion(response.getResult()));
     }
@@ -1762,9 +1764,12 @@ public class CatalogOpExecutor {
    * lazily load the new metadata on the next access. If this is a managed Kudu table,
    * the table is also created in the Kudu storage engine. Re-throws any HMS or Kudu
    * exceptions encountered during the create.
+   * @param  syncDdl tells if SYNC_DDL option is enabled on this DDL request.
+   * @return true if a new table has been created with the given params, false
+   * otherwise.
    */
-  private boolean createTable(TCreateTableParams params, TDdlExecResponse response)
-      throws ImpalaException {
+  private boolean createTable(TCreateTableParams params, TDdlExecResponse response,
+      boolean syncDdl) throws ImpalaException {
     Preconditions.checkNotNull(params);
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && tableName.isFullyQualified());
@@ -1774,18 +1779,36 @@ public class CatalogOpExecutor {
     Table existingTbl = catalog_.getTableNoThrow(tableName.getDb(), tableName.getTbl());
     if (params.if_not_exists && existingTbl != null) {
       addSummary(response, "Table already exists.");
-      LOG.trace(String.format("Skipping table creation because %s already exists and " +
-          "IF NOT EXISTS was specified.", tableName));
-      existingTbl.getLock().lock();
+      LOG.trace("Skipping table creation because {} already exists and " +
+          "IF NOT EXISTS was specified.", tableName);
+      tryLock(existingTbl);
       try {
-        addTableToCatalogUpdate(existingTbl, response.getResult());
-        return false;
+        if (syncDdl) {
+          // When SYNC_DDL is enabled and the table already exists, we force a version
+          // bump on it so that it is added to the next statestore update. Without this
+          // we could potentially be referring to a table object that has already been
+          // GC'ed from the TopicUpdateLog and waitForSyncDdlVersion() cannot find a
+          // covering topic version (IMPALA-7961).
+          //
+          // This is a conservative hack to not break the SYNC_DDL semantics and could
+          // possibly result in false-positive invalidates on this table. However, that is
+          // better than breaking the SYNC_DDL semantics and the subsequent queries
+          // referring to this table failing with "table not found" errors.
+          long newVersion = catalog_.incrementAndGetCatalogVersion();
+          existingTbl.setCatalogVersion(newVersion);
+          LOG.trace("Table {} version bumped to {} because SYNC_DDL is enabled.",
+              tableName, newVersion);
+        }
+        addTableToCatalogUpdate(existingTbl, response.result);
       } finally {
+        // Release the locks held in tryLock().
+        catalog_.getLock().writeLock().unlock();
         existingTbl.getLock().unlock();
       }
+      return false;
     }
     org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
-    LOG.trace(String.format("Creating table %s", tableName));
+    LOG.trace("Creating table {}", tableName);
     if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params, response);
     Preconditions.checkState(params.getColumns().size() > 0,
         "Empty column list given as argument to Catalog.createTable");
@@ -2015,8 +2038,10 @@ public class CatalogOpExecutor {
    * No data is copied as part of this process, it is a metadata only operation. If the
    * creation succeeds, an entry is added to the metadata cache to lazily load the new
    * table's metadata on the next access.
+   * @param  syncDdl tells if SYNC_DDL is enabled for this DDL request.
    */
-  private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response)
+  private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response
+      , boolean syncDdl)
       throws ImpalaException {
     Preconditions.checkNotNull(params);
 
@@ -2033,13 +2058,31 @@ public class CatalogOpExecutor {
       addSummary(response, "Table already exists.");
       LOG.trace(String.format("Skipping table creation because %s already exists and " +
           "IF NOT EXISTS was specified.", tblName));
-      existingTbl.getLock().lock();
+      tryLock(existingTbl);
       try {
-        addTableToCatalogUpdate(existingTbl, response.getResult());
-        return;
+        if (syncDdl) {
+          // When SYNC_DDL is enabled and the table already exists, we force a version
+          // bump on it so that it is added to the next statestore update. Without this
+          // we could potentially be referring to a table object that has already been
+          // GC'ed from the TopicUpdateLog and waitForSyncDdlVersion() cannot find a
+          // covering topic version (IMPALA-7961).
+          //
+          // This is a conservative hack to not break the SYNC_DDL semantics and could
+          // possibly result in false-positive invalidates on this table. However, that is
+          // better than breaking the SYNC_DDL semantics and the subsequent queries
+          // referring to this table failing with "table not found" errors.
+          long newVersion = catalog_.incrementAndGetCatalogVersion();
+          existingTbl.setCatalogVersion(newVersion);
+          LOG.trace("Table {} version bumped to {} because SYNC_DDL is enabled.",
+              existingTbl.getFullName(), newVersion);
+        }
+        addTableToCatalogUpdate(existingTbl, response.result);
       } finally {
+        // Release the locks held in tryLock().
+        catalog_.getLock().writeLock().unlock();
         existingTbl.getLock().unlock();
       }
+      return;
     }
     Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
     org.apache.hadoop.hive.metastore.api.Table tbl =