You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/08/10 16:56:44 UTC

hive git commit: HIVE-20264: Bootstrap repl dump with concurrent write and drop of ACID table makes target inconsistent (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)

Repository: hive
Updated Branches:
  refs/heads/master 8cd9d3f7f -> 81dff07cb


HIVE-20264: Bootstrap repl dump with concurrent write and drop of ACID table makes target inconsistent (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/81dff07c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/81dff07c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/81dff07c

Branch: refs/heads/master
Commit: 81dff07cba2106c5d19f57701c15b62a5b773e63
Parents: 8cd9d3f
Author: Sankar Hariappan <sa...@apache.org>
Authored: Fri Aug 10 22:26:24 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Fri Aug 10 22:26:24 2018 +0530

----------------------------------------------------------------------
 .../TestReplicationScenariosAcidTables.java     | 79 ++++++++++++++++++++
 .../hive/metastore/txn/TestTxnHandler.java      |  5 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 49 +++++++-----
 3 files changed, 113 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/81dff07c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 3040f6c..f074428 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -364,6 +364,85 @@ public class TestReplicationScenariosAcidTables {
   }
 
   @Test
+  public void testAcidTablesBootstrapWithConcurrentDropTable() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t1 values(1)");
+
+    // While the bootstrap dump is in progress, insert into and then drop the ACID table t1 from another thread.
+    // Bootstrap must not dump t1, and a new t1 created afterwards must be replicated by the next incremental dump.
+    BehaviourInjection<CallerArguments, Boolean> callerInjectedBehavior
+            = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        if (injectionPathCalled) { // only the first callback spawns the concurrent writer; later ones just record they ran
+          nonInjectedPathCalled = true;
+        } else {
+          // First callback: insert another row into t1 and then drop it from a separate thread, blocking until done.
+          injectionPathCalled = true;
+          Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+              LOG.info("Entered new thread");
+              IDriver driver = DriverFactory.newDriver(primaryConf);
+              SessionState.start(new CliSessionState(primaryConf));
+              CommandProcessorResponse ret = driver.run("insert into " + primaryDbName + ".t1 values(2)");
+              boolean success = (ret.getException() == null);
+              assertTrue(success); // NOTE(review): a failure here only ends this thread; t.join() does not rethrow it, so the test is not failed directly.
+              ret = driver.run("drop table " + primaryDbName + ".t1");
+              success = (ret.getException() == null);
+              assertTrue(success); // NOTE(review): likewise not propagated to the JUnit thread; consider recording the result and asserting after join().
+              LOG.info("Exit new thread success - {}", success, ret.getException()); // NOTE(review): one {} placeholder but two args; the exception arg is null here after the assert.
+            }
+          });
+          t.start();
+          LOG.info("Created new thread {}", t.getName());
+          try {
+            t.join();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return true;
+      }
+    };
+
+    InjectableBehaviourObjectStore.setCallerVerifier(callerInjectedBehavior);
+    WarehouseInstance.Tuple bootstrapDump = null;
+    try {
+      bootstrapDump = primary.dump(primaryDbName, null);
+      callerInjectedBehavior.assertInjectionsPerformed(true, true);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
+
+    // Bootstrap dump uses the latest table list, which no longer contains the dropped t1; expect no tables on the replica.
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(bootstrapDump.lastReplicationId)
+            .run("show tables")
+            .verifyResult(null);
+
+    // Re-create an ACID table named t1 and insert a row; the incremental dump must replicate it correctly.
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t1 values(100)")
+            .dump(primaryDbName, bootstrapDump.lastReplicationId);
+
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(incrementalDump.lastReplicationId)
+            .run("select id from t1 order by id")
+            .verifyResult("100");
+  }
+
+  @Test
   public void testOpenTxnEvent() throws Throwable {
     String tableName = testName.getMethodName();
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/81dff07c/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index c0725ad..be37b2a 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1667,12 +1667,15 @@ public class TestTxnHandler {
     allocMsg = new AllocateTableWriteIdsRequest("destdb", "tbl2");
     allocMsg.setReplPolicy("destdb.*");
     allocMsg.setSrcTxnToWriteIdList(srcTxnToWriteId);
+
+    // This is an idempotent case when repl flow forcefully allocate write id if it doesn't match
+    // the next write id.
     try {
       txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
     } catch (IllegalStateException e) {
       failed = true;
     }
-    assertTrue(failed);
+    assertFalse(failed);
 
     replAbortTxnForTest(srcTxnIdList, "destdb.*");
 

http://git-wip-us.apache.org/repos/asf/hive/blob/81dff07c/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index f5e4905..8edc387 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1404,7 +1404,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           if (srcTxnIds.size() != txnIds.size()) {
             // Idempotent case where txn was already closed but gets allocate write id event.
             // So, just ignore it and return empty list.
-            LOG.info("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
+            LOG.info("Idempotent case: Target txn id is missing for source txn id : " + srcTxnIds.toString() +
                     " and repl policy " + rqst.getReplPolicy());
             return new AllocateTableWriteIdsResponse(txnToWriteIds);
           }
@@ -1422,10 +1422,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           throw new RuntimeException("This should never happen for txnIds: " + txnIds);
         }
 
-        long writeId;
-        String s;
-        long allocatedTxnsCount = 0;
-        long txnId;
         List<String> queries = new ArrayList<>();
         StringBuilder prefix = new StringBuilder();
         StringBuilder suffix = new StringBuilder();
@@ -1440,6 +1436,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         suffix.append("");
         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
                 txnIds, "t2w_txnid", false, false);
