You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/04/11 04:46:15 UTC

[hudi] branch master updated: [HUDI-3798] Fixing ending of a transaction by different owner and removing some extraneous methods in trxn manager (#5255)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2245a9515f [HUDI-3798] Fixing ending of a transaction by different owner and removing some extraneous methods in trxn manager (#5255)
2245a9515f is described below

commit 2245a9515f1d048b3f0a832c5087c227fbab132e
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sun Apr 10 21:46:07 2022 -0700

    [HUDI-3798] Fixing ending of a transaction by different owner and removing some extraneous methods in trxn manager (#5255)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 16 +++--
 .../apache/hudi/client/HoodieTimelineArchiver.java |  5 +-
 .../client/transaction/TransactionManager.java     | 27 ++------
 .../table/action/index/RunIndexActionExecutor.java |  4 +-
 .../client/transaction/TestTransactionManager.java | 74 ++++++++++++++++------
 5 files changed, 75 insertions(+), 51 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 32a8dee517..c1ebef7bb6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -795,7 +795,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
     Timer.Context timerContext = metrics.getRollbackCtx();
     try {
-      HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
+      HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.of(restoreInstantTime), initialMetadataTableIfNecessary);
       Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
       if (restorePlanOption.isPresent()) {
         HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
@@ -1035,7 +1035,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
   public void dropIndex(List<MetadataPartitionType> partitionTypes) {
     HoodieTable table = createTable(config, hadoopConf);
     String dropInstant = HoodieActiveTimeline.createNewInstantTime();
-    this.txnManager.beginTransaction();
+    HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
+    this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
     try {
       context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
       table.getMetadataWriter(dropInstant).ifPresent(w -> {
@@ -1046,7 +1047,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
         }
       });
     } finally {
-      this.txnManager.endTransaction();
+      this.txnManager.endTransaction(Option.of(ownerInstant));
     }
   }
 
@@ -1451,13 +1452,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     }
 
     HoodieTable table;
-
-    this.txnManager.beginTransaction();
+    Option<HoodieInstant> ownerInstant = Option.empty();
+    if (instantTime.isPresent()) {
+      ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
+    }
+    this.txnManager.beginTransaction(ownerInstant, Option.empty());
     try {
       tryUpgrade(metaClient, instantTime);
       table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
     } finally {
-      this.txnManager.endTransaction();
+      this.txnManager.endTransaction(ownerInstant);
     }
 
     // Validate table properties
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index ca76e4e3bf..12d00bf618 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -157,7 +157,8 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
   public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
     try {
       if (acquireLock) {
-        txnManager.beginTransaction();
+        // Archival has no owner instant or instant time of its own, so the transaction begins without an owner.
+        txnManager.beginTransaction(Option.empty(), Option.empty());
       }
       List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
       verifyLastMergeArchiveFilesIfNecessary(context);
@@ -179,7 +180,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
     } finally {
       close();
       if (acquireLock) {
-        txnManager.endTransaction();
+        txnManager.endTransaction(Option.empty());
       }
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index d9b9d3d269..aef1fee5e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -45,14 +45,6 @@ public class TransactionManager implements Serializable {
     this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
   }
 
-  public void beginTransaction() {
-    if (isOptimisticConcurrencyControlEnabled) {
-      LOG.info("Transaction starting without a transaction owner");
-      lockManager.lock();
-      LOG.info("Transaction started without a transaction owner");
-    }
-  }
-
   public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
                                Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
     if (isOptimisticConcurrencyControlEnabled) {
@@ -65,30 +57,25 @@ public class TransactionManager implements Serializable {
     }
   }
 
-  public void endTransaction() {
-    if (isOptimisticConcurrencyControlEnabled) {
-      LOG.info("Transaction ending without a transaction owner");
-      lockManager.unlock();
-      LOG.info("Transaction ended without a transaction owner");
-    }
-  }
-
   public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
     if (isOptimisticConcurrencyControlEnabled) {
       LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
-      reset(currentTxnOwnerInstant, Option.empty(), Option.empty());
-      lockManager.unlock();
-      LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
+      if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
+        lockManager.unlock();
+        LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
+      }
     }
   }
 
