You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/03/13 04:56:57 UTC

(pulsar) branch branch-3.2 updated: [fix][offload] Fix Offload readHandle cannot close multi times. (#22162)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 5551e1b22b4 [fix][offload] Fix Offload readHandle cannot close multi times. (#22162)
5551e1b22b4 is described below

commit 5551e1b22b483ee92a49de9f503c7165d5f3bd10
Author: 道君 <da...@apache.org>
AuthorDate: Thu Feb 29 21:03:47 2024 +0800

    [fix][offload] Fix Offload readHandle cannot close multi times. (#22162)
---
 .../impl/FileStoreBackedReadHandleImpl.java        | 36 +++++++++++++++++-----
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 32 +++++++++++--------
 .../impl/BlobStoreBackedReadHandleImplV2.java      | 14 +++++++--
 3 files changed, 59 insertions(+), 23 deletions(-)

diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 49b2071f5db..91e7e902eab 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -36,6 +37,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -53,6 +55,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
     private final LedgerOffloaderStats offloaderStats;
     private final String managedLedgerName;
     private final String topicName;
+    enum State {
+        Opened,
+        Closed
+    }
+    private volatile State state;
+    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
     private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId,
                                           LedgerOffloaderStats offloaderStats,
@@ -72,6 +80,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
             offloaderStats.recordReadOffloadIndexLatency(topicName,
                     System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS);
             this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes());
+            state = State.Opened;
         } catch (IOException e) {
             log.error("Fail to read LedgerMetadata for ledgerId {}",
                     ledgerId);
@@ -92,15 +101,20 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+            return closeFuture.get();
+        }
+
+        CompletableFuture<Void> promise = closeFuture.get();
         executor.execute(() -> {
-                try {
-                    reader.close();
-                    promise.complete(null);
-                } catch (IOException t) {
-                    promise.completeExceptionally(t);
-                }
-            });
+            try {
+                reader.close();
+                state = State.Closed;
+                promise.complete(null);
+            } catch (IOException t) {
+                promise.completeExceptionally(t);
+            }
+        });
         return promise;
     }
 
@@ -111,6 +125,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
         }
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.execute(() -> {
+            if (state == State.Closed) {
+                log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+                return;
+            }
             if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 5a571bb208e..5346be6a044 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import io.netty.buffer.ByteBuf;
@@ -30,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -66,13 +68,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
             .newBuilder()
             .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
             .build();
+    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
     enum State {
         Opened,
         Closed
     }
 
-    private State state = null;
+    private volatile State state = null;
 
     private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
                                           BackedInputStream inputStream, ExecutorService executor) {
@@ -96,18 +99,22 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+            return closeFuture.get();
+        }
+
+        CompletableFuture<Void> promise = closeFuture.get();
         executor.execute(() -> {
-                try {
-                    index.close();
-                    inputStream.close();
-                    entryOffsets.invalidateAll();
-                    state = State.Closed;
-                    promise.complete(null);
-                } catch (IOException t) {
-                    promise.completeExceptionally(t);
-                }
-            });
+            try {
+                index.close();
+                inputStream.close();
+                entryOffsets.invalidateAll();
+                state = State.Closed;
+                promise.complete(null);
+            } catch (IOException t) {
+                promise.completeExceptionally(t);
+            }
+        });
         return promise;
     }
 
@@ -298,6 +305,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
     }
 
     // for testing
+    @VisibleForTesting
     State getState() {
         return this.state;
     }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index e40a0a3834c..53d96e08abf 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.val;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -60,7 +61,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
     private final List<BackedInputStream> inputStreams;
     private final List<DataInputStream> dataStreams;
     private final ExecutorService executor;
-    private State state = null;
+    private volatile State state = null;
+    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
 
     enum State {
         Opened,
@@ -123,7 +125,11 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+            return closeFuture.get();
+        }
+
+        CompletableFuture<Void> promise = closeFuture.get();
         executor.execute(() -> {
             try {
                 for (OffloadIndexBlockV2 indexBlock : indices) {
@@ -143,7 +149,9 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
 
     @Override
     public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
-        log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+        if (log.isDebugEnabled()) {
+            log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+        }
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.execute(() -> {
             if (state == State.Closed) {