You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/31 01:09:55 UTC

[pulsar] branch branch-2.7 updated: Configurable data source for offloaded messages (#8717)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new efff0ee  Configurable data source for offloaded messages (#8717)
efff0ee is described below

commit efff0ee86cb1e6b581cbbb4765061bfe0ba3e3ce
Author: Renkai Ge <ga...@gmail.com>
AuthorDate: Fri Jan 8 13:01:17 2021 +0800

    Configurable data source for offloaded messages (#8717)
    
    Fix issue: https://github.com/apache/pulsar/issues/8591
    
    This PR includes:
    * API change in command tools
    * Related implementation with tests
    * Related docs in cookbook
    
    In addition, the log4j dependency is removed from the `managed-ledger` module because the whole project now uses log4j2 as the default logging framework.
    
    (cherry picked from commit 7c09f5ce649edcca0be792198d97573197c5a272)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  14 ++-
 managed-ledger/src/main/proto/MLDataFormats.proto  |   4 +-
 .../mledger/impl/OffloadPrefixReadTest.java        | 125 ++++++++++++++++++---
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |  10 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  26 +++--
 .../pulsar/broker/admin/AdminApiOffloadTest.java   |   8 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |  13 ++-
 .../common/naming/ServiceConfigurationTest.java    |   3 +-
 .../configurations/pulsar_broker_test.conf         |   5 +-
 pulsar-broker/src/test/resources/logback.xml       |   2 +-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  10 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  40 +++++--
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  28 ++++-
 .../common/policies/data/OffloadPolicies.java      |  91 ++++++++++++---
 .../common/policies/data/OffloadPoliciesTest.java  |   9 +-
 site2/docs/cookbooks-tiered-storage.md             |  10 ++
 16 files changed, 313 insertions(+), 85 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1f37e86..8dc3ea6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -119,6 +119,7 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.Stat;
 import org.slf4j.Logger;
@@ -1554,7 +1555,18 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
             LedgerInfo info = ledgers.get(ledgerId);
             CompletableFuture<ReadHandle> openFuture = new CompletableFuture<>();
-            if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {
+
+            if (config.getLedgerOffloader() != null
+                    && config.getLedgerOffloader().getOffloadPolicies() != null
+                    && config.getLedgerOffloader().getOffloadPolicies()
+                    .getManagedLedgerOffloadedReadPriority() == OffloadedReadPriority.BOOKKEEPER_FIRST
+                    && info != null && info.hasOffloadContext()
+                    && !info.getOffloadContext().getBookkeeperDeleted()) {
+                openFuture = bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
+                        .withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
+
+            } else if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {
+
                 UUID uid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
                 // TODO: improve this to load ledger offloader by driver name recorded in metadata
                 Map<String, String> offloadDriverMetadata = OffloadUtils.getOffloadDriverMetadata(info);
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 8b1ecbf8..b7dc580 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -48,8 +48,8 @@ message ManagedLedgerInfo {
     	optional int64 timestamp = 4;
         optional OffloadContext offloadContext = 5;
     }
-    
-    repeated LedgerInfo ledgerInfo = 1;
+
+  repeated LedgerInfo ledgerInfo = 1;
 
     // If present, it signals the managed ledger has been
     // terminated and this was the position of the last
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index d3d24b2..69011cac 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -18,19 +18,20 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-
+import static org.testng.Assert.assertEquals;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-
 import io.netty.buffer.ByteBuf;
-
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -39,7 +40,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -52,10 +52,12 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.util.MockClock;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -69,53 +71,140 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
         config.setRetentionTime(10, TimeUnit.MINUTES);
         config.setRetentionSizeInMB(10);
         config.setLedgerOffloader(offloader);
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
 
         for (int i = 0; i < 25; i++) {
             String content = "entry-" + i;
             ledger.addEntry(content.getBytes());
         }
-        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
 
         ledger.offloadPrefix(ledger.getLastConfirmedEntry());
 
-        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
-        Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
-                            .filter(e -> e.getOffloadContext().getComplete()).count(), 2);
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
         Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
         Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
+        Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());
 
         UUID firstLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(),
-                                        ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
+                ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
         UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
-                                         ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
+                ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
 
         ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
         int i = 0;
         for (Entry e : cursor.readEntries(10)) {
-            Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+            assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(1))
             .readOffloaded(anyLong(), any(), anyMap());
         verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap());
 
         for (Entry e : cursor.readEntries(10)) {
-            Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+            assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(2))
-            .readOffloaded(anyLong(), any(), anyMap());
+                .readOffloaded(anyLong(), any(), anyMap());
         verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
 
         for (Entry e : cursor.readEntries(5)) {
-            Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+            assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(2))
-            .readOffloaded(anyLong(), any(), anyMap());
+                .readOffloaded(anyLong(), any(), anyMap());
+    }
+
+    @Test
+    public void testBookkeeperFirstOffloadRead() throws Exception {
+        MockLedgerOffloader offloader = spy(new MockLedgerOffloader());
+        MockClock clock = new MockClock();
+        offloader.getOffloadPolicies()
+                .setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST);
+        //delete after 5 minutes
+        offloader.getOffloadPolicies()
+                .setManagedLedgerOffloadDeletionLagInMillis(300000L);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setRetentionSizeInMB(10);
+        config.setLedgerOffloader(offloader);
+        config.setClock(clock);
+
+
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_bookkeeper_first_test_ledger", config);
+
+        for (int i = 0; i < 25; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+        assertEquals(ledger.getLedgersInfoAsList().stream()
+                .filter(e -> e.getOffloadContext().getComplete()).count(), 2);
+
+        LedgerInfo firstLedger = ledger.getLedgersInfoAsList().get(0);
+        Assert.assertTrue(firstLedger.getOffloadContext().getComplete());
+        LedgerInfo secondLedger;
+        secondLedger = ledger.getLedgersInfoAsList().get(1);
+        Assert.assertTrue(secondLedger.getOffloadContext().getComplete());
+
+        UUID firstLedgerUUID = new UUID(firstLedger.getOffloadContext().getUidMsb(),
+                firstLedger.getOffloadContext().getUidLsb());
+        UUID secondLedgerUUID = new UUID(secondLedger.getOffloadContext().getUidMsb(),
+                secondLedger.getOffloadContext().getUidLsb());
+
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+        int i = 0;
+        for (Entry e : cursor.readEntries(10)) {
+            Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+        }
+        // Offloaded ledgers that are not yet deleted from bookkeeper should still be read from bookkeeper.
+        verify(offloader, never())
+                .readOffloaded(anyLong(), any(), anyMap());
+
+        // Delete offloaded messages from bookkeeper
+        assertEventuallyTrue(() -> bkc.getLedgers().contains(firstLedger.getLedgerId()));
+        assertEventuallyTrue(() -> bkc.getLedgers().contains(secondLedger.getLedgerId()));
+        clock.advance(6, TimeUnit.MINUTES);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        // assert bk ledger is deleted
+        assertEventuallyTrue(() -> !bkc.getLedgers().contains(firstLedger.getLedgerId()));
+        assertEventuallyTrue(() -> !bkc.getLedgers().contains(secondLedger.getLedgerId()));
+        Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted());
+        Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted());
+
+        for (Entry e : cursor.readEntries(10)) {
+            Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+        }
+
+        // Ledgers deleted from bookkeeper, now should read from offloader
+        verify(offloader, atLeastOnce())
+                .readOffloaded(anyLong(), any(), anyMap());
+        verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
+
     }
 
