You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2018/05/17 16:30:52 UTC

hive git commit: HIVE-18748 - Rename table impacts the ACID behavior as table names are not updated in meta-tables. (Eugene Koifman, reviewed by Sankar Hariappan)

Repository: hive
Updated Branches:
  refs/heads/master b329afa0a -> 76f3ba36a


HIVE-18748 - Rename table impacts the ACID behavior as table names are not updated in meta-tables. (Eugene Koifman, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/76f3ba36
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/76f3ba36
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/76f3ba36

Branch: refs/heads/master
Commit: 76f3ba36abc7ca2d6733a5279fc9702001fb3fdf
Parents: b329afa
Author: Eugene Koifman <ek...@apache.org>
Authored: Thu May 17 09:30:47 2018 -0700
Committer: Eugene Koifman <ek...@apache.org>
Committed: Thu May 17 09:30:47 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/TestTxnConcatenate.java      |  61 +++++++
 .../hive/metastore/AcidEventListener.java       |  51 ++++++
 .../hadoop/hive/metastore/HiveAlterHandler.java |   9 +
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 177 +++++++++++++++++++
 .../hadoop/hive/metastore/txn/TxnStore.java     |   5 +
 5 files changed, 303 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/76f3ba36/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
index 2663fec..511198a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.hive.ql;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -174,4 +177,62 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
             "t/base_0000002/000000_0"}};
     checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
   }
+  @Test
+  public void testRenameTable() throws Exception {
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true);
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("drop database if exists mydb2 cascade");
+    runStatementOnDriver("create database mydb1");
+    runStatementOnDriver("create database mydb2");
+    runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc");
+    runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)");
+    //put something in WRITE_SET
+    runStatementOnDriver("update mydb1.T set b = 6 where b = 5");
+    runStatementOnDriver("alter table mydb1.T compact 'minor'");
+
+    runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S");
+
+    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+            "s/delta_0000001_0000001_0000/bucket_00000"},
+        {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6",
+            "s/delta_0000002_0000002_0000/bucket_00000"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'"));
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'"));
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from WRITE_SET where WS_TABLE='t'"));
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'"));
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'"));
+
+    Assert.assertEquals(
+        TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2,
+        TxnDbUtil.countQueryAgent(hiveConf,
+            "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'"));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'"));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from WRITE_SET where WS_TABLE='s'"));
+    Assert.assertEquals(2, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'"));
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
+        "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'"));
+
+    //this causes MetaStoreEventListener.onDropTable()/onCreateTable() to execute and the data
+    //files are just moved under the new table.  This can't work since a drop table in Acid removes
+    //the relevant table metadata (like writeid, etc.), so writeIds in file names/ROW_IDs
+    //no longer make sense.  (In fact 'select ...' returns nothing since there is no NEXT_WRITE_ID
+    //entry for the 'new' table and all existing data is 'above HWM'.)  See HIVE-19569.
+    CommandProcessorResponse cpr =
+        runStatementOnDriverNegative("alter table mydb1.S RENAME TO mydb2.bar");
+    Assert.assertTrue(cpr.getErrorMessage() != null && cpr.getErrorMessage()
+        .contains("Changing database name of a transactional table mydb1.s is not supported."));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/76f3ba36/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
index f849b1a..5279247 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -19,10 +19,16 @@
 package org.apache.hadoop.hive.metastore;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.HiveObjectType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
@@ -69,6 +75,51 @@ public class AcidEventListener extends MetaStoreEventListener {
     }
   }
 
+  @Override
+  public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
+    if (!TxnUtils.isTransactionalTable(tableEvent.getNewTable())) {
+      return;
+    }
+    Table oldTable = tableEvent.getOldTable();
+    Table newTable = tableEvent.getNewTable();
+    if(!oldTable.getCatName().equalsIgnoreCase(newTable.getCatName()) ||
+        !oldTable.getDbName().equalsIgnoreCase(newTable.getDbName()) ||
+        !oldTable.getTableName().equalsIgnoreCase(newTable.getTableName())) {
+      txnHandler = getTxnHandler();
+      txnHandler.onRename(
+          oldTable.getCatName(), oldTable.getDbName(), oldTable.getTableName(), null,
+          newTable.getCatName(), newTable.getDbName(), newTable.getTableName(), null);
+    }
+  }
+  @Override
+  public void onAlterPartition(AlterPartitionEvent partitionEvent)  throws MetaException {
+    if (!TxnUtils.isTransactionalTable(partitionEvent.getTable())) {
+      return;
+    }
+    Partition oldPart = partitionEvent.getOldPartition();
+    Partition newPart = partitionEvent.getNewPartition();
+    Table t = partitionEvent.getTable();
+    String oldPartName = Warehouse.makePartName(t.getPartitionKeys(), oldPart.getValues());
+    String newPartName = Warehouse.makePartName(t.getPartitionKeys(), newPart.getValues());
+    if(!oldPartName.equals(newPartName)) {
+      txnHandler = getTxnHandler();
+      txnHandler.onRename(t.getCatName(), t.getDbName(), t.getTableName(), oldPartName,
+          t.getCatName(), t.getDbName(), t.getTableName(), newPartName);
+    }
+  }
+  @Override
+  public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
+    Database oldDb = dbEvent.getOldDatabase();
+    Database newDb = dbEvent.getNewDatabase();
+    if(!oldDb.getCatalogName().equalsIgnoreCase(newDb.getCatalogName()) ||
+        !oldDb.getName().equalsIgnoreCase(newDb.getName())) {
+      txnHandler = getTxnHandler();
+      txnHandler.onRename(
+          oldDb.getCatalogName(), oldDb.getName(), null, null,
+          newDb.getCatalogName(), newDb.getName(), null, null);
+    }
+  }
+
   private TxnStore getTxnHandler() {
     boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ||
         MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST);

