You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/15 02:07:14 UTC

[pulsar] branch branch-2.10 updated: [Issue 15896] Fix LockTimeout when storePut on the same key concurrently in RocksdbMetadataStore (#16005)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 8a14fe0f563 [Issue 15896] Fix LockTimeout when storePut on the same key concurrently in RocksdbMetadataStore (#16005)
8a14fe0f563 is described below

commit 8a14fe0f56372ba0df9c262f715af41125bfd12f
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Jun 15 10:06:34 2022 +0800

    [Issue 15896] Fix LockTimeout when storePut on the same key concurrently in RocksdbMetadataStore (#16005)
    
    * Fix LockTimeout when storePut or storeDelete on the same key concurrently
    
    * Add cleanup
---
 .../pulsar/metadata/impl/RocksdbMetadataStore.java | 10 +++++--
 .../apache/pulsar/metadata/MetadataStoreTest.java  | 32 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 3ca87f3f915..944207eaecc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -487,7 +487,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
             }
             try (Transaction transaction = db.beginTransaction(optionSync)) {
                 byte[] pathBytes = toBytes(path);
-                byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, false);
+                byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
                 MetaValue metaValue = MetaValue.parse(oldValueData);
                 if (metaValue == null) {
                     throw new MetadataStoreException.NotFoundException(String.format("path %s not found.", path));
@@ -504,6 +504,9 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
                 return CompletableFuture.completedFuture(null);
             }
         } catch (Throwable e) {
+            if (log.isDebugEnabled()) {
+                log.debug("error in storeDelete,path={}", path, e);
+            }
             return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
         } finally {
             dbStateLock.readLock().unlock();
@@ -523,7 +526,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
             }
             try (Transaction transaction = db.beginTransaction(optionSync)) {
                 byte[] pathBytes = toBytes(path);
-                byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, false);
+                byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
                 MetaValue metaValue = MetaValue.parse(oldValueData);
                 if (expectedVersion.isPresent()) {
                     if (metaValue == null && expectedVersion.get() != -1
@@ -572,6 +575,9 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
                                 metaValue.ephemeral, true));
             }
         } catch (Throwable e) {
+            if (log.isDebugEnabled()) {
+                log.debug("error in storePut,path={}", path, e);
+            }
             return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
         } finally {
             dbStateLock.readLock().unlock();
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 856c789b54e..f4e5bf779ba 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.Stat;
 import org.assertj.core.util.Lists;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -447,4 +448,35 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
         assertEquals(successWrites.get(), maxValue);
         assertEquals(store.get(path).get().get().getValue()[0], maxValue);
     }
+
+    @Test(dataProvider = "impl")
+    public void testConcurrentPut(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+        String k = newKey();
+        CompletableFuture<Void> f1 =
+                CompletableFuture.runAsync(() -> store.put(k, new byte[0], Optional.of(-1L)).join());
+        CompletableFuture<Void> f2 =
+                CompletableFuture.runAsync(() -> store.put(k, new byte[0], Optional.of(-1L)).join());
+        Awaitility.await().until(() -> f1.isDone() && f2.isDone());
+        assertTrue(f1.isCompletedExceptionally() && !f2.isCompletedExceptionally() ||
+                ! f1.isCompletedExceptionally() && f2.isCompletedExceptionally());
+    }
+
+    @Test(dataProvider = "impl")
+    public void testConcurrentDelete(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+        String k = newKey();
+        store.put(k, new byte[0], Optional.of(-1L)).join();
+        CompletableFuture<Void> f1 =
+                CompletableFuture.runAsync(() -> store.delete(k, Optional.empty()).join());
+        CompletableFuture<Void> f2 =
+                CompletableFuture.runAsync(() -> store.delete(k, Optional.empty()).join());
+        Awaitility.await().until(() -> f1.isDone() && f2.isDone());
+        assertTrue(f1.isCompletedExceptionally() && !f2.isCompletedExceptionally() ||
+                ! f1.isCompletedExceptionally() && f2.isCompletedExceptionally());
+    }
 }