You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:02 UTC

[pulsar] 16/38: Avoid prefetch too much data when offloading data to HDFS (#6717)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 74c668a35130b85cf67256ab10466dbc06f99e50
Author: pheecian <ph...@gmail.com>
AuthorDate: Wed Apr 22 17:06:22 2020 +0800

    Avoid prefetch too much data when offloading data to HDFS (#6717)
    
    Fixes #6692
    
    ### Motivation
    Avoid prefetching too much data when offloading, which may lead to OOM;
    fix the issue of objects not being closed, which was also mentioned by congbobo184 in https://github.com/apache/pulsar/pull/6697
    
    *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (no)
    (cherry picked from commit 514b6af7586633424739cfc3c6131b0d0afec9e4)
---
 conf/broker.conf                                   |  3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++
 .../common/policies/data/OffloadPolicies.java      |  5 ++
 .../impl/FileSystemManagedLedgerOffloader.java     | 73 +++++++++++++++++-----
 4 files changed, 70 insertions(+), 17 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 83d009d..b1e6c1a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -818,6 +818,9 @@ managedLedgerOffloadDriver=
 # Maximum number of thread pool threads for ledger offloading
 managedLedgerOffloadMaxThreads=2
 
+# Maximum prefetch rounds for ledger reading for offloading
+managedLedgerOffloadPrefetchRounds=1
+
 # Use Open Range-Set to cache unacked messages
 managedLedgerUnackedRangesOpenCacheSetEnabled=true
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d7d0d63..6ad8b46 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1431,6 +1431,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int managedLedgerOffloadMaxThreads = 2;
 
+    @FieldContext(
+            category = CATEGORY_STORAGE_OFFLOADING,
+            doc = "Maximum prefetch rounds for ledger reading for offloading"
+    )
+    private int managedLedgerOffloadPrefetchRounds = 1;
+
     /**** --- Transaction config variables --- ****/
     @FieldContext(
             category = CATEGORY_TRANSACTION,
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index 5ccb75c..4936923 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -37,6 +37,7 @@ public class OffloadPolicies {
     public final static int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024;   // 64MB
     public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;      // 1MB
     public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
+    public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
     public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem"};
     public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public final static long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = -1;
@@ -46,6 +47,7 @@ public class OffloadPolicies {
     private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY;
     private String managedLedgerOffloadDriver = null;
     private int managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS;
+    private int managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
     private long managedLedgerOffloadThresholdInBytes = DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
     private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
 
@@ -161,6 +163,7 @@ public class OffloadPolicies {
         return Objects.hash(
                 managedLedgerOffloadDriver,
                 managedLedgerOffloadMaxThreads,
+                managedLedgerOffloadPrefetchRounds,
                 managedLedgerOffloadThresholdInBytes,
                 managedLedgerOffloadDeletionLagInMillis,
                 s3ManagedLedgerOffloadRegion,
@@ -190,6 +193,7 @@ public class OffloadPolicies {
         OffloadPolicies other = (OffloadPolicies) obj;
         return Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver())
                 && Objects.equals(managedLedgerOffloadMaxThreads, other.getManagedLedgerOffloadMaxThreads())
+                && Objects.equals(managedLedgerOffloadPrefetchRounds, other.getManagedLedgerOffloadPrefetchRounds())
                 && Objects.equals(managedLedgerOffloadThresholdInBytes,
                     other.getManagedLedgerOffloadThresholdInBytes())
                 && Objects.equals(managedLedgerOffloadDeletionLagInMillis,
@@ -222,6 +226,7 @@ public class OffloadPolicies {
         return MoreObjects.toStringHelper(this)
                 .add("managedLedgerOffloadDriver", managedLedgerOffloadDriver)
                 .add("managedLedgerOffloadMaxThreads", managedLedgerOffloadMaxThreads)
+                .add("managedLedgerOffloadPrefetchRounds", managedLedgerOffloadPrefetchRounds)
                 .add("managedLedgerOffloadThresholdInBytes", managedLedgerOffloadThresholdInBytes)
                 .add("managedLedgerOffloadDeletionLagInMillis", managedLedgerOffloadDeletionLagInMillis)
                 .add("s3ManagedLedgerOffloadRegion", s3ManagedLedgerOffloadRegion)
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index bbee828..5438459 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import io.netty.util.Recycler;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -45,6 +46,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;
@@ -68,6 +70,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
     public static boolean driverSupported(String driver) {
         return DRIVER_NAMES.equals(driver);
     }
+
     @Override
     public String getOffloadDriverName() {
         return driverName;
@@ -82,7 +85,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
         this.configuration = new Configuration();
         if (conf.getFileSystemProfilePath() != null) {
             String[] paths = conf.getFileSystemProfilePath().split(",");
-            for (int i =0 ; i < paths.length; i++) {
+            for (int i = 0; i < paths.length; i++) {
                 configuration.addResource(new Path(paths[i]));
             }
         }
@@ -106,6 +109,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                 .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                 .name("offload-assignment").build();
     }
+
     @VisibleForTesting
     public FileSystemManagedLedgerOffloader(OffloadPolicies conf, OrderedScheduler scheduler, String testHDFSPath, String baseDir) throws IOException {
         this.offloadPolicies = conf;
@@ -137,7 +141,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
     @Override
     public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.chooseThread(readHandle.getId()).submit(new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, assignmentScheduler));
+        scheduler.chooseThread(readHandle.getId()).submit(new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds()));
         return promise;
     }
 
@@ -151,9 +155,10 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
         private final Configuration configuration;
         volatile Exception fileSystemWriteException = null;
         private OrderedScheduler assignmentScheduler;
+        private int managedLedgerOffloadPrefetchRounds = 1;
 
         private LedgerReader(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata, CompletableFuture<Void> promise,
-                             String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler) {
+                             String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler, int managedLedgerOffloadPrefetchRounds) {
             this.readHandle = readHandle;
             this.uuid = uuid;
             this.extraMetadata = extraMetadata;
@@ -161,6 +166,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
             this.storageBasePath = storageBasePath;
             this.configuration = configuration;
             this.assignmentScheduler = assignmentScheduler;
+            this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;
         }
 
         @Override
@@ -188,13 +194,17 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                 AtomicLong haveOffloadEntryNumber = new AtomicLong(0);
                 long needToOffloadFirstEntryNumber = 0;
                 CountDownLatch countDownLatch;
+                //avoid prefetch too much data into memory
+                Semaphore semaphore = new Semaphore(managedLedgerOffloadPrefetchRounds);
                 do {
                     long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed());
                     log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
                     LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
+                    semaphore.acquire();
                     countDownLatch = new CountDownLatch(1);
-                    assignmentScheduler.chooseThread(ledgerId).submit(new FileSystemWriter(ledgerEntriesOnce, dataWriter,
-                            countDownLatch, haveOffloadEntryNumber, this)).addListener(() -> {}, Executors.newSingleThreadExecutor());
+                    assignmentScheduler.chooseThread(ledgerId).submit(FileSystemWriter.create(ledgerEntriesOnce, dataWriter, semaphore,
+                            countDownLatch, haveOffloadEntryNumber, this)).addListener(() -> {
+                    }, Executors.newSingleThreadExecutor());
                     needToOffloadFirstEntryNumber = end + 1;
                 } while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed() && fileSystemWriteException == null);
                 countDownLatch.await();
@@ -216,24 +226,50 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
 
     private static class FileSystemWriter implements Runnable {
 
-        private final LedgerEntries ledgerEntriesOnce;
+        private LedgerEntries ledgerEntriesOnce;
 
         private final LongWritable key = new LongWritable();
         private final BytesWritable value = new BytesWritable();
 
-        private final MapFile.Writer dataWriter;
-        private final CountDownLatch countDownLatch;
-        private final AtomicLong haveOffloadEntryNumber;
-        private final LedgerReader ledgerReader;
+        private MapFile.Writer dataWriter;
+        private CountDownLatch countDownLatch;
+        private AtomicLong haveOffloadEntryNumber;
+        private LedgerReader ledgerReader;
+        private Semaphore semaphore;
+        private Recycler.Handle<FileSystemWriter> recyclerHandle;
+
+        private FileSystemWriter(Recycler.Handle<FileSystemWriter> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<FileSystemWriter> RECYCLER = new Recycler<FileSystemWriter>() {
+            @Override
+            protected FileSystemWriter newObject(Recycler.Handle<FileSystemWriter> handle) {
+                return new FileSystemWriter(handle);
+            }
+        };
+
+        private void recycle() {
+            this.dataWriter = null;
+            this.countDownLatch = null;
+            this.haveOffloadEntryNumber = null;
+            this.ledgerReader = null;
+            this.ledgerEntriesOnce = null;
+            this.semaphore = null;
+            recyclerHandle.recycle(this);
+        }
 
 
-        private FileSystemWriter(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter,
-                                 CountDownLatch countDownLatch, AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
-            this.ledgerEntriesOnce = ledgerEntriesOnce;
-            this.dataWriter = dataWriter;
-            this.countDownLatch = countDownLatch;
-            this.haveOffloadEntryNumber = haveOffloadEntryNumber;
-            this.ledgerReader = ledgerReader;
+        public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter, Semaphore semaphore,
+                                              CountDownLatch countDownLatch, AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
+            FileSystemWriter writer = RECYCLER.get();
+            writer.ledgerReader = ledgerReader;
+            writer.dataWriter = dataWriter;
+            writer.countDownLatch = countDownLatch;
+            writer.haveOffloadEntryNumber = haveOffloadEntryNumber;
+            writer.ledgerEntriesOnce = ledgerEntriesOnce;
+            writer.semaphore = semaphore;
+            return writer;
         }
 
         @Override
@@ -255,6 +291,9 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                 }
             }
             countDownLatch.countDown();
+            ledgerEntriesOnce.close();
+            semaphore.release();
+            this.recycle();
         }
     }