+
     static class MockLedgerOffloader implements LedgerOffloader {
         ConcurrentHashMap<UUID, ReadHandle> offloads = new ConcurrentHashMap<UUID, ReadHandle>();
 
+
+        OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
+                null, null,
+                OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+                OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+                OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
+                OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
+                OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);
+
+
         @Override
         public String getOffloadDriverName() {
             return "mock";
@@ -150,7 +239,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
 
         @Override
         public OffloadPolicies getOffloadPolicies() {
-            return null;
+            return offloadPolicies;
         }
 
         @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 81080db..740a4ab 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -23,22 +23,19 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.collect.ImmutableSet;
-
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
@@ -51,12 +48,10 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.zookeeper.MockZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.testng.annotations.Test;
 
 public class OffloadPrefixTest extends MockedBookKeeperTestCase {
@@ -1002,7 +997,8 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
-                OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS);
+                OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
+                OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);
 
         @Override
         public String getOffloadDriverName() {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index dab89f7..584d5a8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -21,9 +21,7 @@ package org.apache.pulsar.broker;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import io.netty.util.internal.PlatformDependent;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -34,14 +32,15 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
-import org.apache.pulsar.common.policies.data.TopicType;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.configuration.Category;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.sasl.SaslConstants;
 
 /**
@@ -1433,17 +1432,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
                     + "Of course, this may degrade consumption throughput. Default is 10ms.")
     private int managedLedgerNewEntriesCheckDelayInMillis = 10;
 
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "Read priority when ledgers exists in both bookkeeper and the second layer storage.")
+    private String managedLedgerDataReadPriority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST
+            .getValue();
+
     /*** --- Load balancer --- ****/
     @FieldContext(
-        category = CATEGORY_LOAD_BALANCER,
-        doc = "Enable load balancer"
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Enable load balancer"
     )
     private boolean loadBalancerEnabled = true;
     @Deprecated
     @FieldContext(
-        category = CATEGORY_LOAD_BALANCER,
-        deprecated = true,
-        doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
+            category = CATEGORY_LOAD_BALANCER,
+            deprecated = true,
+            doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
     )
     private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 7aa0279..6644d8c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -27,14 +27,11 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
-
 import com.google.common.collect.Sets;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -165,10 +162,11 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         String endpoint = "test-endpoint";
         long offloadThresholdInBytes = 0;
         long offloadDeletionLagInMillis = 100L;
+        OffloadPolicies.OffloadedReadPriority priority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST;
 
         OffloadPolicies offload1 = OffloadPolicies.create(
                 driver, region, bucket, endpoint, null, null,
-                100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis);
+                100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis, priority);
         admin.namespaces().setOffloadPolicies(namespaceName, offload1);
         OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
         assertEquals(offload1, offload2);
