You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:39:01 UTC
[pulsar] 05/14: Avoid NPEs at ledger creation when DNS failures happen (#7403)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 30a52215f832309cbc1913090cae56ace87feaa2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jun 30 18:39:01 2020 -0700
Avoid NPEs at ledger creation when DNS failures happen (#7403)
* Avoid NPEs at ledger creation when DNS failures happen
* Removed unnecessary try/catch
(cherry picked from commit a230427f78e7cc844b2a22ababd05d217d9164bf)
---
.gitignore | 1 +
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +--
.../service/schema/BookkeeperSchemaStorage.java | 33 +++++++++++++---------
.../pulsar/compaction/TwoPhaseCompactor.java | 31 ++++++++++++--------
4 files changed, 41 insertions(+), 28 deletions(-)
diff --git a/.gitignore b/.gitignore
index f6c7c3a..297f31d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,3 +87,4 @@ docker.debug-info
examples/flink/src/main/java/org/apache/flink/avro/generated
pulsar-flink/src/test/java/org/apache/flink/avro/generated
pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated
+/build/
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 66e977f..3e0d583 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2283,6 +2283,7 @@ public class ManagedCursorImpl implements ManagedCursor {
void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();
+
ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {
if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
@@ -2349,7 +2350,6 @@ public class ManagedCursorImpl implements ManagedCursor {
});
}));
}, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));
-
}
private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
@@ -2818,7 +2818,7 @@ public class ManagedCursorImpl implements ManagedCursor {
return null;
}
}
-
+
void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 05a9911..8b31358 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.CreateMode;
@@ -500,20 +501,24 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
private CompletableFuture<LedgerHandle> createLedger(String schemaId) {
Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForSchema(schemaId);
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
- bookKeeper.asyncCreateLedger(
- config.getManagedLedgerDefaultEnsembleSize(),
- config.getManagedLedgerDefaultWriteQuorum(),
- config.getManagedLedgerDefaultAckQuorum(),
- BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
- LedgerPassword,
- (rc, handle, ctx) -> {
- if (rc != BKException.Code.OK) {
- future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
- } else {
- future.complete(handle);
- }
- }, null, metadata
- );
+ try {
+ bookKeeper.asyncCreateLedger(
+ config.getManagedLedgerDefaultEnsembleSize(),
+ config.getManagedLedgerDefaultWriteQuorum(),
+ config.getManagedLedgerDefaultAckQuorum(),
+ BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+ LedgerPassword,
+ (rc, handle, ctx) -> {
+ if (rc != BKException.Code.OK) {
+ future.completeExceptionally(bkException("Failed to create ledger", rc, -1, -1));
+ } else {
+ future.complete(handle);
+ }
+ }, null, metadata);
+ } catch (Throwable t) {
+ log.error("[{}] Encountered unexpected error when creating schema ledger", schemaId, t);
+ return FutureUtil.failedFuture(t);
+ }
return future;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index df7c79b..4b7c8bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -310,18 +311,24 @@ public class TwoPhaseCompactor extends Compactor {
private CompletableFuture<LedgerHandle> createLedger(BookKeeper bk, Map<String,byte[]> metadata) {
CompletableFuture<LedgerHandle> bkf = new CompletableFuture<>();
- bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
- conf.getManagedLedgerDefaultWriteQuorum(),
- conf.getManagedLedgerDefaultAckQuorum(),
- Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
- Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
- (rc, ledger, ctx) -> {
- if (rc != BKException.Code.OK) {
- bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(ledger);
- }
- }, null, metadata);
+
+ try {
+ bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(),
+ conf.getManagedLedgerDefaultWriteQuorum(),
+ conf.getManagedLedgerDefaultAckQuorum(),
+ Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
+ Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
+ (rc, ledger, ctx) -> {
+ if (rc != BKException.Code.OK) {
+ bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(ledger);
+ }
+ }, null, metadata);
+ } catch (Throwable t) {
+ log.error("Encountered unexpected error when creating compaction ledger", t);
+ return FutureUtil.failedFuture(t);
+ }
return bkf;
}