You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:02 UTC
[pulsar] 16/38: Avoid prefetching too much data when offloading data
to HDFS (#6717)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 74c668a35130b85cf67256ab10466dbc06f99e50
Author: pheecian <ph...@gmail.com>
AuthorDate: Wed Apr 22 17:06:22 2020 +0800
Avoid prefetching too much data when offloading data to HDFS (#6717)
Fixes #6692
### Motivation
Avoid prefetching too much data when offloading, which may lead to OOM;
fix an issue where objects were not closed, which was also mentioned by congbobo184 in https://github.com/apache/pulsar/pull/6697
*Explain here the context and why you're making this change. What is the problem you're trying to solve?*
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
### Documentation
- Does this pull request introduce a new feature? (no)
(cherry picked from commit 514b6af7586633424739cfc3c6131b0d0afec9e4)
---
conf/broker.conf | 3 +
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++
.../common/policies/data/OffloadPolicies.java | 5 ++
.../impl/FileSystemManagedLedgerOffloader.java | 73 +++++++++++++++++-----
4 files changed, 70 insertions(+), 17 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 83d009d..b1e6c1a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -818,6 +818,9 @@ managedLedgerOffloadDriver=
# Maximum number of thread pool threads for ledger offloading
managedLedgerOffloadMaxThreads=2
+# Maximum prefetch rounds for ledger reading for offloading
+managedLedgerOffloadPrefetchRounds=1
+
# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d7d0d63..6ad8b46 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1431,6 +1431,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int managedLedgerOffloadMaxThreads = 2;
+ @FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Maximum prefetch rounds for ledger reading for offloading"
+ )
+ private int managedLedgerOffloadPrefetchRounds = 1;
+
/**** --- Transaction config variables --- ****/
@FieldContext(
category = CATEGORY_TRANSACTION,
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 5ccb75c..4936923 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -37,6 +37,7 @@ public class OffloadPolicies {
public final static int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MB
public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB
public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
+ public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem"};
public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public final static long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = -1;
@@ -46,6 +47,7 @@ public class OffloadPolicies {
private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY;
private String managedLedgerOffloadDriver = null;
private int managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS;
+ private int managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
private long managedLedgerOffloadThresholdInBytes = DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
@@ -161,6 +163,7 @@ public class OffloadPolicies {
return Objects.hash(
managedLedgerOffloadDriver,
managedLedgerOffloadMaxThreads,
+ managedLedgerOffloadPrefetchRounds,
managedLedgerOffloadThresholdInBytes,
managedLedgerOffloadDeletionLagInMillis,
s3ManagedLedgerOffloadRegion,
@@ -190,6 +193,7 @@ public class OffloadPolicies {
OffloadPolicies other = (OffloadPolicies) obj;
return Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver())
&& Objects.equals(managedLedgerOffloadMaxThreads, other.getManagedLedgerOffloadMaxThreads())
+ && Objects.equals(managedLedgerOffloadPrefetchRounds, other.getManagedLedgerOffloadPrefetchRounds())
&& Objects.equals(managedLedgerOffloadThresholdInBytes,
other.getManagedLedgerOffloadThresholdInBytes())
&& Objects.equals(managedLedgerOffloadDeletionLagInMillis,
@@ -222,6 +226,7 @@ public class OffloadPolicies {
return MoreObjects.toStringHelper(this)
.add("managedLedgerOffloadDriver", managedLedgerOffloadDriver)
.add("managedLedgerOffloadMaxThreads", managedLedgerOffloadMaxThreads)
+ .add("managedLedgerOffloadPrefetchRounds", managedLedgerOffloadPrefetchRounds)
.add("managedLedgerOffloadThresholdInBytes", managedLedgerOffloadThresholdInBytes)
.add("managedLedgerOffloadDeletionLagInMillis", managedLedgerOffloadDeletionLagInMillis)
.add("s3ManagedLedgerOffloadRegion", s3ManagedLedgerOffloadRegion)
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index bbee828..5438459 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import io.netty.util.Recycler;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -45,6 +46,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;
@@ -68,6 +70,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
public static boolean driverSupported(String driver) {
return DRIVER_NAMES.equals(driver);
}
+
@Override
public String getOffloadDriverName() {
return driverName;
@@ -82,7 +85,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
this.configuration = new Configuration();
if (conf.getFileSystemProfilePath() != null) {
String[] paths = conf.getFileSystemProfilePath().split(",");
- for (int i =0 ; i < paths.length; i++) {
+ for (int i = 0; i < paths.length; i++) {
configuration.addResource(new Path(paths[i]));
}
}
@@ -106,6 +109,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
.numThreads(conf.getManagedLedgerOffloadMaxThreads())
.name("offload-assignment").build();
}
+
@VisibleForTesting
public FileSystemManagedLedgerOffloader(OffloadPolicies conf, OrderedScheduler scheduler, String testHDFSPath, String baseDir) throws IOException {
this.offloadPolicies = conf;
@@ -137,7 +141,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
@Override
public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- scheduler.chooseThread(readHandle.getId()).submit(new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, assignmentScheduler));
+ scheduler.chooseThread(readHandle.getId()).submit(new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds()));
return promise;
}
@@ -151,9 +155,10 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
private final Configuration configuration;
volatile Exception fileSystemWriteException = null;
private OrderedScheduler assignmentScheduler;
+ private int managedLedgerOffloadPrefetchRounds = 1;
private LedgerReader(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata, CompletableFuture<Void> promise,
- String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler) {
+ String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler, int managedLedgerOffloadPrefetchRounds) {
this.readHandle = readHandle;
this.uuid = uuid;
this.extraMetadata = extraMetadata;
@@ -161,6 +166,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
this.storageBasePath = storageBasePath;
this.configuration = configuration;
this.assignmentScheduler = assignmentScheduler;
+ this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;
}
@Override
@@ -188,13 +194,17 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
AtomicLong haveOffloadEntryNumber = new AtomicLong(0);
long needToOffloadFirstEntryNumber = 0;
CountDownLatch countDownLatch;
+ //avoid prefetch too much data into memory
+ Semaphore semaphore = new Semaphore(managedLedgerOffloadPrefetchRounds);
do {
long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed());
log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
+ semaphore.acquire();
countDownLatch = new CountDownLatch(1);
- assignmentScheduler.chooseThread(ledgerId).submit(new FileSystemWriter(ledgerEntriesOnce, dataWriter,
- countDownLatch, haveOffloadEntryNumber, this)).addListener(() -> {}, Executors.newSingleThreadExecutor());
+ assignmentScheduler.chooseThread(ledgerId).submit(FileSystemWriter.create(ledgerEntriesOnce, dataWriter, semaphore,
+ countDownLatch, haveOffloadEntryNumber, this)).addListener(() -> {
+ }, Executors.newSingleThreadExecutor());
needToOffloadFirstEntryNumber = end + 1;
} while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed() && fileSystemWriteException == null);
countDownLatch.await();
@@ -216,24 +226,50 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
private static class FileSystemWriter implements Runnable {
- private final LedgerEntries ledgerEntriesOnce;
+ private LedgerEntries ledgerEntriesOnce;
private final LongWritable key = new LongWritable();
private final BytesWritable value = new BytesWritable();
- private final MapFile.Writer dataWriter;
- private final CountDownLatch countDownLatch;
- private final AtomicLong haveOffloadEntryNumber;
- private final LedgerReader ledgerReader;
+ private MapFile.Writer dataWriter;
+ private CountDownLatch countDownLatch;
+ private AtomicLong haveOffloadEntryNumber;
+ private LedgerReader ledgerReader;
+ private Semaphore semaphore;
+ private Recycler.Handle<FileSystemWriter> recyclerHandle;
+
+ private FileSystemWriter(Recycler.Handle<FileSystemWriter> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private static final Recycler<FileSystemWriter> RECYCLER = new Recycler<FileSystemWriter>() {
+ @Override
+ protected FileSystemWriter newObject(Recycler.Handle<FileSystemWriter> handle) {
+ return new FileSystemWriter(handle);
+ }
+ };
+
+ private void recycle() {
+ this.dataWriter = null;
+ this.countDownLatch = null;
+ this.haveOffloadEntryNumber = null;
+ this.ledgerReader = null;
+ this.ledgerEntriesOnce = null;
+ this.semaphore = null;
+ recyclerHandle.recycle(this);
+ }
- private FileSystemWriter(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter,
- CountDownLatch countDownLatch, AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
- this.ledgerEntriesOnce = ledgerEntriesOnce;
- this.dataWriter = dataWriter;
- this.countDownLatch = countDownLatch;
- this.haveOffloadEntryNumber = haveOffloadEntryNumber;
- this.ledgerReader = ledgerReader;
+ public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter, Semaphore semaphore,
+ CountDownLatch countDownLatch, AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
+ FileSystemWriter writer = RECYCLER.get();
+ writer.ledgerReader = ledgerReader;
+ writer.dataWriter = dataWriter;
+ writer.countDownLatch = countDownLatch;
+ writer.haveOffloadEntryNumber = haveOffloadEntryNumber;
+ writer.ledgerEntriesOnce = ledgerEntriesOnce;
+ writer.semaphore = semaphore;
+ return writer;
}
@Override
@@ -255,6 +291,9 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
}
}
countDownLatch.countDown();
+ ledgerEntriesOnce.close();
+ semaphore.release();
+ this.recycle();
}
}