You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:39:01 UTC

[pulsar] 05/14: Avoid NPEs at ledger creation when DNS failures happen (#7403)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 30a52215f832309cbc1913090cae56ace87feaa2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jun 30 18:39:01 2020 -0700

    Avoid NPEs at ledger creation when DNS failures happen (#7403)
    
    * Avoid NPEs at ledger creation when DNS failures happen
    
    * Removed unnecessary try/catch
    
    (cherry picked from commit a230427f78e7cc844b2a22ababd05d217d9164bf)
---
 .gitignore                                         |  1 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  4 +--
 .../service/schema/BookkeeperSchemaStorage.java    | 33 +++++++++++++---------
 .../pulsar/compaction/TwoPhaseCompactor.java       | 31 ++++++++++++--------
 4 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/.gitignore b/.gitignore
index f6c7c3a..297f31d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,3 +87,4 @@ docker.debug-info
 examples/flink/src/main/java/org/apache/flink/avro/generated
 pulsar-flink/src/test/java/org/apache/flink/avro/generated
 pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated
+/build/
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 66e977f..3e0d583 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2283,6 +2283,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     void createNewMetadataLedger(final VoidCallback callback) {
         ledger.mbean.startCursorLedgerCreateOp();
+
         ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
@@ -2349,7 +2350,6 @@ public class ManagedCursorImpl implements ManagedCursor {
                 });
             }));
         }, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
-
     }
 
     private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
@@ -2818,7 +2818,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             return null;
         }
     }
-  
+
     void updateReadStats(int readEntriesCount, long readEntriesSize) {
         this.entriesReadCount += readEntriesCount;
         this.entriesReadSize += readEntriesSize;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 05a9911..8b31358 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.zookeeper.CreateMode;
@@ -500,20 +501,24 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
     private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
         Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForSchema(schemaId);
         final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
-        bookKeeper.asyncCreateLedger(
-            config.getManagedLedgerDefaultEnsembleSize(),
-            config.getManagedLedgerDefaultWriteQuorum(),
-            config.getManagedLedgerDefaultAckQuorum(),
-            BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
-            LedgerPassword,
-            (rc, handle, ctx) -> {
-                if (rc != BKException.Code.OK) {
-                    future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
-                } else {
-                    future.complete(handle);
-                }
-            }, null, metadata
-        );
+        try {
+            bookKeeper.asyncCreateLedger(
+                    config.getManagedLedgerDefaultEnsembleSize(),
+                    config.getManagedLedgerDefaultWriteQuorum(),
+                    config.getManagedLedgerDefaultAckQuorum(),
+                    BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                    LedgerPassword,
+                    (rc, handle, ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
+                        } else {
+                            future.complete(handle);
+                        }
+                    }, null, metadata);
+        } catch (Throwable t) {
+            log.error("[{}] Encountered unexpected error when creating schema ledger", schemaId, t);
+            return FutureUtil.failedFuture(t);
+        }
         return future;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index df7c79b..4b7c8bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.RawBatchConverter;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -310,18 +311,24 @@ public class TwoPhaseCompactor extends Compactor {
 
     private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String,byte[]> metadata) {
         CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
-        bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
-                             conf.getManagedLedgerDefaultWriteQuorum(),
-                             conf.getManagedLedgerDefaultAckQuorum(),
-                             Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
-                             Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
-                             (rc, ledger, ctx) -> {
-                                 if (rc != BKException.Code.OK) {
-                                     bkf.completeExceptionally(BKException.create(rc));
-                                 } else {
-                                     bkf.complete(ledger);
-                                 }
-                             }, null, metadata);
+
+        try {
+            bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
+                    conf.getManagedLedgerDefaultWriteQuorum(),
+                    conf.getManagedLedgerDefaultAckQuorum(),
+                    Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+                    Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
+                    (rc, ledger, ctx) -> {
+                        if (rc != BKException.Code.OK) {
+                            bkf.completeExceptionally(BKException.create(rc));
+                        } else {
+                            bkf.complete(ledger);
+                        }
+                    }, null, metadata);
+        } catch (Throwable t) {
+            log.error("Encountered unexpected error when creating compaction ledger", t);
+            return FutureUtil.failedFuture(t);
+        }
         return bkf;
     }