@@ -214,7 +212,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
         Thread.sleep(2000);
         testOffload(true);
     }
-    
+
     @Test
     public void testTopicLevelOffloadNonPartitioned() throws Exception {
         //wait for cache init
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index d51911a..a651ddc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -34,7 +34,6 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
@@ -1273,7 +1272,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
-                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+                OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
         assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
                 new Long(-1));
@@ -1288,7 +1288,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
-                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+                OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
         assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
                 new Long(100));
@@ -1302,7 +1303,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
-                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+                OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
         assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
                 new Long(-2));
@@ -1316,7 +1318,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
                 OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
                 admin.namespaces().getOffloadThreshold(namespace),
-                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+                pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+                OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
         ledgerConf.setLedgerOffloader(offloader);
         assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
                 new Long(-1));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 258c1234..bc9e4bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,7 +30,6 @@ import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 import java.util.Properties;
-
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
@@ -64,6 +62,7 @@ public class ServiceConfigurationTest {
         assertEquals(config.getDefaultNamespaceBundleSplitAlgorithm(), "topic_count_equally_divide");
         assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1);
         assertEquals(config.getMaxMessagePublishBufferSizeInMB(), -1);
+        assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first");
     }
 
     @Test
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index 08f7a9c..a21d92d 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -74,8 +74,9 @@ managedLedgerMaxEntriesPerLedger=50000
 managedLedgerMinLedgerRolloverTimeMinutes=10
 managedLedgerMaxLedgerRolloverTimeMinutes=240
 managedLedgerCursorMaxEntriesPerLedger=50000
-managedLedgerCursorRolloverTimeInSeconds=14400
-loadBalancerEnabled=false
+managedLedgerCursorRolloverTimeInSeconds = 14400
+managedLedgerDataReadPriority = bookkeeper-first
+loadBalancerEnabled = false
 loadBalancerReportUpdateThresholdPercentage=10
 loadBalancerReportUpdateMaxIntervalMinutes=15
 loadBalancerHostUsageCheckIntervalMinutes=1
diff --git a/pulsar-broker/src/test/resources/logback.xml b/pulsar-broker/src/test/resources/logback.xml
index bf3daa8..f5735b0 100644
--- a/pulsar-broker/src/test/resources/logback.xml
+++ b/pulsar-broker/src/test/resources/logback.xml
@@ -19,7 +19,7 @@
 
 -->
 <configuration scan="true">
