You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/15 02:07:14 UTC
[pulsar] branch branch-2.10 updated: [Issue 15896] Fix LockTimeout when storePut on the same key concurrently in RocksdbMetadataStore (#16005)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 8a14fe0f563 [Issue 15896] Fix LockTimeout when storePut on the same key concurrently in RocksdbMetadataStore (#16005)
8a14fe0f563 is described below
commit 8a14fe0f56372ba0df9c262f715af41125bfd12f
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Jun 15 10:06:34 2022 +0800
[Issue 15896] Fix LockTimeout when storePut on the same key concurrently in RocksdbMetadataStore (#16005)
* Fix LockTimeout when storePut or storeDelete on the same key concurrently
* Add cleanup
---
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 10 +++++--
.../apache/pulsar/metadata/MetadataStoreTest.java | 32 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 3ca87f3f915..944207eaecc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -487,7 +487,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
}
try (Transaction transaction = db.beginTransaction(optionSync)) {
byte[] pathBytes = toBytes(path);
- byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, false);
+ byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
MetaValue metaValue = MetaValue.parse(oldValueData);
if (metaValue == null) {
throw new MetadataStoreException.NotFoundException(String.format("path %s not found.", path));
@@ -504,6 +504,9 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
return CompletableFuture.completedFuture(null);
}
} catch (Throwable e) {
+ if (log.isDebugEnabled()) {
+ log.debug("error in storeDelete,path={}", path, e);
+ }
return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
} finally {
dbStateLock.readLock().unlock();
@@ -523,7 +526,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
}
try (Transaction transaction = db.beginTransaction(optionSync)) {
byte[] pathBytes = toBytes(path);
- byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, false);
+ byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
MetaValue metaValue = MetaValue.parse(oldValueData);
if (expectedVersion.isPresent()) {
if (metaValue == null && expectedVersion.get() != -1
@@ -572,6 +575,9 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
metaValue.ephemeral, true));
}
} catch (Throwable e) {
+ if (log.isDebugEnabled()) {
+ log.debug("error in storePut,path={}", path, e);
+ }
return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
} finally {
dbStateLock.readLock().unlock();
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 856c789b54e..f4e5bf779ba 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.assertj.core.util.Lists;
+import org.awaitility.Awaitility;
import org.testng.annotations.Test;
@Slf4j
@@ -447,4 +448,35 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
assertEquals(successWrites.get(), maxValue);
assertEquals(store.get(path).get().get().getValue()[0], maxValue);
}
+
+ @Test(dataProvider = "impl")
+ public void testConcurrentPut(String provider, Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+ String k = newKey();
+ CompletableFuture<Void> f1 =
+ CompletableFuture.runAsync(() -> store.put(k, new byte[0], Optional.of(-1L)).join());
+ CompletableFuture<Void> f2 =
+ CompletableFuture.runAsync(() -> store.put(k, new byte[0], Optional.of(-1L)).join());
+ Awaitility.await().until(() -> f1.isDone() && f2.isDone());
+ assertTrue(f1.isCompletedExceptionally() && !f2.isCompletedExceptionally() ||
+ ! f1.isCompletedExceptionally() && f2.isCompletedExceptionally());
+ }
+
+ @Test(dataProvider = "impl")
+ public void testConcurrentDelete(String provider, Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+ String k = newKey();
+ store.put(k, new byte[0], Optional.of(-1L)).join();
+ CompletableFuture<Void> f1 =
+ CompletableFuture.runAsync(() -> store.delete(k, Optional.empty()).join());
+ CompletableFuture<Void> f2 =
+ CompletableFuture.runAsync(() -> store.delete(k, Optional.empty()).join());
+ Awaitility.await().until(() -> f1.isDone() && f2.isDone());
+ assertTrue(f1.isCompletedExceptionally() && !f2.isCompletedExceptionally() ||
+ ! f1.isCompletedExceptionally() && f2.isCompletedExceptionally());
+ }
}