http://git-wip-us.apache.org/repos/asf/hive/blob/76f3ba36/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 0be0aaa..9ab9e85 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -421,6 +421,15 @@ public class HiveAlterHandler implements AlterHandler {
             new AlterTableEvent(oldt, newt, false, success, handler),
             environmentContext, txnAlterTableEventResponses, msdb);
       } else {
+        if(oldt.getParameters() != null && "true".equalsIgnoreCase(
+            oldt.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL))) {
+          /*Why does it split Alter into Drop + Create here?  This causes the onDropTable logic
+           * to wipe out acid related metadata, and writeIds from the old table don't make sense
+           * in the new table.*/
+          throw new IllegalStateException("Changing database name of a transactional table " +
+              Warehouse.getQualifiedName(oldt) + " is not supported.  Please use create-table-as" +
+              " or create new table manually followed by Insert.");
+        }
         MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.DROP_TABLE,
             new DropTableEvent(oldt, true, false, handler),
             environmentContext, txnDropTableEventResponses, msdb);

http://git-wip-us.apache.org/repos/asf/hive/blob/76f3ba36/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index c513b4d..21b9865 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -2566,6 +2566,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    * specifically: TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
    * Retry-by-caller note: this is only idempotent assuming it's only called by dropTable/Db/etc
    * operations.
+   *
+   * HIVE_LOCKS is (presumably) expected to be removed by AcidHouseKeeperService
+   * WRITE_SET is (presumably) expected to be removed by AcidWriteSetService
    */
   @Override
   @RetrySemantics.Idempotent
@@ -2760,7 +2763,181 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       cleanupRecords(type, db, table, partitionIterator);
     }
   }