+
+        long allocatedTxnsCount = 0;
+        long txnId;
+        long writeId = 0;
         for (String query : queries) {
           LOG.debug("Going to execute query <" + query + ">");
           rs = stmt.executeQuery(query);
@@ -1462,12 +1462,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           return new AllocateTableWriteIdsResponse(txnToWriteIds);
         }
 
+        long srcWriteId = 0;
+        if (rqst.isSetReplPolicy()) {
+          // In replication flow, we always need to allocate write ID equal to that of source.
+          assert(srcTxnToWriteIds != null);
+          srcWriteId = srcTxnToWriteIds.get(0).getWriteId();
+        }
+
         handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
 
         // There are some txns in the list which does not have write id allocated and hence go ahead and do it.
         // Get the next write id for the given table and update it with new next write id.
         // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
-        s = sqlGenerator.addForUpdateClause(
+        String s = sqlGenerator.addForUpdateClause(
                 "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
                         + " and nwi_table = " + quoteString(tblName));
         LOG.debug("Going to execute query <" + s + ">");
@@ -1475,19 +1482,33 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if (!rs.next()) {
           // First allocation of write id should add the table to the next_write_id meta table
           // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here
-          writeId = 1;
+          // For repl flow, we need to force set the incoming write id.
+          writeId = (srcWriteId > 0) ? srcWriteId : 1;
           s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
-                  + quoteString(dbName) + "," + quoteString(tblName) + "," + Long.toString(numOfWriteIds + 1) + ")";
+                  + quoteString(dbName) + "," + quoteString(tblName) + "," + (writeId + numOfWriteIds) + ")";
           LOG.debug("Going to execute insert <" + s + ">");
           stmt.execute(s);
         } else {
-          writeId = rs.getLong(1);
+          long nextWriteId = rs.getLong(1);
+          writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId;
+
           // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
           s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds)
                   + " where nwi_database = " + quoteString(dbName)
                   + " and nwi_table = " + quoteString(tblName);
           LOG.debug("Going to execute update <" + s + ">");
           stmt.executeUpdate(s);
+
+          // For repl flow, if the source write id is mismatching with target next write id, then current
+          // metadata in TXN_TO_WRITE_ID is stale for this table and hence need to clean-up TXN_TO_WRITE_ID.
+          // This is possible in case of first incremental repl after bootstrap where concurrent write
+          // and drop table was performed at source during bootstrap dump.
+          if ((srcWriteId > 0) && (srcWriteId != nextWriteId)) {
+            s = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName)
+                    + " and t2w_table = " + quoteString(tblName);
+            LOG.debug("Going to execute delete <" + s + ">");
+            stmt.executeUpdate(s);
+          }
         }
 
         // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated
@@ -1500,16 +1521,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           writeId++;
         }
 
-        if (rqst.isSetReplPolicy()) {
-          int lastIdx = txnToWriteIds.size()-1;
-          if ((txnToWriteIds.get(0).getWriteId() != srcTxnToWriteIds.get(0).getWriteId()) ||
-              (txnToWriteIds.get(lastIdx).getWriteId() != srcTxnToWriteIds.get(lastIdx).getWriteId())) {
-            LOG.error("Allocated write id range {} is not matching with the input write id range {}.",
-                    txnToWriteIds, srcTxnToWriteIds);
-            throw new IllegalStateException("Write id allocation failed for: " + srcTxnToWriteIds);
-          }
-        }
-
         // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
         List<String> inserts = sqlGenerator.createInsertValuesStmt(
                 "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);