Posted to commits@impala.apache.org by ta...@apache.org on 2017/01/05 15:32:18 UTC

[5/5] incubator-impala git commit: IMPALA-3641: Fix catalogd RPC responses to DROP IF EXISTS.

IMPALA-3641: Fix catalogd RPC responses to DROP IF EXISTS.

The main problem was that the catalogd's response to
a DROP IF EXISTS operation included a removed object
that was then applied to the requesting impalad's catalog cache.
In particular, a DROP DATABASE IF EXISTS that did not actually
drop anything in the catalogd still returned the object name in
the RPC response as a removed object with the *current* catalog
version (i.e., without incrementing the catalog version).
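
Condensed into a runnable sketch, the old response logic amounted to
the following. The class, map, and version counter are stand-ins for
Impala's Catalog/Db classes, not real API; the actual code is in the
diff further down:

  // Sketch of the pre-patch response logic; all names are illustrative.
  import java.util.HashMap;
  import java.util.Map;

  class DropIfExistsBugSketch {
    static long catalogVersion = 42;                       // current catalog version
    static final Map<String, Long> dbs = new HashMap<>();  // db name -> version

    // Old behavior: always report a removed object, even for a no-op drop.
    static String dropDbIfExistsOld(String name) {
      Long removed = dbs.remove(name);
      long version = (removed == null)
          ? catalogVersion  // BUG: nothing was dropped, yet the *current*
                            // (not incremented) version is reported
          : removed;        // version of the object actually dropped
      return "removed " + name + " @ version " + version;
    }

    public static void main(String[] args) {
      // No database "nope" exists, yet the response still names a removed
      // object stamped with the current catalog version.
      System.out.println(dropDbIfExistsOld("nope"));  // removed nope @ version 42
    }
  }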

The above behavior led to a situation where the drop of
a non-existent object overwrote a legitimate entry in
an impalad's CatalogDeltaLog. Recall that the version of the
dropped object was based on the current catalog version
at some point in time, and could therefore collide with the
version of a legitimate entry in the CatalogDeltaLog.

As a reminder, the CatalogDeltaLog protects deletions from
being undone by updates from the statestore. Overwriting
an object in the CatalogDeltaLog can therefore cause a dropped
object to reappear under certain timings of a statestore update,
as the sketch below illustrates.
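
A minimal sketch of the overwrite, assuming (as described above) a
delta log keyed by catalog version; the class and method below are
illustrative, not the actual CatalogDeltaLog API:

  import java.util.SortedMap;
  import java.util.TreeMap;

  class DeltaLogSketch {
    // catalog version -> name of the removed object
    final SortedMap<Long, String> removedObjects = new TreeMap<>();

    void addRemovedObject(long version, String name) {
      // A second entry with the same version silently replaces the first.
      removedObjects.put(version, name);
    }

    public static void main(String[] args) {
      DeltaLogSketch log = new DeltaLogSketch();
      log.addRemovedObject(42L, "db.real_table");   // legitimate drop at version 42
      log.addRemovedObject(42L, "nonexistent_db");  // no-op DROP IF EXISTS, reported
                                                    // with the current version 42
      // The legitimate entry is gone, so a statestore update carrying
      // db.real_table at version <= 42 can resurrect it in the impalad.
      System.out.println(log.removedObjects);       // {42=nonexistent_db}
    }
  }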

Please see the JIRA for an analysis of logging output that
shows the bug and its effect.

The fix is simple: The RPC response of a DROP IF EXISTS should
only contain a removed object if an object was actually
removed from the catalogd.
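
In the same sketch form as above (stand-in names, not Impala's API),
the fixed path reports a removed object only when the drop actually
removed something; this method is a drop-in replacement for
dropDbIfExistsOld() in the earlier sketch:

  // Patched behavior: a removed object is reported only when something
  // was actually removed from the catalogd's cache.
  static String dropDbIfExistsFixed(String name) {
    Long removed = dbs.remove(name);
    if (removed == null) {
      // No-op drop: return only the current version and no removed object,
      // so the impalad's CatalogDeltaLog is left untouched.
      return "nothing removed @ version " + catalogVersion;
    }
    return "removed " + name + " @ version " + removed;
  }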

This fix, however, introduces a new consistency issue (IMPALA-4727).
The new behavior is not ideal, but it is better than the old one,
for the following reason:
Before this patch, the drop of a completely unrelated object could
affect the consistency of a drop+add on another object.
After this patch, a drop+add may fail in the add if there is an
ill-timed concurrent drop of the same object, as the sketch below
illustrates.
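
One concrete instance is the patched CatalogServiceCatalog.renameTable()
in the diff below (rename = drop + add). A minimal runnable sketch of
one possible race, with a plain map standing in for the catalog cache
and stub objects for tables:

  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  class RenameRaceSketch {
    static final Map<String, Object> tables = new ConcurrentHashMap<>();

    // Mirrors the patched renameTable(): bail out with null when the drop
    // half finds nothing, instead of adding the new table regardless.
    static Object renameTable(String oldName, String newName) {
      Object oldTable = tables.remove(oldName);  // the "drop" half
      if (oldTable == null) return null;         // a concurrent drop won the race
      Object newTable = new Object();
      tables.put(newName, newTable);             // the "add" half
      return newTable;
    }

    public static void main(String[] args) {
      tables.put("t1", new Object());
      tables.remove("t1");                         // ill-timed concurrent DROP t1
      System.out.println(renameTable("t1", "t2")); // null -> caller raises an error
    }
  }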

Testing:
- Unfortunately, I have not been able to reproduce the issue
  locally despite vigorous attempts and despite knowing what
  the problem is. Our existing tests seem to reproduce the
  issue pretty reliably, so it's not clear whether a targeted
  test is feasible or needed.
- An exhaustive test run passed.

Change-Id: Icb1f31eb2ecf05b9b51ef4e12e6bb78f44d0cf84
Reviewed-on: http://gerrit.cloudera.org:8080/5556
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6d15f037
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6d15f037
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6d15f037

Branch: refs/heads/master
Commit: 6d15f03777afeb1375927ad9520a5db5bc9d42a1
Parents: 95ed443
Author: Alex Behm <al...@cloudera.com>
Authored: Tue Dec 20 00:12:51 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 5 04:15:30 2017 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java   |  21 +--
 .../impala/service/CatalogOpExecutor.java       | 128 +++++++++----------
 2 files changed, 77 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d15f037/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 85d92cb..b51c9aa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -26,12 +26,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.UUID;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.Path;
@@ -45,10 +45,6 @@ import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.log4j.Logger;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.TException;
-
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
@@ -69,6 +65,10 @@ import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.SentryProxy;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -866,14 +866,19 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Renames a table. Equivalent to an atomic drop + add of the table. Returns
-   * the new Table object with an incremented catalog version or null if operation
-   * was not successful.
+   * the new Table object with an incremented catalog version or null if the
+   * drop or add were unsuccessful. If null is returned, then the catalog cache
+   * is in one of the following two states:
+   * 1. Old table was not removed, and new table was not added
+   * 2. Old table was removed, but new table was not added
    */
   public Table renameTable(TTableName oldTableName, TTableName newTableName)
       throws CatalogException {
     // Remove the old table name from the cache and add the new table.
     Db db = getDb(oldTableName.getDb_name());
-    if (db != null) db.removeTable(oldTableName.getTable_name());
+    if (db == null) return null;
+    Table oldTable = db.removeTable(oldTableName.getTable_name());
+    if (oldTable == null) return null;
     return addTable(newTableName.getDb_name(), newTableName.getTable_name());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d15f037/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f878b12..d9448b3 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-import org.apache.impala.common.Reference;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.User;
@@ -85,10 +84,11 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
 import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.JniCatalogConstants;
-import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableAddDropRangePartitionParams;
+import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableAddReplaceColsParams;
 import org.apache.impala.thrift.TAlterTableChangeColParams;
 import org.apache.impala.thrift.TAlterTableDropColParams;
@@ -585,8 +585,9 @@ public class CatalogOpExecutor {
    * version of the serialized table as the version of the catalog update result.
    */
   private static void addTableToCatalogUpdate(Table tbl, TCatalogUpdateResult result) {
-    TCatalogObject updatedCatalogObject = TableToTCatalogObject(tbl);
-    result.setUpdated_catalog_object_DEPRECATED(TableToTCatalogObject(tbl));
+    Preconditions.checkNotNull(tbl);
+    TCatalogObject updatedCatalogObject = tbl.toTCatalogObject();
+    result.setUpdated_catalog_object_DEPRECATED(updatedCatalogObject);
     result.setVersion(updatedCatalogObject.getCatalog_version());
   }
 
@@ -1056,18 +1057,16 @@ public class CatalogOpExecutor {
   private void dropDataSource(TDropDataSourceParams params, TDdlExecResponse resp)
       throws ImpalaException {
     if (LOG.isTraceEnabled()) LOG.trace("Drop DATA SOURCE: " + params.toString());
-    DataSource dataSource = catalog_.getDataSource(params.getData_source());
+    DataSource dataSource = catalog_.removeDataSource(params.getData_source());
     if (dataSource == null) {
       if (!params.if_exists) {
         throw new ImpalaRuntimeException("Data source " + params.getData_source() +
             " does not exists.");
       }
-      // The user specified IF EXISTS and the data source didn't exist, just
-      // return the current catalog version.
+      // No data source was removed.
       resp.result.setVersion(catalog_.getCatalogVersion());
       return;
     }
-    catalog_.removeDataSource(params.getData_source());
     TCatalogObject removedObject = new TCatalogObject();
     removedObject.setType(TCatalogObjectType.DATA_SOURCE);
     removedObject.setData_source(dataSource.toThrift());
@@ -1228,13 +1227,12 @@ public class CatalogOpExecutor {
             String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDatabase"), e);
       }
       Db removedDb = catalog_.removeDb(params.getDb());
-      // If no db was removed as part of this operation just return the current catalog
-      // version.
       if (removedDb == null) {
-        removedObject.setCatalog_version(catalog_.getCatalogVersion());
-      } else {
-        removedObject.setCatalog_version(removedDb.getCatalogVersion());
+        // Nothing was removed from the catalogd's cache.
+        resp.result.setVersion(catalog_.getCatalogVersion());
+        return;
       }
+      removedObject.setCatalog_version(removedDb.getCatalogVersion());
     }
     removedObject.setType(TCatalogObjectType.DATABASE);
     removedObject.setDb(new TDatabase());
@@ -1349,32 +1347,33 @@ public class CatalogOpExecutor {
 
       Table table = catalog_.removeTable(params.getTable_name().db_name,
           params.getTable_name().table_name);
-      if (table != null) {
-        resp.result.setVersion(table.getCatalogVersion());
-        if (table instanceof HdfsTable) {
-          HdfsTable hdfsTable = (HdfsTable) table;
-          if (hdfsTable.isMarkedCached()) {
-            try {
-              HdfsCachingUtil.uncacheTbl(table.getMetaStoreTable());
-            } catch (Exception e) {
-              LOG.error("Unable to uncache table: " + table.getFullName(), e);
-            }
+      if (table == null) {
+        // Nothing was removed from the catalogd's cache.
+        resp.result.setVersion(catalog_.getCatalogVersion());
+        return;
+      }
+      resp.result.setVersion(table.getCatalogVersion());
+      if (table instanceof HdfsTable) {
+        HdfsTable hdfsTable = (HdfsTable) table;
+        if (hdfsTable.isMarkedCached()) {
+          try {
+            HdfsCachingUtil.uncacheTbl(table.getMetaStoreTable());
+          } catch (Exception e) {
+            LOG.error("Unable to uncache table: " + table.getFullName(), e);
           }
-          if (table.getNumClusteringCols() > 0) {
-            for (HdfsPartition partition: hdfsTable.getPartitions()) {
-              if (partition.isMarkedCached()) {
-                try {
-                  HdfsCachingUtil.uncachePartition(partition);
-                } catch (Exception e) {
-                  LOG.error("Unable to uncache partition: " +
-                      partition.getPartitionName(), e);
-                }
+        }
+        if (table.getNumClusteringCols() > 0) {
+          for (HdfsPartition partition: hdfsTable.getPartitions()) {
+            if (partition.isMarkedCached()) {
+              try {
+                HdfsCachingUtil.uncachePartition(partition);
+              } catch (Exception e) {
+                LOG.error("Unable to uncache partition: " +
+                    partition.getPartitionName(), e);
               }
             }
           }
         }
-      } else {
-        resp.result.setVersion(catalog_.getCatalogVersion());
       }
     }
     removedObject.setType(TCatalogObjectType.TABLE);
@@ -2076,17 +2075,25 @@ public class CatalogOpExecutor {
     }
     // Rename the table in the Catalog and get the resulting catalog object.
     // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
-    TCatalogObject newTable = TableToTCatalogObject(
-        catalog_.renameTable(tableName.toThrift(), newTableName.toThrift()));
+    Table newTable = catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
+    if (newTable == null) {
+      // The rename succeeded in the HMS but failed in the catalog cache. The cache is in
+      // an inconsistent state, but can likely be fixed by running "invalidate metadata".
+      throw new ImpalaRuntimeException(String.format(
+          "Table/view rename succeeded in the Hive Metastore, but failed in Impala's " +
+          "Catalog Server. Running 'invalidate metadata <tbl>' on the old table name " +
+          "'%s' and the new table name '%s' may fix the problem." , tableName.toString(),
+          newTableName.toString()));
+    }
+
+    TCatalogObject addedObject = newTable.toTCatalogObject();
     TCatalogObject removedObject = new TCatalogObject();
     removedObject.setType(TCatalogObjectType.TABLE);
-    removedObject.setTable(new TTable());
-    removedObject.getTable().setTbl_name(tableName.getTbl());
-    removedObject.getTable().setDb_name(tableName.getDb());
-    removedObject.setCatalog_version(newTable.getCatalog_version());
+    removedObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
+    removedObject.setCatalog_version(addedObject.getCatalog_version());
     response.result.setRemoved_catalog_object_DEPRECATED(removedObject);
-    response.result.setUpdated_catalog_object_DEPRECATED(newTable);
-    response.result.setVersion(newTable.getCatalog_version());
+    response.result.setUpdated_catalog_object_DEPRECATED(addedObject);
+    response.result.setVersion(addedObject.getCatalog_version());
   }
 
   /**
@@ -2655,8 +2662,9 @@ public class CatalogOpExecutor {
       role = catalog_.getSentryProxy().dropRole(requestingUser,
           createDropRoleParams.getRole_name());
       if (role == null) {
-        role = new Role(createDropRoleParams.getRole_name(), Sets.<String>newHashSet());
-        role.setCatalogVersion(catalog_.getCatalogVersion());
+        // Nothing was removed from the catalogd's cache.
+        resp.result.setVersion(catalog_.getCatalogVersion());
+        return;
       }
     } else {
       role = catalog_.getSentryProxy().createRole(requestingUser,
@@ -2839,12 +2847,6 @@ public class CatalogOpExecutor {
     return fsList;
   }
 
-  private static TCatalogObject TableToTCatalogObject(Table table) {
-    if (table != null) return table.toTCatalogObject();
-    return new TCatalogObject(TCatalogObjectType.TABLE,
-        Catalog.INITIAL_CATALOG_VERSION);
-  }
-
    /**
    * Sets the table parameter 'transient_lastDdlTime' to System.currentTimeMillis()/1000
    * in the given msTbl. 'transient_lastDdlTime' is guaranteed to be changed.
@@ -2926,20 +2928,19 @@ public class CatalogOpExecutor {
       }
 
       if (modifiedObjects.first == null) {
-        TCatalogObject thriftTable = TableToTCatalogObject(modifiedObjects.second);
-        if (modifiedObjects.second != null) {
-          // Return the TCatalogObject in the result to indicate this request can be
-          // processed as a direct DDL operation.
-          if (wasRemoved) {
-            resp.getResult().setRemoved_catalog_object_DEPRECATED(thriftTable);
-          } else {
-            resp.getResult().setUpdated_catalog_object_DEPRECATED(thriftTable);
-          }
-        } else {
+        if (modifiedObjects.second == null) {
           // Table does not exist in the meta store and Impala catalog, throw error.
           throw new TableNotFoundException("Table not found: " +
-              req.getTable_name().getDb_name() + "."
-              + req.getTable_name().getTable_name());
+              req.getTable_name().getDb_name() + "." +
+              req.getTable_name().getTable_name());
+        }
+        TCatalogObject thriftTable = modifiedObjects.second.toTCatalogObject();
+        // Return the TCatalogObject in the result to indicate this request can be
+        // processed as a direct DDL operation.
+        if (wasRemoved) {
+          resp.getResult().setRemoved_catalog_object_DEPRECATED(thriftTable);
+        } else {
+          resp.getResult().setUpdated_catalog_object_DEPRECATED(thriftTable);
         }
         resp.getResult().setVersion(thriftTable.getCatalog_version());
       } else {
@@ -2965,8 +2966,7 @@ public class CatalogOpExecutor {
       catalog_.reset();
       resp.result.setVersion(catalog_.getCatalogVersion());
     }
-    resp.getResult().setStatus(
-        new TStatus(TErrorCode.OK, new ArrayList<String>()));
+    resp.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>()));
     return resp;
   }