+  /**
+   * The catalog name hasn't been added to the transaction metadata tables yet, so it's passed in but not used.
+   */
+  @Override
+  public void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName,
+      String newCatName, String newDbName, String newTabName, String newPartName)
+      throws MetaException {
+    String callSig = "onRename(" +
+        oldCatName + "," + oldDbName + "," + oldTabName + "," + oldPartName + "," +
+        newCatName + "," + newDbName + "," + newTabName + "," + newPartName + ")";
+
+    if(newPartName != null) {
+      assert oldPartName != null && oldTabName != null && oldDbName != null && oldCatName != null :
+      callSig;
+    }
+    if(newTabName != null) {
+      assert oldTabName != null && oldDbName != null && oldCatName != null : callSig;
+    }
+    if(newDbName != null) {
+      assert oldDbName != null && oldCatName != null : callSig;
+    }
+
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        List<String> queries = new ArrayList<>();
+
+        String update = "update TXN_COMPONENTS set ";
+        String where = " where ";
+        if(oldPartName != null) {
+          update += "TC_PARTITION = " + quoteString(newPartName) + ", ";
+          where += "TC_PARTITION = " + quoteString(oldPartName) + " AND ";
+        }
+        if(oldTabName != null) {
+          update += "TC_TABLE = " + quoteString(normalizeCase(newTabName)) + ", ";
+          where += "TC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND ";
+        }
+        if(oldDbName != null) {
+          update += "TC_DATABASE = " + quoteString(normalizeCase(newDbName));
+          where += "TC_DATABASE = " + quoteString(normalizeCase(oldDbName));
+        }
+        queries.add(update + where);
+
+        update = "update COMPLETED_TXN_COMPONENTS set ";
+        where = " where ";
+        if(oldPartName != null) {
+          update += "CTC_PARTITION = " + quoteString(newPartName) + ", ";
+          where += "CTC_PARTITION = " + quoteString(oldPartName) + " AND ";
+        }
+        if(oldTabName != null) {
+          update += "CTC_TABLE = " + quoteString(normalizeCase(newTabName)) + ", ";
+          where += "CTC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND ";
+        }
+        if(oldDbName != null) {
+          update += "CTC_DATABASE = " + quoteString(normalizeCase(newDbName));
+          where += "CTC_DATABASE = " + quoteString(normalizeCase(oldDbName));
+        }
+        queries.add(update + where);
+
+        update = "update HIVE_LOCKS set ";
+        where = " where ";
+        if(oldPartName != null) {
+          update += "HL_PARTITION = " + quoteString(newPartName) + ", ";
+          where += "HL_PARTITION = " + quoteString(oldPartName) + " AND ";
+        }
+        if(oldTabName != null) {
+          update += "HL_TABLE = " + quoteString(normalizeCase(newTabName)) + ", ";
+          where += "HL_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND ";
+        }
+        if(oldDbName != null) {
+          update += "HL_DB = " + quoteString(normalizeCase(newDbName));
+          where += "HL_DB = " + quoteString(normalizeCase(oldDbName));
+        }
+        queries.add(update + where);
+
+        update = "update COMPACTION_QUEUE set ";
+        where = " where ";
+        if(oldPartName != null) {
+          update += "CQ_PARTITION = " + quoteString(newPartName) + ", ";
+          where += "CQ_PARTITION = " + quoteString(oldPartName) + " AND ";
+        }
+        if(oldTabName != null) {
+          update += "CQ_TABLE = " + quoteString(normalizeCase(newTabName)) + ", ";
+          where += "CQ_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND ";
+        }
+        if(oldDbName != null) {
+          update += "CQ_DATABASE = " + quoteString(normalizeCase(newDbName));
+          where += "CQ_DATABASE = " + quoteString(normalizeCase(oldDbName));
+        }
+        queries.add(update + where);
 
+        update = "update COMPLETED_COMPACTIONS set ";
+        where = " where ";
+        if(oldPartName != null) {
+          update += "CC_PARTITION = " + quoteString(newPartName) + ", ";
+          where += "CC_PARTITION = " + quoteString(oldPartName) + " AND ";
+        }
+        if(oldTabName != null) {
+          update += "CC_TABLE = " + quoteString(normalizeCase(newTabName)) + ", ";
+          where += "CC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND ";
+        }
+        if(oldDbName != null) {
+          update += "CC_DATABASE = " + quoteString(normalizeCase(newDbName));
+          where += "CC_DATABASE = " + quoteString(normalizeCase(oldDbName));
+        }
+        queries.add(update + where);
+
+        update = "update WRITE_SET set ";
+        where = " where ";
+        if(oldPartName != null) {
+          update += "WS_PARTITION = " + quoteString(newPartName) + ", ";
+          where += "WS_PARTITION = " + quoteString(oldPartName) + " AND ";
+        }
+        if(oldTabName != null) {
+          update += "WS_TABLE = " + quoteString(normalizeCase(newTabName)) + ", ";
+          where += "WS_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND ";
+        }
+        if(oldDbName != null) {
+          update += "WS_DATABASE = " + quoteString(normalizeCase(newDbName));
+          where += "WS_DATABASE = " + quoteString(normalizeCase(oldDbName));
+        }
+        queries.add(update + where);
+
+        update = "update TXN_TO_WRITE_ID set ";
+        where = " where ";
+        if(oldTabName != null) {
+          update += "T2W_TABLE = " + quoteString(normalizeCase(newTabName)) + ", ";
+          where += "T2W_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND ";
+        }
+        if(oldDbName != null) {
+          update += "T2W_DATABASE = " + quoteString(normalizeCase(newDbName));
+          where += "T2W_DATABASE = " + quoteString(normalizeCase(oldDbName));
+        }
+        queries.add(update + where);
+
+        update = "update NEXT_WRITE_ID set ";
+        where = " where ";
+        if(oldTabName != null) {
+          update += "NWI_TABLE = " + quoteString(normalizeCase(newTabName)) + ", ";
+          where += "NWI_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND ";
+        }
+        if(oldDbName != null) {
+          update += "NWI_DATABASE = " + quoteString(normalizeCase(newDbName));
+          where += "NWI_DATABASE = " + quoteString(normalizeCase(oldDbName));
+        }
+        queries.add(update + where);
+
+        for (String query : queries) {
+          LOG.debug("Going to execute update <" + query + ">");
+          stmt.executeUpdate(query);
+        }
+
+        LOG.debug("Going to commit: " + callSig);
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback: " + callSig);
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, callSig);
+        if (e.getMessage().contains("does not exist")) {
+          LOG.warn("Cannot perform " + callSig + " since metastore table does not exist");
+        } else {
+          throw new MetaException("Unable to " + callSig + ":" + StringUtils.stringifyException(e));
+        }
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      onRename(oldCatName, oldDbName, oldTabName, oldPartName,
+          newCatName, newDbName, newTabName, newPartName);
+    }
+  }
   /**
    * For testing only, do not use.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/76f3ba36/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index b8e398f..4695f0d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -266,6 +266,11 @@ public interface TxnStore extends Configurable {
   void cleanupRecords(HiveObjectType type, Database db, Table table,
                              Iterator<Partition> partitionIterator) throws MetaException;
 
+  @RetrySemantics.Idempotent
+  void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName,
+      String newCatName, String newDbName, String newTabName, String newPartName)
+      throws MetaException;
+
   /**
    * Timeout transactions and/or locks.  This should only be called by the compactor.
    */