You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/18 05:24:32 UTC

[GitHub] [pulsar] sijie commented on a change in pull request #9212: [pulsar-broker] Fix: handle topic loading failure due to broken schema ledger

sijie commented on a change in pull request #9212:
URL: https://github.com/apache/pulsar/pull/9212#discussion_r559322040



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
##########
@@ -41,6 +41,8 @@
 
     CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId);
 
+    CompletableFuture<SchemaVersion> deleteSchemaStorage(String schemaId, boolean forcefully);

Review comment:
       mark this a default method?

##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java
##########
@@ -32,6 +32,8 @@
 
     CompletableFuture<List<CompletableFuture<StoredSchema>>> getAll(String key);
 
+    CompletableFuture<SchemaVersion> delete(String key, boolean forcefully);

Review comment:
       make this a default method?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
##########
@@ -345,20 +352,58 @@ private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData newSch
     }
 
     public CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId) {
-        return getAllSchemas(schemaId).thenCompose(FutureUtils::collect).thenApply(list -> {
-            // Trim the prefix of schemas before the latest delete.
-            int lastIndex = list.size() - 1;
-            for (int i = lastIndex; i >= 0; i--) {
-                if (list.get(i).schema.isDeleted()) {
-                    if (i == lastIndex) { // if the latest schema is a delete, there's no schemas to compare
-                        return Collections.emptyList();
-                    } else {
-                        return list.subList(i + 1, list.size());
-                    }
+
+        CompletableFuture<List<SchemaAndMetadata>> schemaResult = new CompletableFuture<>();
+        CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> schemaFutureList = getAllSchemas(schemaId);
+        schemaFutureList.thenCompose(FutureUtils::collect).handle((schemaList, ex) -> {
+            List<SchemaAndMetadata> list = ex != null ? new ArrayList<>() : schemaList;
+            if (ex != null) {
+                boolean recoverable = ex.getCause() != null && (ex.getCause() instanceof SchemaException)
+                        ? ((SchemaException) ex.getCause()).isRecoverable()
+                        : true;
+                // if error is recoverable then fail the request.
+                if (recoverable) {
+                    schemaResult.completeExceptionally(ex.getCause());
+                    return null;
                 }
+                // clean the schema list for recoverable and delete the schema from zk
+                schemaFutureList.getNow(Collections.emptyList()).forEach(schemaFuture -> {
+                    if (!schemaFuture.isCompletedExceptionally()) {
+                        list.add(schemaFuture.getNow(null));
+                        return;
+                    }
+                });
+                trimDeletedSchemaAndGetList(list);
+                // clean up the broken schema from zk
+                deleteSchemaStorage(schemaId, true).handle((sv, th) -> {
+                    log.info("Deletion of {} {}", schemaId,

Review comment:
       What a user needs to do when he/she reads this message? Especially if it is "Deletion of ... failed", it might be worth adding more details for the "failed" case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org