You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/07/15 18:00:51 UTC

[impala] 02/03: IMPALA-9946: Use table id when comparing transactional state

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0b5a9889ee39a7df816efc6476aae2624f82f377
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed Jul 1 16:16:49 2020 -0700

    IMPALA-9946: Use table id when comparing transactional state
    
    This change adds support for catalog clients to optionally
    provide a table id when fetching the table metadata
    using the GetPartialCatalogObject API. This table id
    is used to check that the catalog's table is the same as the one
    requested by the client. If the table id matches, we compare the
    ValidWriteIdList to provide a consistent view of the metadata to the
    clients.
    
    Testing:
    Added a new test which drops and recreates the table from hive to
    introduce the false positive in the existing ValidWriteIdList
    comparison logic. After the patch, the test succeeds.
    
    Change-Id: I58c5bd58c4eb5663647c01ecd738b661e4e4cd74
    Reviewed-on: http://gerrit.cloudera.org:8080/16170
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogService.thrift                |  9 +++
 .../impala/catalog/CatalogServiceCatalog.java      | 25 ++++--
 .../java/org/apache/impala/util/AcidUtils.java     | 16 +++-
 .../catalog/PartialCatalogInfoWriteIdTest.java     | 92 ++++++++++++++++++++++
 shaded-deps/pom.xml                                |  1 +
 5 files changed, 135 insertions(+), 8 deletions(-)

diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index d14e0ff..53c7bdd 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -333,6 +333,11 @@ struct TTableInfoSelector {
   // If this is for a ACID table and this is set, this table info returned
   // will be consistent the provided valid_write_ids
   9: optional CatalogObjects.TValidWriteIdList valid_write_ids
+
+  // If the table id is provided, the catalog service compares it with the id of the
+  // HMS table it has cached and triggers a reload in case they don't match.
+  // This field is only used when valid_write_ids is set; otherwise it is ignored.
+  10: optional i64 table_id = -1
 }
 
 // Returned information about a particular partition.
@@ -510,6 +515,10 @@ struct TGetPartitionStatsRequest {
   // if the table is transactional then this field represents the client's view
   // of the table snapshot view in terms of ValidWriteIdList.
   3: optional CatalogObjects.TValidWriteIdList valid_write_ids
+  // If the table id is provided, the catalog service compares it with the id of the
+  // HMS table it has cached and triggers a reload in case they don't match.
+  // This field is only used when valid_write_ids is set; otherwise it is ignored.
+  4: optional i64 table_id = -1
 }
 
 // Response for requesting partition statistics. All partition statistics
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 9668c15..566ccef 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -207,6 +207,8 @@ public class CatalogServiceCatalog extends Catalog {
   private static final long LOCK_RETRY_TIMEOUT_MS = 7200000;
   // Time to sleep before retrying to acquire a table lock
   private static final int LOCK_RETRY_DELAY_MS = 10;
+  // default value of table id in the GetPartialCatalogObjectRequest
+  public static final long TABLE_ID_UNAVAILABLE = -1;
 
   private final TUniqueId catalogServiceId_;
 
@@ -603,8 +605,9 @@ public class CatalogServiceCatalog extends Catalog {
         new TableName(request.table_name.db_name, request.table_name.table_name)
           .toString(), request.valid_write_ids);
     }
