You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/19 00:42:19 UTC

[impala] 01/02: IMPALA-10512: ALTER TABLE ADD PARTITION should bump the write id for ACID tables

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 61623438428deebdeb73225f664242d5a4aeba46
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Thu Feb 18 13:51:32 2021 +0100

    IMPALA-10512: ALTER TABLE ADD PARTITION should bump the write id for ACID tables
    
    ALTER TABLE ADD PARTITION should bump the write id for ACID tables,
    both for INSERT-only and full ACID tables.
    
    For transactional tables we are adding partitions in an ACID
    transaction in the following sequence:
    
    1. open transaction
    2. allocate write id for table
    3. add partitions to HMS table
    4. commit transaction
    
    However, please note that table metadata modifications are
    independent of ACID transactions. I.e. if add partitions succeed,
    but we cannot commit the transaction, then the newly added
    partitions won't get removed.
    
    So why are we opening a txn then? We are doing it in order to bump
    the write id in a best-effort way. This aids table metadata caching,
    so by looking at the table write id we can determine if the cached
    table metadata is up-to-date.
    
    Testing:
     * added e2e test
    
    Change-Id: Iad247008b7c206db00516326c1447bd00a9b34bd
    Reviewed-on: http://gerrit.cloudera.org:8080/17081
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/catalog/Catalog.java    |  7 +-
 .../impala/catalog/CatalogServiceCatalog.java      |  5 ++
 .../org/apache/impala/catalog/ImpaladCatalog.java  |  7 ++
 .../org/apache/impala/catalog/Transaction.java     | 19 +++++-
 .../apache/impala/service/CatalogOpExecutor.java   | 77 ++++++++++++++++------
 .../queries/QueryTest/full-acid-rowid.test         | 32 ++++-----
 tests/query_test/test_acid.py                      | 27 ++++++++
 7 files changed, 133 insertions(+), 41 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index d232f0a..c662c78 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -125,6 +125,11 @@ public abstract class Catalog implements AutoCloseable {
   }
 
   /**
+   * Returns the Hive ACID user id used by this catalog.
+   */
+  public abstract String getAcidUserId();
+
+  /**
    * Adds a new database to the catalog, replacing any existing database with the same
    * name.
    */