-  private synchronized void reset(Option<HoodieInstant> callerInstant,
+  private synchronized boolean reset(Option<HoodieInstant> callerInstant,
                                   Option<HoodieInstant> newTxnOwnerInstant,
                                   Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
     if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant.get().equals(callerInstant.get())) {
       this.currentTxnOwnerInstant = newTxnOwnerInstant;
       this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
+      return true;
     }
+    return false;
   }
 
   public void close() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 8c86a298f8..339e95b9e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -232,14 +232,14 @@ public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> exte
                                             HoodieIndexCommitMetadata indexCommitMetadata) throws IOException {
     try {
       // update the table config and timeline in a lock as there could be another indexer running
-      txnManager.beginTransaction();
+      txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
       updateMetadataPartitionsTableConfig(table.getMetaClient(),
           finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
       table.getActiveTimeline().saveAsComplete(
           new HoodieInstant(true, INDEXING_ACTION, indexInstant.getTimestamp()),
           TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
     } finally {
-      txnManager.endTransaction();
+      txnManager.endTransaction(Option.of(indexInstant));
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
index 22f8017841..6573560e75 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
@@ -69,20 +69,28 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
 
   @Test
   public void testSingleWriterTransaction() {
-    transactionManager.beginTransaction();
-    transactionManager.endTransaction();
+    Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
+    Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
+    transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
+    transactionManager.endTransaction(newTxnOwnerInstant);
   }
 
   @Test
   public void testSingleWriterNestedTransaction() {
-    transactionManager.beginTransaction();
+    Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
+    Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
+    transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
+
+    Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000003");
+    Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000004");
+
     assertThrows(HoodieLockException.class, () -> {
-      transactionManager.beginTransaction();
+      transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1);
     });
 
-    transactionManager.endTransaction();
+    transactionManager.endTransaction(newTxnOwnerInstant);
     assertDoesNotThrow(() -> {
-      transactionManager.endTransaction();
+      transactionManager.endTransaction(newTxnOwnerInstant1);
     });
   }
 
@@ -94,11 +102,16 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
     final AtomicBoolean writer1Completed = new AtomicBoolean(false);
     final AtomicBoolean writer2Completed = new AtomicBoolean(false);
 
+    Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000001");
+    Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000002");
+    Option<HoodieInstant> lastCompletedInstant2 = getInstant("0000003");
+    Option<HoodieInstant> newTxnOwnerInstant2 = getInstant("0000004");
+
     // Let writer1 get the lock first, then wait for others
     // to join the sync up point.
     Thread writer1 = new Thread(() -> {
       assertDoesNotThrow(() -> {
-        transactionManager.beginTransaction();
+        transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1);
       });
       latch.countDown();
       try {
@@ -111,7 +124,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
         //
       }
       assertDoesNotThrow(() -> {
-        transactionManager.endTransaction();
+        transactionManager.endTransaction(newTxnOwnerInstant1);
       });
       writer1Completed.set(true);
     });
@@ -127,10 +140,10 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
         //
       }
       assertDoesNotThrow(() -> {
-        transactionManager.beginTransaction();
+        transactionManager.beginTransaction(newTxnOwnerInstant2, lastCompletedInstant2);
       });
       assertDoesNotThrow(() -> {
-        transactionManager.endTransaction();
+        transactionManager.endTransaction(newTxnOwnerInstant2);
       });
       writer2Completed.set(true);
     });
@@ -152,6 +165,32 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
     Assertions.assertTrue(writer2Completed.get());
   }
 
+  @Test
+  public void testEndTransactionByDiffOwner() throws InterruptedException {
+    // 1. Begin a transaction with one owner; a different owner then attempts to end it
+    Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
+    Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
+    transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
+
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    // Another writer thread
+    Thread writer2 = new Thread(() -> {
+      Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000003");
+      transactionManager.endTransaction(newTxnOwnerInstant1);
+      countDownLatch.countDown();
+    });
+
+    writer2.start();
+    countDownLatch.await(30, TimeUnit.SECONDS);
+    // should not have reset the state within transaction manager since the owner is different.
+    Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
+    Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner().isPresent());
+
+    transactionManager.endTransaction(newTxnOwnerInstant);
+    Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
+    Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
+  }
+
   @Test
   public void testTransactionsWithInstantTime() {
     // 1. Begin and end by the same transaction owner
@@ -164,14 +203,15 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
     Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
 
-    // 2. Begin transaction with a new txn owner, but end transaction with no/wrong owner
+    // 2. Begin transaction with a new txn owner, but end transaction with wrong owner
     lastCompletedInstant = getInstant("0000002");
     newTxnOwnerInstant = getInstant("0000003");
     transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
-    transactionManager.endTransaction();
+    transactionManager.endTransaction(getInstant("0000004"));
     // Owner reset would not happen as the end txn was invoked with an incorrect current txn owner
     Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant);
     Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant);
+    transactionManager.endTransaction(newTxnOwnerInstant);
 
     // 3. But, we should be able to begin a new transaction for a new owner
     lastCompletedInstant = getInstant("0000003");
@@ -183,15 +223,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
     Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
     Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
 
-    // 4. Transactions with no owners should also go through
-    transactionManager.beginTransaction();
-    Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
-    Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
-    transactionManager.endTransaction();
-    Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
-    Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
-
-    // 5. Transactions with new instants but with same timestamps should properly reset owners
+    // 4. Transactions with new instants but with same timestamps should properly reset owners
     transactionManager.beginTransaction(getInstant("0000005"), Option.empty());
     Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
     Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());