You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2024/03/04 10:51:05 UTC

(pulsar) branch branch-3.1 updated (c9ecd5221f4 -> 62de4a5639b)

This is an automated email from the ASF dual-hosted git repository.

rgao pushed a change to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from c9ecd5221f4 [fix][test] Fix test testAsyncFunctionMaxPending (#22121)
     new 93a44574fca [fix][sec] Upgrade Jetty to 9.4.54.v20240208 to address CVE-2024-22201 (#22144)
     new c6b4887124b [fix][txn]Fix TopicTransactionBuffer potential thread safety issue (#22149)
     new bbbd1ef5314 [fix] [broker] print non log when delete partitioned topic failed (#22153)
     new 62de4a5639b [fix][offload] Fix Offload readHandle cannot close multi times. (#22162)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 distribution/server/src/assemble/LICENSE.bin.txt   | 38 +++++++++++-----------
 distribution/shell/src/assemble/LICENSE.bin.txt    | 16 ++++-----
 pom.xml                                            |  2 +-
 .../broker/service/persistent/PersistentTopic.java |  6 ++--
 .../buffer/impl/TopicTransactionBuffer.java        | 30 +++++++++--------
 .../impl/FileStoreBackedReadHandleImpl.java        | 36 +++++++++++++++-----
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 32 +++++++++++-------
 .../impl/BlobStoreBackedReadHandleImplV2.java      | 14 ++++++--
 8 files changed, 108 insertions(+), 66 deletions(-)


(pulsar) 03/04: [fix] [broker] print non log when delete partitioned topic failed (#22153)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bbbd1ef5314049b15957737808a29e61160aac0d
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Feb 29 12:47:54 2024 +0800

    [fix] [broker] print non log when delete partitioned topic failed (#22153)
    
    (cherry picked from commit 72cedb7020c75ada0d26b8120e55e0bec4467f13)
---
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 12f0f738bb8..2752f247853 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2688,7 +2688,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
             replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions,
                 deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, false))
-                    .thenApply((res) -> tryToDeletePartitionedMetadata())
+                    .thenCompose((res) -> tryToDeletePartitionedMetadata())
                     .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
                     .exceptionally(e -> {
                         if (e.getCause() instanceof TopicBusyException) {
@@ -2696,6 +2696,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                             if (log.isDebugEnabled()) {
                                 log.debug("[{}] Did not delete busy topic: {}", topic, e.getCause().getMessage());
                             }
+                        } else if (e.getCause() instanceof UnsupportedOperationException) {
+                            log.info("[{}] Skip to delete partitioned topic: {}", topic, e.getCause().getMessage());
                         } else {
                             log.warn("[{}] Inactive topic deletion failed", topic, e);
                         }
@@ -2740,7 +2742,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                                                         .filter(topicExist -> topicExist)
                                                         .findAny();
                                                 if (anyExistPartition.isPresent()) {
-                                                    log.error("[{}] Delete topic metadata failed because"
+                                                    log.info("[{}] Delete topic metadata failed because"
                                                             + " another partition exist.", topicName);
                                                     throw new UnsupportedOperationException(
                                                             String.format("Another partition exists for [%s].",


(pulsar) 01/04: [fix][sec] Upgrade Jetty to 9.4.54.v20240208 to address CVE-2024-22201 (#22144)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 93a44574fca5e562173b2b0e7e60f568f7e5a934
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Feb 28 11:40:21 2024 +0200

    [fix][sec] Upgrade Jetty to 9.4.54.v20240208 to address CVE-2024-22201 (#22144)
    
    (cherry picked from commit e3a081e4c5ea380eb505751193bc71dd0ae39281)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 38 ++++++++++++------------
 distribution/shell/src/assemble/LICENSE.bin.txt  | 16 +++++-----
 pom.xml                                          |  2 +-
 3 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index e7a7d27d760..7eb652e2f60 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -382,25 +382,25 @@ The Apache Software License, Version 2.0
     - org.asynchttpclient-async-http-client-2.12.1.jar
     - org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
  * Jetty
-    - org.eclipse.jetty-jetty-client-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-continuation-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-http-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-io-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-proxy-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-security-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-server-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-servlet-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-servlets-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-util-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-util-ajax-9.4.53.v20231009.jar
-    - org.eclipse.jetty.websocket-javax-websocket-client-impl-9.4.53.v20231009.jar
-    - org.eclipse.jetty.websocket-websocket-api-9.4.53.v20231009.jar
-    - org.eclipse.jetty.websocket-websocket-client-9.4.53.v20231009.jar
-    - org.eclipse.jetty.websocket-websocket-common-9.4.53.v20231009.jar
-    - org.eclipse.jetty.websocket-websocket-server-9.4.53.v20231009.jar
-    - org.eclipse.jetty.websocket-websocket-servlet-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-alpn-conscrypt-server-9.4.53.v20231009.jar
-    - org.eclipse.jetty-jetty-alpn-server-9.4.53.v20231009.jar
+    - org.eclipse.jetty-jetty-client-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-continuation-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-http-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-io-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-proxy-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-security-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-server-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-servlet-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-servlets-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-util-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-util-ajax-9.4.54.v20240208.jar
+    - org.eclipse.jetty.websocket-javax-websocket-client-impl-9.4.54.v20240208.jar
+    - org.eclipse.jetty.websocket-websocket-api-9.4.54.v20240208.jar
+    - org.eclipse.jetty.websocket-websocket-client-9.4.54.v20240208.jar
+    - org.eclipse.jetty.websocket-websocket-common-9.4.54.v20240208.jar
+    - org.eclipse.jetty.websocket-websocket-server-9.4.54.v20240208.jar
+    - org.eclipse.jetty.websocket-websocket-servlet-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-alpn-conscrypt-server-9.4.54.v20240208.jar
+    - org.eclipse.jetty-jetty-alpn-server-9.4.54.v20240208.jar
  * SnakeYaml -- org.yaml-snakeyaml-2.0.jar
  * RocksDB - org.rocksdb-rocksdbjni-7.9.2.jar
  * Google Error Prone Annotations - com.google.errorprone-error_prone_annotations-2.5.1.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index 31e0e884caa..5d880a018c5 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -395,14 +395,14 @@ The Apache Software License, Version 2.0
     - async-http-client-2.12.1.jar
     - async-http-client-netty-utils-2.12.1.jar
  * Jetty
-    - jetty-client-9.4.53.v20231009.jar
-    - jetty-http-9.4.53.v20231009.jar
-    - jetty-io-9.4.53.v20231009.jar
-    - jetty-util-9.4.53.v20231009.jar
-    - javax-websocket-client-impl-9.4.53.v20231009.jar
-    - websocket-api-9.4.53.v20231009.jar
-    - websocket-client-9.4.53.v20231009.jar
-    - websocket-common-9.4.53.v20231009.jar
+    - jetty-client-9.4.54.v20240208.jar
+    - jetty-http-9.4.54.v20240208.jar
+    - jetty-io-9.4.54.v20240208.jar
+    - jetty-util-9.4.54.v20240208.jar
+    - javax-websocket-client-impl-9.4.54.v20240208.jar
+    - websocket-api-9.4.54.v20240208.jar
+    - websocket-client-9.4.54.v20240208.jar
+    - websocket-common-9.4.54.v20240208.jar
  * SnakeYaml -- snakeyaml-2.0.jar
  * Google Error Prone Annotations - error_prone_annotations-2.5.1.jar
  * Javassist -- javassist-3.25.0-GA.jar
diff --git a/pom.xml b/pom.xml
index 316eadfc839..c24e57fe1cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -142,7 +142,7 @@ flexible messaging model and an intuitive client API.</description>
     <curator.version>5.1.0</curator.version>
     <netty.version>4.1.100.Final</netty.version>
     <netty-iouring.version>0.0.21.Final</netty-iouring.version>
-    <jetty.version>9.4.53.v20231009</jetty.version>
+    <jetty.version>9.4.54.v20240208</jetty.version>
     <conscrypt.version>2.5.2</conscrypt.version>
     <jersey.version>2.34</jersey.version>
     <athenz.version>1.10.50</athenz.version>


(pulsar) 02/04: [fix][txn]Fix TopicTransactionBuffer potential thread safety issue (#22149)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c6b4887124bb5dee9edc7a85448dc341c3ff41a6
Author: 道君 <da...@apache.org>
AuthorDate: Thu Feb 29 21:22:03 2024 +0800

    [fix][txn]Fix TopicTransactionBuffer potential thread safety issue (#22149)
    
    (cherry picked from commit 74be3fd4917a2327f2da9b5b55cc572b3c1f4e84)
---
 .../buffer/impl/TopicTransactionBuffer.java        | 30 ++++++++++++----------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 5392e473947..a36216bd625 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -170,13 +170,15 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                         if (msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
                             TxnID txnID = new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits());
                             PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
-                            if (Markers.isTxnMarker(msgMetadata)) {
-                                if (Markers.isTxnAbortMarker(msgMetadata)) {
-                                    snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
+                            synchronized (TopicTransactionBuffer.this) {
+                                if (Markers.isTxnMarker(msgMetadata)) {
+                                    if (Markers.isTxnAbortMarker(msgMetadata)) {
+                                        snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
+                                    }
+                                    updateMaxReadPosition(txnID);
+                                } else {
+                                    handleTransactionMessage(txnID, position);
                                 }
-                                updateMaxReadPosition(txnID);
-                            } else {
-                                handleTransactionMessage(txnID, position);
                             }
                         }
                     }
@@ -362,10 +364,10 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                             updateMaxReadPosition(txnID);
                             snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
                             takeSnapshotByChangeTimes();
+                            txnAbortedCounter.increment();
+                            completableFuture.complete(null);
+                            handleLowWaterMark(txnID, lowWaterMark);
                         }
-                        txnAbortedCounter.increment();
-                        completableFuture.complete(null);
-                        handleLowWaterMark(txnID, lowWaterMark);
                     }
 
                     @Override
@@ -473,7 +475,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     }
 
     @Override
-    public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
+    public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
@@ -510,9 +512,11 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     @Override
     public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
         TransactionInBufferStats transactionInBufferStats = new TransactionInBufferStats();
-        transactionInBufferStats.aborted = isTxnAborted(txnID, null);
-        if (ongoingTxns.containsKey(txnID)) {
-            transactionInBufferStats.startPosition = ongoingTxns.get(txnID).toString();
+        synchronized (this) {
+            transactionInBufferStats.aborted = isTxnAborted(txnID, null);
+            if (ongoingTxns.containsKey(txnID)) {
+                transactionInBufferStats.startPosition = ongoingTxns.get(txnID).toString();
+            }
         }
         return transactionInBufferStats;
     }


(pulsar) 04/04: [fix][offload] Fix Offload readHandle cannot close multi times. (#22162)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 62de4a5639becfb2e0076c86ab569bdb8d03fbcb
Author: 道君 <da...@apache.org>
AuthorDate: Thu Feb 29 21:03:47 2024 +0800

    [fix][offload] Fix Offload readHandle cannot close multi times. (#22162)
    
    (cherry picked from commit e25c7f045753b949c5ecd492bd7b7a77440c6937)
---
 .../impl/FileStoreBackedReadHandleImpl.java        | 36 +++++++++++++++++-----
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 32 +++++++++++--------
 .../impl/BlobStoreBackedReadHandleImplV2.java      | 14 +++++++--
 3 files changed, 59 insertions(+), 23 deletions(-)

diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 49b2071f5db..91e7e902eab 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -36,6 +37,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -53,6 +55,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
     private final LedgerOffloaderStats offloaderStats;
     private final String managedLedgerName;
     private final String topicName;
+    enum State {
+        Opened,
+        Closed
+    }
+    private volatile State state;
+    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
     private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId,
                                           LedgerOffloaderStats offloaderStats,
@@ -72,6 +80,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
             offloaderStats.recordReadOffloadIndexLatency(topicName,
                     System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS);
             this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes());
+            state = State.Opened;
         } catch (IOException e) {
             log.error("Fail to read LedgerMetadata for ledgerId {}",
                     ledgerId);
@@ -92,15 +101,20 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+            return closeFuture.get();
+        }
+
+        CompletableFuture<Void> promise = closeFuture.get();
         executor.execute(() -> {
-                try {
-                    reader.close();
-                    promise.complete(null);
-                } catch (IOException t) {
-                    promise.completeExceptionally(t);
-                }
-            });
+            try {
+                reader.close();
+                state = State.Closed;
+                promise.complete(null);
+            } catch (IOException t) {
+                promise.completeExceptionally(t);
+            }
+        });
         return promise;
     }
 
@@ -111,6 +125,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
         }
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.execute(() -> {
+            if (state == State.Closed) {
+                log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+                return;
+            }
             if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 5a571bb208e..5346be6a044 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import io.netty.buffer.ByteBuf;
@@ -30,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -66,13 +68,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
             .newBuilder()
             .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
             .build();
+    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
     enum State {
         Opened,
         Closed
     }
 
-    private State state = null;
+    private volatile State state = null;
 
     private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
                                           BackedInputStream inputStream, ExecutorService executor) {
@@ -96,18 +99,22 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+            return closeFuture.get();
+        }
+
+        CompletableFuture<Void> promise = closeFuture.get();
         executor.execute(() -> {
-                try {
-                    index.close();
-                    inputStream.close();
-                    entryOffsets.invalidateAll();
-                    state = State.Closed;
-                    promise.complete(null);
-                } catch (IOException t) {
-                    promise.completeExceptionally(t);
-                }
-            });
+            try {
+                index.close();
+                inputStream.close();
+                entryOffsets.invalidateAll();
+                state = State.Closed;
+                promise.complete(null);
+            } catch (IOException t) {
+                promise.completeExceptionally(t);
+            }
+        });
         return promise;
     }
 
@@ -298,6 +305,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
     }
 
     // for testing
+    @VisibleForTesting
     State getState() {
         return this.state;
     }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index e40a0a3834c..53d96e08abf 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.val;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -60,7 +61,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
     private final List<BackedInputStream> inputStreams;
     private final List<DataInputStream> dataStreams;
     private final ExecutorService executor;
-    private State state = null;
+    private volatile State state = null;
+    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
     enum State {
         Opened,
@@ -123,7 +125,11 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+            return closeFuture.get();
+        }
+
+        CompletableFuture<Void> promise = closeFuture.get();
         executor.execute(() -> {
             try {
                 for (OffloadIndexBlockV2 indexBlock : indices) {
@@ -143,7 +149,9 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
 
     @Override
     public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
-        log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+        if (log.isDebugEnabled()) {
+            log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+        }
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.execute(() -> {
             if (state == State.Closed) {