@@ -684,7 +689,7 @@ public abstract class Catalog implements AutoCloseable {
    */
   public Transaction openTransaction(IMetaStoreClient hmsClient, HeartbeatContext ctx)
       throws TransactionException {
-    return new Transaction(hmsClient, transactionKeepalive_, "Impala Catalog", ctx);
+    return new Transaction(hmsClient, transactionKeepalive_, getAcidUserId(), ctx);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 704bac4..0f7d8f1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3333,6 +3333,11 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  @Override
+  public String getAcidUserId() {
+    return String.format("CatalogD %s", getCatalogServiceId());
+  }
+
   /**
    * Gets the id for this catalog service
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index d1452a1..edbfee2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -51,6 +51,7 @@ import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.TByteBuffer;
+import org.apache.impala.util.TUniqueIdUtil;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.slf4j.Logger;
@@ -655,5 +656,11 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
   }
 
   @Override
+  public String getAcidUserId() {
+    return String.format("Impala Catalog %s",
+        TUniqueIdUtil.PrintId(getCatalogServiceId()));
+  }
+
+  @Override
   public TUniqueId getCatalogServiceId() { return catalogServiceId_; }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Transaction.java b/fe/src/main/java/org/apache/impala/catalog/Transaction.java
index e773f34..f4af599 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Transaction.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Transaction.java
@@ -46,24 +46,37 @@ public class Transaction implements AutoCloseable {
     hmsClient_ = hmsClient;
     keepalive_ = keepalive;
     transactionId_ = MetastoreShim.openTransaction(hmsClient_);
-    LOG.info("Opened transaction: " + String.valueOf(transactionId_));
+    LOG.info(String.format("Opened transaction %d by user '%s' ", transactionId_, user));
     keepalive_.addTransaction(transactionId_, ctx);
   }
 
+  /**
+   * Constructor for short-running transactions that we don't want to heartbeat.
+   */
+  public Transaction(IMetaStoreClient hmsClient, String user, String context)
+      throws TransactionException {
+    Preconditions.checkNotNull(hmsClient);
+    hmsClient_ = hmsClient;
+    transactionId_ = MetastoreShim.openTransaction(hmsClient_);
+    LOG.info(String.format("Opened transaction %d by user '%s' in context: %s",
+        transactionId_, user, context));
+  }
+
   public long getId() { return transactionId_; }
 
   public void commit() throws TransactionException {
     Preconditions.checkState(transactionId_ > 0);
-    keepalive_.deleteTransaction(transactionId_);
+    if (keepalive_ != null) keepalive_.deleteTransaction(transactionId_);
     MetastoreShim.commitTransaction(hmsClient_, transactionId_);
     transactionId_ = -1;
   }
 
   @Override
   public void close() {
+    // Return early if transaction was committed successfully.
     if (transactionId_ <= 0) return;
 
-    keepalive_.deleteTransaction(transactionId_);
+    if (keepalive_ != null) keepalive_.deleteTransaction(transactionId_);
     try {
       MetastoreShim.abortTransaction(hmsClient_, transactionId_);
     } catch (TransactionException e) {
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 0de5c5d..2a89b77 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -79,6 +79,7 @@ import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.authorization.AuthorizationDelta;
 import org.apache.impala.authorization.AuthorizationManager;
 import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogObject;
 import org.apache.impala.catalog.CatalogServiceCatalog;
@@ -1036,20 +1037,18 @@ public class CatalogOpExecutor {
     result.setVersion(updatedCatalogObject.getCatalog_version());
   }
 
-  private Table addHdfsPartitions(Table tbl, List<Partition> partitions)
-      throws CatalogException {
+  private Table addHdfsPartitions(MetaStoreClient msClient, Table tbl,
+      List<Partition> partitions) throws CatalogException {
     Preconditions.checkNotNull(tbl);
     Preconditions.checkNotNull(partitions);
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
     }
     HdfsTable hdfsTable = (HdfsTable) tbl;
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(
-          msClient.getHiveClient(), partitions);
-      for (HdfsPartition hdfsPartition : hdfsPartitions) {
-        catalog_.addPartition(hdfsPartition);
-      }
+    List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(
+        msClient.getHiveClient(), partitions);
+    for (HdfsPartition hdfsPartition : hdfsPartitions) {
+      catalog_.addPartition(hdfsPartition);
     }
     return hdfsTable;
   }
@@ -3143,18 +3142,9 @@ public class CatalogOpExecutor {
     if (allHmsPartitionsToAdd.isEmpty()) return null;
 
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      List<Partition> addedHmsPartitions = Lists.newArrayList();
+      List<Partition> addedHmsPartitions = addHmsPartitionsInTransaction(msClient,
+          tbl, allHmsPartitionsToAdd, ifNotExists);
 
-      for (List<Partition> hmsSublist :
-          Lists.partition(allHmsPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC)) {
-        try {
-          addedHmsPartitions.addAll(msClient.getHiveClient().add_partitions(hmsSublist,
-              ifNotExists, true));
-        } catch (TException e) {
-          throw new ImpalaRuntimeException(
-              String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
-        }
-      }
       // Handle HDFS cache. This is done in a separate round bacause we have to apply
       // caching only to newly added partitions.
       alterTableCachePartitions(msTbl, msClient, tableName, addedHmsPartitions,
@@ -3169,12 +3159,57 @@ public class CatalogOpExecutor {
         addedHmsPartitions.addAll(
             getPartitionsFromHms(msTbl, msClient, tableName, difference));
       }
-      addHdfsPartitions(tbl, addedHmsPartitions);
+      addHdfsPartitions(msClient, tbl, addedHmsPartitions);
     }
     return tbl;
   }
 
   /**
+   * Adds partitions in 'allHmsPartitionsToAdd' in batches via 'msClient'.
+   * Returns the created partitions.
+   */
+  List<Partition> addHmsPartitions(MetaStoreClient msClient,
+      List<Partition> allHmsPartitionsToAdd, boolean ifNotExists)
+      throws ImpalaRuntimeException {
+    List<Partition> addedHmsPartitions = Lists.newArrayList();
+    for (List<Partition> hmsSublist : Lists.partition(
+        allHmsPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC)) {
+      try {
+        addedHmsPartitions.addAll(msClient.getHiveClient().add_partitions(hmsSublist,
+            ifNotExists, true));
+      } catch (TException e) {
+        throw new ImpalaRuntimeException(
+            String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
+      }
+    }
+    return addedHmsPartitions;
+  }
+
+  /**
+   * Invokes addHmsPartitions() in transaction for transactional tables. For
+   * non-transactional tables it just simply invokes addHmsPartitions().
+   * Please note that once addHmsPartitions() succeeded, then even if the transaction
+   * fails, the HMS table modification won't be reverted.
+   * Returns the list of the newly added partitions.
+   */
+  List<Partition> addHmsPartitionsInTransaction(MetaStoreClient msClient, Table tbl,
+      List<Partition> partitions, boolean ifNotExists) throws ImpalaException {
+    if (!AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) {
+      return addHmsPartitions(msClient, partitions, ifNotExists);
+    }
+    try (Transaction txn = new Transaction(
+        msClient.getHiveClient(),
+        catalog_.getAcidUserId(),
+        String.format("ADD PARTITION for %s", tbl.getFullName()))) {
+      MetastoreShim.allocateTableWriteId(msClient.getHiveClient(), txn.getId(),
+          tbl.getDb().getName(), tbl.getName());
+      List<Partition> ret = addHmsPartitions(msClient, partitions, ifNotExists);
+      txn.commit();
+      return ret;
+    }
+  }
+
+  /**
    * Returns the list of Partition objects from 'aList' that cannot be found in 'bList'.
    * Partition objects are distinguished by partition values only.
    */
@@ -3901,7 +3936,7 @@ public class CatalogOpExecutor {
         // ifNotExists and needResults are true.
         List<Partition> hmsAddedPartitions =
             msClient.getHiveClient().add_partitions(hmsSublist, true, true);
-        addHdfsPartitions(tbl, hmsAddedPartitions);
+        addHdfsPartitions(msClient, tbl, hmsAddedPartitions);
         // Handle HDFS cache.
         if (cachePoolName != null) {
           for (Partition partition: hmsAddedPartitions) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test b/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test
index a0654fc..025591b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test
@@ -20,14 +20,14 @@ select row__id.*, * from functional_orc_def.alltypestiny;
 ---- LABELS
 OPERATION, ORIGINALTRANSACTION, BUCKET, ROWID, CURRENTTRANSACTION, ID, BOOL_COL, TINYINT_COL, SMALLINT_COL, INT_COL, BIGINT_COL, FLOAT_COL, DOUBLE_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, MONTH
 ---- RESULTS
-0,1,536870912,0,1,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
-0,1,536870912,1,1,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
-0,1,536870912,0,1,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
-0,1,536870912,1,1,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
-0,1,536870912,0,1,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
-0,1,536870912,1,1,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
-0,1,536870912,0,1,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
-0,1,536870912,1,1,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+0,5,536870912,0,5,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+0,5,536870912,1,5,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+0,5,536870912,0,5,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+0,5,536870912,1,5,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+0,5,536870912,0,5,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+0,5,536870912,1,5,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+0,5,536870912,0,5,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+0,5,536870912,1,5,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
 ---- TYPES
 INT, BIGINT, INT, BIGINT, BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
@@ -37,14 +37,14 @@ from functional_orc_def.alltypestiny;
 ---- LABELS
 ROW__ID.OPERATION, ROW__ID.ROWID, ROW__ID.ORIGINALTRANSACTION, ID, BOOL_COL, TINYINT_COL, SMALLINT_COL, INT_COL, BIGINT_COL, FLOAT_COL, DOUBLE_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, MONTH
 ---- RESULTS
-0,0,1,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
-0,1,1,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
-0,0,1,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
-0,1,1,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
-0,0,1,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
-0,1,1,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
-0,0,1,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
-0,1,1,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+0,0,5,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+0,1,5,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+0,0,5,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+0,1,5,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+0,0,5,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+0,1,5,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+0,0,5,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+0,1,5,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
 ---- TYPES
 INT, BIGINT, BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index dee59ee..b8ea970 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -27,6 +27,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import (SkipIf, SkipIfHive2, SkipIfCatalogV2, SkipIfS3, SkipIfABFS,
                                SkipIfADLS, SkipIfIsilon, SkipIfGCS, SkipIfLocal)
 from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.util.acid_txn import AcidTxn
 
 
 class TestAcid(ImpalaTestSuite):
@@ -344,3 +345,29 @@ class TestAcid(ImpalaTestSuite):
     self.execute_query("refresh {}".format(fq_table_name))
     result = self.execute_query("select count(*) from {0}".format(fq_table_name))
     assert "3" in result.data
+
+  def test_add_partition_write_id(self, vector, unique_database):
+    """Test that ALTER TABLE ADD PARTITION increases the write id of the table."""
+    # Test INSERT-only table
+    io_tbl_name = "insert_only_table"
+    self.client.execute("""CREATE TABLE {0}.{1} (i int) PARTITIONED BY (p int)
+        TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only')
+        """.format(unique_database, io_tbl_name))
+    self._check_add_partition_write_id_change(unique_database, io_tbl_name)
+
+    # Test Full ACID table
+    full_acid_name = "full_acid_table"
+    self.client.execute("""CREATE TABLE {0}.{1} (i int) PARTITIONED BY (p int)
+        STORED AS ORC TBLPROPERTIES('transactional'='true')
+        """.format(unique_database, full_acid_name))
+    self._check_add_partition_write_id_change(unique_database, full_acid_name)
+
+  def _check_add_partition_write_id_change(self, db_name, tbl_name):
+    acid_util = AcidTxn(self.hive_client)
+    valid_write_ids = acid_util.get_valid_write_ids(db_name, tbl_name)
+    orig_write_id = valid_write_ids.tblValidWriteIds[0].writeIdHighWaterMark
+    self.client.execute("""alter table {0}.{1} add partition (p=1)
+        """.format(db_name, tbl_name))
+    valid_write_ids = acid_util.get_valid_write_ids(db_name, tbl_name)
+    new_write_id = valid_write_ids.tblValidWriteIds[0].writeIdHighWaterMark
+    assert new_write_id > orig_write_id