You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/04/08 17:34:29 UTC

[pulsar] branch master updated: Handle KeeperException.BadVersionException thrown by updateSchemaLocator() (#6683)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cf045e4  Handle KeeperException.BadVersionException thrown by updateSchemaLocator() (#6683)
cf045e4 is described below

commit cf045e4152bbe54eabdc1bb29888f071c3fe563d
Author: hrsakai <hs...@yahoo-corp.jp>
AuthorDate: Thu Apr 9 02:34:19 2020 +0900

    Handle KeeperException.BadVersionException thrown by updateSchemaLocator() (#6683)
    
    Co-authored-by: Sijie Guo <si...@apache.org>
---
 .../service/schema/BookkeeperSchemaStorage.java    | 24 +++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 4304d21..df41e71 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -264,7 +264,29 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                 //don't check the schema whether already exist
                 return readSchemaEntry(locator.getIndexList().get(0).getPosition())
                         .thenCompose(schemaEntry -> addNewSchemaEntryToStore(schemaId, locator.getIndexList(), data).thenCompose(
-                        position -> updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)));
+                        position -> {
+                            CompletableFuture<Long> future = new CompletableFuture<>();
+                            updateSchemaLocator(schemaId, optLocatorEntry.get(), position, hash)
+                                    .thenAccept(future::complete)
+                                    .exceptionally(ex -> {
+                                        if (ex.getCause() instanceof KeeperException.BadVersionException) {
+                                            // The locator already existed, so a bad version means it was updated concurrently.
+                                            // Retry the whole operation so that we have a chance to recover without bubbling up the error
+                                            putSchema(schemaId, data, hash)
+                                                    .thenAccept(future::complete)
+                                                    .exceptionally(ex2 -> {
+                                                        future.completeExceptionally(ex2);
+                                                        return null;
+                                                    });
+                                        } else {
+                                            // For other errors, just fail the operation
+                                            future.completeExceptionally(ex);
+                                        }
+                                        return null;
+                                    });
+                            return future;
+                        })
+                );
             } else {
                 // No schema was defined yet
                 CompletableFuture<Long> future = new CompletableFuture<>();