-<!-- 
+<!--
 	<logger name="org.apache" level="IN" />
 	<logger name="org.apache.bookkeeper.mledger" level="ERROR" />
  -->
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 55dfdfd..35ef40b 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -550,11 +550,12 @@ public class PulsarAdminToolTest {
         namespaces.run(split("clear-offload-deletion-lag myprop/clust/ns1"));
         verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1");
 
-        namespaces.run(split("set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s"));
+        namespaces.run(split(
+                "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s -orp tiered-storage-first"));
         verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
                 OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
                         "http://test.endpoint", null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
-                        10 * 1024 * 1024L, 10000L));
+                        10 * 1024 * 1024L, 10000L, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST));
 
         namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
         verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");
@@ -774,9 +775,10 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
 
-        cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10"));
+        cmdTopics.run(split(
+                "set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
         OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", "region", "bucket"
-                , "endpoint", null, null, 8, 9, 10L, null);
+                , "endpoint", null, null, 8, 9, 10L, null, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST);
         verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies);
 
         cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 418d255..0da4efc 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -23,15 +23,15 @@ import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.CommaParameterSplitter;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.admin.cli.utils.IOUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -42,11 +42,12 @@ import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
@@ -1662,7 +1663,7 @@ public class CmdNamespaces extends CmdBase {
         @Parameter(
                 names = {"--driver", "-d"},
                 description = "Driver to use to offload old data to long term storage, " +
-                        "(Possible values: S3, aws-s3, google-cloud-storage)",
+                        "(Possible values: S3, aws-s3, google-cloud-storage, filesystem, azureblob)",
                 required = true)
         private String driver;
 
@@ -1722,17 +1723,24 @@ public class CmdNamespaces extends CmdBase {
                 required = false)
         private String offloadAfterThresholdStr;
 
-        private final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"};
+        @Parameter(
+                names = {"--offloadedReadPriority", "-orp"},
+                description = "read priority for offloaded messages",
+                required = false
+        )
+        private String offloadReadPriorityStr;
+
+        public final ImmutableList<String> DRIVER_NAMES = OffloadPolicies.DRIVER_NAMES;
 
         public boolean driverSupported(String driver) {
-            return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(driver));
+            return DRIVER_NAMES.stream().anyMatch(d -> d.equalsIgnoreCase(driver));
         }
 
         public boolean isS3Driver(String driver) {
             if (StringUtils.isEmpty(driver)) {
                 return false;
             }
-            return driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1]);
+            return driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1));
         }
 
         public boolean positiveCheck(String paramName, long value) {
@@ -1756,7 +1764,7 @@ public class CmdNamespaces extends CmdBase {
             if (!driverSupported(driver)) {
                 throw new ParameterException(
                         "The driver " + driver + " is not supported, " +
-                                "(Possible values: S3, aws-s3, google-cloud-storage).");
+                                "(Possible values: " + String.join(",", DRIVER_NAMES) + ").");
             }
 
             if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
@@ -1800,10 +1808,24 @@ public class CmdNamespaces extends CmdBase {
                     offloadAfterThresholdInBytes = offloadAfterThreshold;
                 }
             }
+            OffloadedReadPriority offloadedReadPriority = OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY;
+
+            if (this.offloadReadPriorityStr != null) {
+                try {
+                    offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
+                } catch (Exception e) {
+                    throw new ParameterException("--offloadedReadPriority parameter must be one of " +
+                            Arrays.stream(OffloadedReadPriority.values())
+                                    .map(OffloadedReadPriority::toString)
+                                    .collect(Collectors.joining(","))
+                            + " but got: " + this.offloadReadPriorityStr, e);
+                }
+            }
 
             OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret,
                     maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes,
-                    offloadAfterElapsedInMillis);
+                    offloadAfterElapsedInMillis, offloadedReadPriority);
+
             admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
         }
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index b1c3cd2..a14d83f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -39,7 +39,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -54,6 +54,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PublishRate;
@@ -1402,11 +1403,34 @@ public class CmdTopics extends CmdBase {
                 , description = "ManagedLedger offload deletion lag in bytes")
         private Long offloadDeletionLagInMillis;
 
+        @Parameter(
+                names = {"--offloadedReadPriority", "-orp"},
+                description = "read priority for offloaded messages",
+                required = false
+        )
+        private String offloadReadPriorityStr;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
+
+            OffloadedReadPriority offloadedReadPriority = OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY;
+
+            if (this.offloadReadPriorityStr != null) {
+                try {
+                    offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
+                } catch (Exception e) {
+                    throw new ParameterException("--offloadedReadPriority parameter must be one of " +
+                            Arrays.stream(OffloadedReadPriority.values())
+                                    .map(OffloadedReadPriority::toString)
+                                    .collect(Collectors.joining(","))
+                            + " but got: " + this.offloadReadPriorityStr, e);
+                }
+            }
+
             OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
-                    , readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis);
+                    , readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority);
+
             admin.topics().setOffloadPolicies(persistentTopic, offloadPolicies);
         }
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index f07a424..bb78ec1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.common.util.FieldParser.value;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
@@ -33,9 +34,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.stream.Collectors;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
 
 /**
  * Definition of the offload policies.
@@ -44,9 +48,58 @@ import org.apache.commons.lang3.StringUtils;
 @Data
 public class OffloadPolicies implements Serializable {
 
+    @InterfaceAudience.Public
+    @InterfaceStability.Stable
+    public enum OffloadedReadPriority {
+        /**
+         * For offloaded messages, readers will try to read from BookKeeper first;
+         * if the messages do not exist in BookKeeper, they are read from offloaded storage.
+         */
+        BOOKKEEPER_FIRST("bookkeeper-first"),
+        /**
+         * For offloaded messages, readers will try to read from offloaded storage first,
+         * even if they still exist in BookKeeper.
+         */
+        TIERED_STORAGE_FIRST("tiered-storage-first");
+
+        private final String value;
+
+        OffloadedReadPriority(String value) {
+            this.value = value;
+        }
+
+        public boolean equalsName(String otherName) {
+            return value.equals(otherName);
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        public static OffloadedReadPriority fromString(String str) {
+            for (OffloadedReadPriority value : OffloadedReadPriority.values()) {
+                if (value.value.equals(str)) {
+                    return value;
+                }
+            }
+
+            throw new IllegalArgumentException("--offloadedReadPriority parameter must be one of "
+                    + Arrays.stream(OffloadedReadPriority.values())
+                    .map(OffloadedReadPriority::toString)
+                    .collect(Collectors.joining(","))
+                    + " but got: " + str);
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
+
     private final static long serialVersionUID = 0L;
 
     private final static List<Field> CONFIGURATION_FIELDS;
+
     static {
         CONFIGURATION_FIELDS = new ArrayList<>();
         Class<OffloadPolicies> clazz = OffloadPolicies.class;
@@ -62,7 +115,8 @@ public class OffloadPolicies implements Serializable {
     public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;      // 1MB
     public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
     public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
-    public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob"};
+    public final static ImmutableList<String> DRIVER_NAMES = ImmutableList
+            .of("S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob");
     public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
     public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
     public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
@@ -70,6 +124,7 @@ public class OffloadPolicies implements Serializable {
     public final static String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE =
             "managedLedgerOffloadAutoTriggerSizeThresholdBytes";
     public final static String DELETION_LAG_NAME_IN_CONF_FILE = "managedLedgerOffloadDeletionLagMs";
+    public final static OffloadedReadPriority DEFAULT_OFFLOADED_READ_PRIORITY = OffloadedReadPriority.TIERED_STORAGE_FIRST;
 
     // common config
     @Configuration
@@ -90,6 +145,9 @@ public class OffloadPolicies implements Serializable {
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
     private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
+    @Configuration
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+    private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY;
     // s3 config, set by service configuration or cli
     @Configuration
     @JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -161,7 +219,8 @@ public class OffloadPolicies implements Serializable {
     public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
                                          String credentialId, String credentialSecret,
                                          Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
-                                         Long offloadThresholdInBytes, Long offloadDeletionLagInMillis) {
+                                         Long offloadThresholdInBytes, Long offloadDeletionLagInMillis,
+                                         OffloadedReadPriority readPriority) {
         OffloadPolicies offloadPolicies = new OffloadPolicies();
         offloadPolicies.setManagedLedgerOffloadDriver(driver);
         offloadPolicies.setManagedLedgerOffloadThresholdInBytes(offloadThresholdInBytes);
@@ -172,8 +231,9 @@ public class OffloadPolicies implements Serializable {
         offloadPolicies.setManagedLedgerOffloadServiceEndpoint(endpoint);
         offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
         offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
+        offloadPolicies.setManagedLedgerOffloadedReadPriority(readPriority);
 
-        if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
+        if (driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1))) {
             if (credentialId != null) {
                 offloadPolicies.setS3ManagedLedgerOffloadRole(credentialId);
             }
@@ -185,7 +245,7 @@ public class OffloadPolicies implements Serializable {
             offloadPolicies.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
             offloadPolicies.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
             offloadPolicies.setS3ManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
-        } else if (driver.equalsIgnoreCase(DRIVER_NAMES[2])) {
+        } else if (driver.equalsIgnoreCase(DRIVER_NAMES.get(2))) {
             offloadPolicies.setGcsManagedLedgerOffloadRegion(region);
             offloadPolicies.setGcsManagedLedgerOffloadBucket(bucket);
             offloadPolicies.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
@@ -228,7 +288,7 @@ public class OffloadPolicies implements Serializable {
     }
 
     public boolean driverSupported() {
-        return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(this.managedLedgerOffloadDriver));
+        return DRIVER_NAMES.stream().anyMatch(d -> d.equalsIgnoreCase(this.managedLedgerOffloadDriver));
     }
 
     public static String getSupportedDriverNames() {
@@ -239,22 +299,22 @@ public class OffloadPolicies implements Serializable {
         if (managedLedgerOffloadDriver == null) {
             return false;
         }
-        return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[0])
-                || managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[1]);
+        return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(0))
+                || managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(1));
     }
 
     public boolean isGcsDriver() {
         if (managedLedgerOffloadDriver == null) {
             return false;
         }
-        return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[2]);
+        return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(2));
     }
 
     public boolean isFileSystemDriver() {
         if (managedLedgerOffloadDriver == null) {
             return false;
         }
-        return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[3]);
+        return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(3));
     }
 
     public boolean bucketValid() {
@@ -277,6 +337,7 @@ public class OffloadPolicies implements Serializable {
     @Override
     public int hashCode() {
         return Objects.hash(
+                managedLedgerOffloadedReadPriority,
                 managedLedgerOffloadDriver,
                 managedLedgerOffloadMaxThreads,
                 managedLedgerOffloadPrefetchRounds,
@@ -312,17 +373,18 @@ public class OffloadPolicies implements Serializable {
             return false;
         }
         OffloadPolicies other = (OffloadPolicies) obj;
-        return Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver())
+        return Objects.equals(managedLedgerOffloadedReadPriority, other.getManagedLedgerOffloadedReadPriority())
+                && Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver())
                 && Objects.equals(managedLedgerOffloadMaxThreads, other.getManagedLedgerOffloadMaxThreads())
                 && Objects.equals(managedLedgerOffloadPrefetchRounds, other.getManagedLedgerOffloadPrefetchRounds())
                 && Objects.equals(managedLedgerOffloadThresholdInBytes,
-                    other.getManagedLedgerOffloadThresholdInBytes())
+                other.getManagedLedgerOffloadThresholdInBytes())
                 && Objects.equals(managedLedgerOffloadDeletionLagInMillis,
-                    other.getManagedLedgerOffloadDeletionLagInMillis())
+                other.getManagedLedgerOffloadDeletionLagInMillis())
                 && Objects.equals(s3ManagedLedgerOffloadRegion, other.getS3ManagedLedgerOffloadRegion())
                 && Objects.equals(s3ManagedLedgerOffloadBucket, other.getS3ManagedLedgerOffloadBucket())
                 && Objects.equals(s3ManagedLedgerOffloadServiceEndpoint,
-                    other.getS3ManagedLedgerOffloadServiceEndpoint())
+                other.getS3ManagedLedgerOffloadServiceEndpoint())
                 && Objects.equals(s3ManagedLedgerOffloadMaxBlockSizeInBytes,
                     other.getS3ManagedLedgerOffloadMaxBlockSizeInBytes())
                 && Objects.equals(s3ManagedLedgerOffloadReadBufferSizeInBytes,
@@ -352,6 +414,7 @@ public class OffloadPolicies implements Serializable {
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
+                .add("managedLedgerOffloadedReadPriority", managedLedgerOffloadedReadPriority)
                 .add("managedLedgerOffloadDriver", managedLedgerOffloadDriver)
                 .add("managedLedgerOffloadMaxThreads", managedLedgerOffloadMaxThreads)
                 .add("managedLedgerOffloadPrefetchRounds", managedLedgerOffloadPrefetchRounds)
@@ -382,7 +445,7 @@ public class OffloadPolicies implements Serializable {
 
     public Properties toProperties() {
         Properties properties = new Properties();
-
+        setProperty(properties, "managedLedgerOffloadedReadPriority", this.getManagedLedgerOffloadedReadPriority());
         setProperty(properties, "offloadersDirectory", this.getOffloadersDirectory());
         setProperty(properties, "managedLedgerOffloadDriver", this.getManagedLedgerOffloadDriver());
         setProperty(properties, "managedLedgerOffloadMaxThreads",
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index d87887d..a89409c 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.policies.data;
 
 import java.util.Properties;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -59,7 +60,8 @@ public class OffloadPoliciesTest {
                 maxBlockSizeInBytes,
                 readBufferSizeInBytes,
                 offloadThresholdInBytes,
-                offloadDeletionLagInMillis
+                offloadDeletionLagInMillis,
+                OffloadedReadPriority.TIERED_STORAGE_FIRST
         );
 
         Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), driver);
@@ -86,6 +88,7 @@ public class OffloadPoliciesTest {
         final Integer readBufferSizeInBytes = 2 * M;
         final Long offloadThresholdInBytes = 0L;
         final Long offloadDeletionLagInMillis = 5 * MIN;
+        final OffloadedReadPriority readPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST;
 
         OffloadPolicies offloadPolicies = OffloadPolicies.create(
                 driver,
@@ -97,7 +100,8 @@ public class OffloadPoliciesTest {
                 maxBlockSizeInBytes,
                 readBufferSizeInBytes,
                 offloadThresholdInBytes,
-                offloadDeletionLagInMillis
+                offloadDeletionLagInMillis,
+                readPriority
         );
 
         Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), driver);
@@ -107,6 +111,7 @@ public class OffloadPoliciesTest {
         Assert.assertEquals(offloadPolicies.getGcsManagedLedgerOffloadReadBufferSizeInBytes(), readBufferSizeInBytes);
         Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), offloadThresholdInBytes);
         Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), offloadDeletionLagInMillis);
+        Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority(), readPriority);
     }
 
     @Test
diff --git a/site2/docs/cookbooks-tiered-storage.md b/site2/docs/cookbooks-tiered-storage.md
index 6dd2803..0263ec2 100644
--- a/site2/docs/cookbooks-tiered-storage.md
+++ b/site2/docs/cookbooks-tiered-storage.md
@@ -262,6 +262,16 @@ $ bin/pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-name
 
 > Automatic offload runs when a new segment is added to a topic log. If you set the threshold on a namespace, but few messages are being produced to the topic, offload will not until the current segment is full.
 
+## Configuring read priority for offloaded messages
+
+By default, once messages are offloaded to long term storage, brokers read them from long term storage, even though the messages still exist in BookKeeper for a period that depends on the administrator's configuration.
+If you prefer that messages existing in both BookKeeper and long term storage be read from BookKeeper, you can change this configuration with the following commands.
+
+```bash
+# default value for -orp is tiered-storage-first
+$ bin/pulsar-admin namespaces set-offload-policies my-tenant/my-namespace -orp bookkeeper-first
+$ bin/pulsar-admin topics set-offload-policies my-tenant/my-namespace/topic1 -orp bookkeeper-first
+```     
 
 ## Triggering offload manually