You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/04/08 17:34:29 UTC
[pulsar] branch master updated: Handle
KeeperException.BadVersionException thrown by updateSchemaLocator() (#6683)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cf045e4 Handle KeeperException.BadVersionException thrown by updateSchemaLocator() (#6683)
cf045e4 is described below
commit cf045e4152bbe54eabdc1bb29888f071c3fe563d
Author: hrsakai <hs...@yahoo-corp.jp>
AuthorDate: Thu Apr 9 02:34:19 2020 +0900
Handle KeeperException.BadVersionException thrown by updateSchemaLocator() (#6683)
Co-authored-by: Sijie Guo <si...@apache.org>
---
.../service/schema/BookkeeperSchemaStorage.java | 24 +++++++++++++++++++++-
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 4304d21..df41e71 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -264,7 +264,29 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
//don't check the schema whether already exist
return readSchemaEntry(locator.getIndexList().get(0).getPosition())
.thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId, locator.getIndexList(), data).thenCompose(
- position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)));
+ position -> {
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)
+ .thenAccept(future::complete)
+ .exceptionally(ex -> {
+ if (ex.getCause() instanceof KeeperException.BadVersionException) {
+ // There was a race condition on the schema creation. Since it has now been created,
+ // retry the whole operation so that we have a chance to recover without bubbling error
+ putSchema(schemaId, data, hash)
+ .thenAccept(future::complete)
+ .exceptionally(ex2 -> {
+ future.completeExceptionally(ex2);
+ return null;
+ });
+ } else {
+ // For other errors, just fail the operation
+ future.completeExceptionally(ex);
+ }
+ return null;
+ });
+ return future;
+ })
+ );
} else {
// No schema was defined yet
CompletableFuture<Long> future = new CompletableFuture<>();