You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/13 00:02:53 UTC
[pulsar] branch master updated: [metadata] fix MetadataStore#put may get unexcepted exception (#14903)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3b852bcb547 [metadata] fix MetadataStore#put may get unexcepted exception (#14903)
3b852bcb547 is described below
commit 3b852bcb54780ae26cd339e5e30bbe83de3a30cd
Author: WangJialing <65...@users.noreply.github.com>
AuthorDate: Wed Jul 13 08:02:46 2022 +0800
[metadata] fix MetadataStore#put may get unexcepted exception (#14903)
* fix MetadataStore#put may get unexcepted exception
* fix checksytle
* add test
* update test code
---
.../pulsar/metadata/impl/ZKMetadataStore.java | 8 +++++++-
.../apache/pulsar/metadata/MetadataStoreTest.java | 21 +++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 6697934d56b..ad723f28f89 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -386,7 +386,13 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
put(opPut.getPath(), opPut.getData(), Optional.of(-1L)).thenAccept(
s -> future.complete(s))
.exceptionally(ex -> {
- future.completeExceptionally(MetadataStoreException.wrap(ex.getCause()));
+ if (ex.getCause() instanceof BadVersionException) {
+ // The z-node exist now, let's overwrite it
+ internalStorePut(opPut);
+ } else {
+ future.completeExceptionally(
+ MetadataStoreException.wrap(ex.getCause()));
+ }
return null;
});
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index f4e5bf779ba..b8a091fef25 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -38,6 +38,7 @@ import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -84,6 +85,26 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
}
}
+ @Test(dataProvider = "impl")
+ public void concurrentPutTest(String provider, Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+ String data = "data";
+ String path = "/non-existing-key";
+ int concurrent = 50;
+ List<CompletableFuture<Stat>> futureList = new ArrayList<>();
+ for (int i = 0; i < concurrent; i++) {
+ futureList.add(store.put(path, data.getBytes(), Optional.empty()).exceptionally(ex -> {
+ fail("fail to execute concurrent put", ex);
+ return null;
+ }));
+ }
+ FutureUtil.waitForAll(futureList).join();
+
+ assertEquals(store.get(path).join().get().getValue(), data.getBytes());
+ }
+
@Test(dataProvider = "impl")
public void insertionTestWithExpectedVersion(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup