You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/03/13 04:56:57 UTC
(pulsar) branch branch-3.2 updated: [fix][offload] Fix Offload readHandle cannot close multi times. (#22162)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 5551e1b22b4 [fix][offload] Fix Offload readHandle cannot close multi times. (#22162)
5551e1b22b4 is described below
commit 5551e1b22b483ee92a49de9f503c7165d5f3bd10
Author: 道君 <da...@apache.org>
AuthorDate: Thu Feb 29 21:03:47 2024 +0800
[fix][offload] Fix Offload readHandle cannot close multi times. (#22162)
---
.../impl/FileStoreBackedReadHandleImpl.java | 36 +++++++++++++++++-----
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 32 +++++++++++--------
.../impl/BlobStoreBackedReadHandleImplV2.java | 14 +++++++--
3 files changed, 59 insertions(+), 23 deletions(-)
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index 49b2071f5db..91e7e902eab 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -36,6 +37,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
@@ -53,6 +55,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
private final LedgerOffloaderStats offloaderStats;
private final String managedLedgerName;
private final String topicName;
+ enum State {
+ Opened,
+ Closed
+ }
+ private volatile State state;
+ private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId,
LedgerOffloaderStats offloaderStats,
@@ -72,6 +80,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
offloaderStats.recordReadOffloadIndexLatency(topicName,
System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS);
this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes());
+ state = State.Opened;
} catch (IOException e) {
log.error("Fail to read LedgerMetadata for ledgerId {}",
ledgerId);
@@ -92,15 +101,20 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
@Override
public CompletableFuture<Void> closeAsync() {
- CompletableFuture<Void> promise = new CompletableFuture<>();
+ if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+ return closeFuture.get();
+ }
+
+ CompletableFuture<Void> promise = closeFuture.get();
executor.execute(() -> {
- try {
- reader.close();
- promise.complete(null);
- } catch (IOException t) {
- promise.completeExceptionally(t);
- }
- });
+ try {
+ reader.close();
+ state = State.Closed;
+ promise.complete(null);
+ } catch (IOException t) {
+ promise.completeExceptionally(t);
+ }
+ });
return promise;
}
@@ -111,6 +125,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
}
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.execute(() -> {
+ if (state == State.Closed) {
+ log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+ ledgerId, firstEntry, lastEntry);
+ promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+ return;
+ }
if (firstEntry > lastEntry
|| firstEntry < 0
|| lastEntry > getLastAddConfirmed()) {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 5a571bb208e..5346be6a044 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
@@ -30,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -66,13 +68,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
.newBuilder()
.expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
.build();
+ private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
enum State {
Opened,
Closed
}
- private State state = null;
+ private volatile State state = null;
private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
BackedInputStream inputStream, ExecutorService executor) {
@@ -96,18 +99,22 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
@Override
public CompletableFuture<Void> closeAsync() {
- CompletableFuture<Void> promise = new CompletableFuture<>();
+ if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+ return closeFuture.get();
+ }
+
+ CompletableFuture<Void> promise = closeFuture.get();
executor.execute(() -> {
- try {
- index.close();
- inputStream.close();
- entryOffsets.invalidateAll();
- state = State.Closed;
- promise.complete(null);
- } catch (IOException t) {
- promise.completeExceptionally(t);
- }
- });
+ try {
+ index.close();
+ inputStream.close();
+ entryOffsets.invalidateAll();
+ state = State.Closed;
+ promise.complete(null);
+ } catch (IOException t) {
+ promise.completeExceptionally(t);
+ }
+ });
return promise;
}
@@ -298,6 +305,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
}
// for testing
+ @VisibleForTesting
State getState() {
return this.state;
}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index e40a0a3834c..53d96e08abf 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.val;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -60,7 +61,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
private final List<BackedInputStream> inputStreams;
private final List<DataInputStream> dataStreams;
private final ExecutorService executor;
- private State state = null;
+ private volatile State state = null;
+ private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
enum State {
Opened,
@@ -123,7 +125,11 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
@Override
public CompletableFuture<Void> closeAsync() {
- CompletableFuture<Void> promise = new CompletableFuture<>();
+ if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
+ return closeFuture.get();
+ }
+
+ CompletableFuture<Void> promise = closeFuture.get();
executor.execute(() -> {
try {
for (OffloadIndexBlockV2 indexBlock : indices) {
@@ -143,7 +149,9 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
- log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+ if (log.isDebugEnabled()) {
+ log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+ }
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.execute(() -> {
if (state == State.Closed) {