You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/04/11 04:46:15 UTC
[hudi] branch master updated: [HUDI-3798] Fixing ending of a transaction by different owner and removing some extraneous methods in trxn manager (#5255)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2245a9515f [HUDI-3798] Fixing ending of a transaction by different owner and removing some extraneous methods in trxn manager (#5255)
2245a9515f is described below
commit 2245a9515f1d048b3f0a832c5087c227fbab132e
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sun Apr 10 21:46:07 2022 -0700
[HUDI-3798] Fixing ending of a transaction by different owner and removing some extraneous methods in trxn manager (#5255)
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 16 +++--
.../apache/hudi/client/HoodieTimelineArchiver.java | 5 +-
.../client/transaction/TransactionManager.java | 27 ++------
.../table/action/index/RunIndexActionExecutor.java | 4 +-
.../client/transaction/TestTransactionManager.java | 74 ++++++++++++++++------
5 files changed, 75 insertions(+), 51 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 32a8dee517..c1ebef7bb6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -795,7 +795,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context timerContext = metrics.getRollbackCtx();
try {
- HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
+ HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.of(restoreInstantTime), initialMetadataTableIfNecessary);
Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
if (restorePlanOption.isPresent()) {
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
@@ -1035,7 +1035,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
public void dropIndex(List<MetadataPartitionType> partitionTypes) {
HoodieTable table = createTable(config, hadoopConf);
String dropInstant = HoodieActiveTimeline.createNewInstantTime();
- this.txnManager.beginTransaction();
+ HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
+ this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
try {
context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
table.getMetadataWriter(dropInstant).ifPresent(w -> {
@@ -1046,7 +1047,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
});
} finally {
- this.txnManager.endTransaction();
+ this.txnManager.endTransaction(Option.of(ownerInstant));
}
}
@@ -1451,13 +1452,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
HoodieTable table;
-
- this.txnManager.beginTransaction();
+ Option<HoodieInstant> ownerInstant = Option.empty();
+ if (instantTime.isPresent()) {
+ ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
+ }
+ this.txnManager.beginTransaction(ownerInstant, Option.empty());
try {
tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
} finally {
- this.txnManager.endTransaction();
+ this.txnManager.endTransaction(ownerInstant);
}
// Validate table properties
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index ca76e4e3bf..12d00bf618 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -157,7 +157,8 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
try {
if (acquireLock) {
- txnManager.beginTransaction();
+ // there is no owner or instant time per se for archival.
+ txnManager.beginTransaction(Option.empty(), Option.empty());
}
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
verifyLastMergeArchiveFilesIfNecessary(context);
@@ -179,7 +180,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
} finally {
close();
if (acquireLock) {
- txnManager.endTransaction();
+ txnManager.endTransaction(Option.empty());
}
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index d9b9d3d269..aef1fee5e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -45,14 +45,6 @@ public class TransactionManager implements Serializable {
this.isOptimisticConcurrencyControlEnabled = config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl();
}
- public void beginTransaction() {
- if (isOptimisticConcurrencyControlEnabled) {
- LOG.info("Transaction starting without a transaction owner");
- lockManager.lock();
- LOG.info("Transaction started without a transaction owner");
- }
- }
-
public void beginTransaction(Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
if (isOptimisticConcurrencyControlEnabled) {
@@ -65,30 +57,25 @@ public class TransactionManager implements Serializable {
}
}
- public void endTransaction() {
- if (isOptimisticConcurrencyControlEnabled) {
- LOG.info("Transaction ending without a transaction owner");
- lockManager.unlock();
- LOG.info("Transaction ended without a transaction owner");
- }
- }
-
public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
if (isOptimisticConcurrencyControlEnabled) {
LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
- reset(currentTxnOwnerInstant, Option.empty(), Option.empty());
- lockManager.unlock();
- LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
+ if (reset(currentTxnOwnerInstant, Option.empty(), Option.empty())) {
+ lockManager.unlock();
+ LOG.info("Transaction ended with transaction owner " + currentTxnOwnerInstant);
+ }
}
}
- private synchronized void reset(Option<HoodieInstant> callerInstant,
+ private synchronized boolean reset(Option<HoodieInstant> callerInstant,
Option<HoodieInstant> newTxnOwnerInstant,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) {
if (!this.currentTxnOwnerInstant.isPresent() || this.currentTxnOwnerInstant.get().equals(callerInstant.get())) {
this.currentTxnOwnerInstant = newTxnOwnerInstant;
this.lastCompletedTxnOwnerInstant = lastCompletedTxnOwnerInstant;
+ return true;
}
+ return false;
}
public void close() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 8c86a298f8..339e95b9e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -232,14 +232,14 @@ public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> exte
HoodieIndexCommitMetadata indexCommitMetadata) throws IOException {
try {
// update the table config and timeline in a lock as there could be another indexer running
- txnManager.beginTransaction();
+ txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
updateMetadataPartitionsTableConfig(table.getMetaClient(),
finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
table.getActiveTimeline().saveAsComplete(
new HoodieInstant(true, INDEXING_ACTION, indexInstant.getTimestamp()),
TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
} finally {
- txnManager.endTransaction();
+ txnManager.endTransaction(Option.of(indexInstant));
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
index 22f8017841..6573560e75 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
@@ -69,20 +69,28 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
@Test
public void testSingleWriterTransaction() {
- transactionManager.beginTransaction();
- transactionManager.endTransaction();
+ Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
+ Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
+ transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
+ transactionManager.endTransaction(newTxnOwnerInstant);
}
@Test
public void testSingleWriterNestedTransaction() {
- transactionManager.beginTransaction();
+ Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
+ Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
+ transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
+
+ Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000003");
+ Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000004");
+
assertThrows(HoodieLockException.class, () -> {
- transactionManager.beginTransaction();
+ transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1);
});
- transactionManager.endTransaction();
+ transactionManager.endTransaction(newTxnOwnerInstant);
assertDoesNotThrow(() -> {
- transactionManager.endTransaction();
+ transactionManager.endTransaction(newTxnOwnerInstant1);
});
}
@@ -94,11 +102,16 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
+ Option<HoodieInstant> lastCompletedInstant1 = getInstant("0000001");
+ Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000002");
+ Option<HoodieInstant> lastCompletedInstant2 = getInstant("0000003");
+ Option<HoodieInstant> newTxnOwnerInstant2 = getInstant("0000004");
+
// Let writer1 get the lock first, then wait for others
// to join the sync up point.
Thread writer1 = new Thread(() -> {
assertDoesNotThrow(() -> {
- transactionManager.beginTransaction();
+ transactionManager.beginTransaction(newTxnOwnerInstant1, lastCompletedInstant1);
});
latch.countDown();
try {
@@ -111,7 +124,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
//
}
assertDoesNotThrow(() -> {
- transactionManager.endTransaction();
+ transactionManager.endTransaction(newTxnOwnerInstant1);
});
writer1Completed.set(true);
});
@@ -127,10 +140,10 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
//
}
assertDoesNotThrow(() -> {
- transactionManager.beginTransaction();
+ transactionManager.beginTransaction(newTxnOwnerInstant2, lastCompletedInstant2);
});
assertDoesNotThrow(() -> {
- transactionManager.endTransaction();
+ transactionManager.endTransaction(newTxnOwnerInstant2);
});
writer2Completed.set(true);
});
@@ -152,6 +165,32 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
Assertions.assertTrue(writer2Completed.get());
}
+ @Test
+ public void testEndTransactionByDiffOwner() throws InterruptedException {
+ // 1. Begin and end by the same transaction owner
+ Option<HoodieInstant> lastCompletedInstant = getInstant("0000001");
+ Option<HoodieInstant> newTxnOwnerInstant = getInstant("0000002");
+ transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ // Another writer thread
+ Thread writer2 = new Thread(() -> {
+ Option<HoodieInstant> newTxnOwnerInstant1 = getInstant("0000003");
+ transactionManager.endTransaction(newTxnOwnerInstant1);
+ countDownLatch.countDown();
+ });
+
+ writer2.start();
+ countDownLatch.await(30, TimeUnit.SECONDS);
+ // should not have reset the state within transaction manager since the owner is different.
+ Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
+ Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner().isPresent());
+
+ transactionManager.endTransaction(newTxnOwnerInstant);
+ Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
+ Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
+ }
+
@Test
public void testTransactionsWithInstantTime() {
// 1. Begin and end by the same transaction owner
@@ -164,14 +203,15 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
- // 2. Begin transaction with a new txn owner, but end transaction with no/wrong owner
+ // 2. Begin transaction with a new txn owner, but end transaction with wrong owner
lastCompletedInstant = getInstant("0000002");
newTxnOwnerInstant = getInstant("0000003");
transactionManager.beginTransaction(newTxnOwnerInstant, lastCompletedInstant);
- transactionManager.endTransaction();
+ transactionManager.endTransaction(getInstant("0000004"));
// Owner reset would not happen as the end txn was invoked with an incorrect current txn owner
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner() == newTxnOwnerInstant);
Assertions.assertTrue(transactionManager.getLastCompletedTransactionOwner() == lastCompletedInstant);
+ transactionManager.endTransaction(newTxnOwnerInstant);
// 3. But, we should be able to begin a new transaction for a new owner
lastCompletedInstant = getInstant("0000003");
@@ -183,15 +223,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
- // 4. Transactions with no owners should also go through
- transactionManager.beginTransaction();
- Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
- Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
- transactionManager.endTransaction();
- Assertions.assertFalse(transactionManager.getCurrentTransactionOwner().isPresent());
- Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());
-
- // 5. Transactions with new instants but with same timestamps should properly reset owners
+ // 4. Transactions with new instants but with same timestamps should properly reset owners
transactionManager.beginTransaction(getInstant("0000005"), Option.empty());
Assertions.assertTrue(transactionManager.getCurrentTransactionOwner().isPresent());
Assertions.assertFalse(transactionManager.getLastCompletedTransactionOwner().isPresent());