+    long tableId = request.getTable_id();
     Table table = getOrLoadTable(tableName.db_name, tableName.table_name,
-        "needed to fetch partition stats", writeIdList);
+        "needed to fetch partition stats", writeIdList, tableId);
 
     // Table could be null if it does not exist anymore.
     if (table == null) {
@@ -1849,6 +1852,13 @@ public class CatalogServiceCatalog extends Catalog {
     return table;
   }
 
+
+  public Table getOrLoadTable(String dbName, String tblName, String reason,
+      ValidWriteIdList validWriteIdList) throws CatalogException {
+    return getOrLoadTable(dbName, tblName, reason, validWriteIdList,
+        TABLE_ID_UNAVAILABLE);
+  }
+
   /**
    * Gets the table with the given name, loading it if needed (if the existing catalog
    * object is not yet loaded). Returns the matching Table or null if no table with this
@@ -1859,7 +1869,7 @@ public class CatalogServiceCatalog extends Catalog {
    * (not yet loaded table) will be returned.
    */
   public Table getOrLoadTable(String dbName, String tblName, String reason,
-      ValidWriteIdList validWriteIdList) throws CatalogException {
+      ValidWriteIdList validWriteIdList, long tableId) throws CatalogException {
     TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
     TableLoadingMgr.LoadRequest loadReq;
 
@@ -1874,9 +1884,12 @@ public class CatalogServiceCatalog extends Catalog {
       if (tbl.isLoaded() && validWriteIdList == null) return tbl;
       // if a validWriteIdList is provided, we see if the cached table can provided a
       // consistent view of the given validWriteIdList. If yes, we can return the table
-      // otherwise we reload the table.
+      // otherwise we reload the table. It is possible that the cached table is stale
+      // even if the ValidWriteIdList matches (e.g. out-of-band drop and recreate of
+      // the table). Hence we should make sure that we are comparing
+      // the ValidWriteIdList only when the table id matches.
       if (tbl instanceof HdfsTable
-          && AcidUtils.compare((HdfsTable) tbl, validWriteIdList) >= 0) {
+          && AcidUtils.compare((HdfsTable) tbl, validWriteIdList, tableId) >= 0) {
         return tbl;
       }
       previousCatalogVersion = tbl.getCatalogVersion();
@@ -3034,6 +3047,7 @@ public class CatalogServiceCatalog extends Catalog {
       Table table;
       ValidWriteIdList writeIdList = null;
       try {
+        long tableId = TABLE_ID_UNAVAILABLE;
         if (req.table_info_selector.valid_write_ids != null) {
           Preconditions.checkState(objectDesc.type.equals(TABLE));
           String dbName = objectDesc.getTable().db_name == null ? Catalog.DEFAULT_DB
@@ -3041,10 +3055,11 @@ public class CatalogServiceCatalog extends Catalog {
           String tblName = objectDesc.getTable().tbl_name;
           writeIdList = MetastoreShim.getValidWriteIdListFromThrift(
               dbName + "." + tblName, req.table_info_selector.valid_write_ids);
+          tableId = req.table_info_selector.getTable_id();
         }
         table = getOrLoadTable(
             objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
-            "needed by coordinator", writeIdList);
+            "needed by coordinator", writeIdList, tableId);
       } catch (DatabaseNotFoundException e) {
         return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
       }
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 94cb9e2..28bf2f1 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FileMetadataLoader.LoadStats;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
@@ -642,13 +643,21 @@ public class AcidUtils {
    * it returns 0 and if the table ValidWriteIdList is behind the provided
    * validWriteIdList this return -1. This information useful to determine if the
    * cached table can be used to construct a consistent snapshot corresponding to the
-   * given validWriteIdList.
+   * given validWriteIdList. The ValidWriteIdList is compared only if the table id
+   * matches with the given tableId.
    */
-  public static int compare(HdfsTable tbl, ValidWriteIdList validWriteIdList) {
+  public static int compare(HdfsTable tbl, ValidWriteIdList validWriteIdList,
+      long tableId) {
     Preconditions.checkState(tbl != null && tbl.getMetaStoreTable() != null);
     // if tbl is not a transactional, there is nothing to compare against and we return 0
     if (!isTransactionalTable(tbl.getMetaStoreTable().getParameters())) return 0;
     Preconditions.checkNotNull(tbl.getValidWriteIds());
+    // If the provided table id does not match the id of the table which CatalogService
+    // has, we return -1 indicating that the cached table is stale.
+    if (tableId != CatalogServiceCatalog.TABLE_ID_UNAVAILABLE
+        && tbl.getMetaStoreTable().getId() != tableId) {
+      return -1;
+    }
     return compare(tbl.getValidWriteIds(), validWriteIdList);
   }
 
@@ -664,7 +673,8 @@ public class AcidUtils {
    * 1, if a is more recent
    * -1, if b is more recent
    ***/
-  private static int compare(ValidWriteIdList a, ValidWriteIdList b) {
+  @VisibleForTesting
+  public static int compare(ValidWriteIdList a, ValidWriteIdList b) {
     Preconditions.checkState(a.getTableName().equalsIgnoreCase(b.getTableName()));
     // The algorithm assumes invalidWriteIds are sorted and values are less or equal than
     // hwm, here is how the algorithm works:
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoWriteIdTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoWriteIdTest.java
index fdb8855..d1c162f 100644
--- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoWriteIdTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoWriteIdTest.java
@@ -17,7 +17,11 @@
 
 package org.apache.impala.catalog;
 
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.sql.SQLException;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
@@ -32,11 +36,14 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.AcidUtils;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
@@ -475,12 +482,82 @@ public class PartialCatalogInfoWriteIdTest {
     Assert.assertEquals(numHitsAfter, numHits1 );
   }
 
+  /**
+   * This test makes sure that the table returned is consistent with the given writeId
+   * list even if the table was dropped and recreated from outside.
+   * @throws Exception
+   */
+  @Test
+  public void testFetchAfterDropAndRecreate() throws Exception {
+    Assume.assumeTrue(MetastoreShim.getMajorVersion() >= 3);
+    // row 2, first row is in the setup method
+    executeImpalaSql("insert into " + getTestTblName() + " values (2)");
+    Table tbl = catalog_.getOrLoadTable(testDbName, testTblName, "test", null);
+    Assert.assertFalse("Table must be loaded",
+        tbl instanceof IncompleteTable);
+    ValidWriteIdList olderWriteIdList = getValidWriteIdList(testDbName,
+        testTblName);
+    Assert.assertEquals(olderWriteIdList.toString(), tbl.getValidWriteIds().toString());
+    TGetPartialCatalogObjectRequest request = new RequestBuilder()
+        .db(testDbName)
+        .tbl(testTblName)
+        .writeId(olderWriteIdList)
+        .wantFiles()
+        .build();
+    TGetPartialCatalogObjectResponse response = sendRequest(request);
+    Assert.assertEquals(1, response.getTable_info().getPartitionsSize());
+    List<THdfsFileDesc> oldFds = response.getTable_info().getPartitions()
+        .get(0).file_descriptors;
+    Assert.assertEquals(2, oldFds.size());
+    // now recreate the table from hive so that Impala is not aware of it
+    executeHiveSql("drop table " + getTestTblName());
+    executeHiveSql("create table " + getTestTblName() + " like "
+        + "functional.insert_only_transactional_table stored as parquet");
+    // we do 2 more inserts into the table so that the high-watermark is the same
+    // as that of olderWriteIdList.
+    executeHiveSql("insert into " + getTestTblName() + " values (1)");
+    executeHiveSql("insert into " + getTestTblName() + " values (2)");
+    ValidWriteIdList newerWriteIdList = getValidWriteIdList(testDbName, testTblName);
+    // the validWriteIdList itself is compatible
+    Assert.assertTrue(AcidUtils.compare(newerWriteIdList, olderWriteIdList) == 0);
+    // now a client with the newerValidWriteIdList must re-trigger a load
+    request = new RequestBuilder()
+        .db(testDbName)
+        .tbl(testTblName)
+        .writeId(newerWriteIdList)
+        .tableId(getTableId(testDbName, testTblName))
+        .wantFiles()
+        .build();
+    response = sendRequest(request);
+    Assert.assertEquals(1, response.getTable_info().getPartitionsSize());
+    List<THdfsFileDesc> newFds = response.getTable_info().getPartitions()
+        .get(0).file_descriptors;
+    Assert.assertEquals(2, newFds.size());
+    for (int i=0; i<newFds.size(); i++) {
+      // we expect that table was reloaded and hence the file descriptors should be
+      // different
+      Assert.assertNotEquals("Found the new file descriptor same as old one",
+          newFds.get(i), oldFds.get(i));
+    }
+  }
+
   private void executeHiveSql(String query) throws Exception {
     try (HiveJdbcClient hiveClient = hiveClientPool_.getClient()) {
       hiveClient.executeSql(query);
     }
   }
 
+  private void executeImpalaSql(String query) throws Exception {
+    ImpalaJdbcClient client = ImpalaJdbcClient
+        .createClientUsingHiveJdbcDriver();
+    client.connect();
+    try {
+      client.execStatement(query);
+    } finally {
+      client.close();
+    }
+  }
+
   /**
    * Simple Request builder class. Assumes all the metadata at higher granularity is
    * required if a specific level is requested. For examples, if files are requested,
@@ -493,6 +570,7 @@ public class PartialCatalogInfoWriteIdTest {
     boolean wantPartitionNames;
     String tblName, dbName;
     ValidWriteIdList writeIdList;
+    long tableId = -1;
 
     RequestBuilder db(String db) {
       this.dbName = db;
@@ -509,6 +587,11 @@ public class PartialCatalogInfoWriteIdTest {
       return this;
     }
 
+    RequestBuilder tableId(long id) {
+      this.tableId = id;
+      return this;
+    }
+
     RequestBuilder wantFiles() {
       wantFileMetadata = true;
       wantPartitionMeta = true;
@@ -536,6 +619,7 @@ public class PartialCatalogInfoWriteIdTest {
       req.table_info_selector = new TTableInfoSelector();
       req.table_info_selector.valid_write_ids =
         MetastoreShim.convertToTValidWriteIdList(writeIdList);
+      req.table_info_selector.table_id = tableId;
       req.table_info_selector.want_hms_table = true;
       if (wantPartitionNames) {
         req.table_info_selector.want_partition_names = true;
@@ -550,6 +634,14 @@ public class PartialCatalogInfoWriteIdTest {
     }
   }
 
+  /**
+   * Gets the table id from the HMS.
+   */
+  private long getTableId(String db, String tbl) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getTable(db, tbl).getId();
+    }
+  }
 
   private ValidWriteIdList getValidWriteIdList(String db, String tbl) throws TException {
     try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
diff --git a/shaded-deps/pom.xml b/shaded-deps/pom.xml
index 261e30d..ff6fa25 100644
--- a/shaded-deps/pom.xml
+++ b/shaded-deps/pom.xml
@@ -98,6 +98,7 @@ the same dependencies
                 <include>org/apache/hadoop/hive/ql/stats/estimator/**</include>
                 <include>org/apache/hive/service/rpc/thrift/**</include>
                 <include>org/apache/hive/common/HiveVersionAnnotation.class</include>
+                <include>org/apache/hadoop/hive/ql/ErrorMsg.class</include>
                 <include>org/apache/orc/**</include>
                 <include>com/google/**</include>
               </includes>