You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/31 01:09:55 UTC
[pulsar] branch branch-2.7 updated: Configurable data source for
offloaded messages (#8717)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new efff0ee Configurable data source for offloaded messages (#8717)
efff0ee is described below
commit efff0ee86cb1e6b581cbbb4765061bfe0ba3e3ce
Author: Renkai Ge <ga...@gmail.com>
AuthorDate: Fri Jan 8 13:01:17 2021 +0800
Configurable data source for offloaded messages (#8717)
Fix issue: https://github.com/apache/pulsar/issues/8591
This PR includes:
* API change in command tools
* Related implementation with tests
* Related docs in cookbook
In addition, the log4j dependency is removed from the `managed-ledger` module because the whole project now uses log4j2 as the default logging framework.
(cherry picked from commit 7c09f5ce649edcca0be792198d97573197c5a272)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 14 ++-
managed-ledger/src/main/proto/MLDataFormats.proto | 4 +-
.../mledger/impl/OffloadPrefixReadTest.java | 125 ++++++++++++++++++---
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 10 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 26 +++--
.../pulsar/broker/admin/AdminApiOffloadTest.java | 8 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 13 ++-
.../common/naming/ServiceConfigurationTest.java | 3 +-
.../configurations/pulsar_broker_test.conf | 5 +-
pulsar-broker/src/test/resources/logback.xml | 2 +-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 10 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 40 +++++--
.../org/apache/pulsar/admin/cli/CmdTopics.java | 28 ++++-
.../common/policies/data/OffloadPolicies.java | 91 ++++++++++++---
.../common/policies/data/OffloadPoliciesTest.java | 9 +-
site2/docs/cookbooks-tiered-storage.md | 10 ++
16 files changed, 313 insertions(+), 85 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1f37e86..8dc3ea6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -119,6 +119,7 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
@@ -1554,7 +1555,18 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
LedgerInfo info = ledgers.get(ledgerId);
CompletableFuture<ReadHandle> openFuture = new CompletableFuture<>();
- if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {
+
+ if (config.getLedgerOffloader() != null
+ && config.getLedgerOffloader().getOffloadPolicies() != null
+ && config.getLedgerOffloader().getOffloadPolicies()
+ .getManagedLedgerOffloadedReadPriority() == OffloadedReadPriority.BOOKKEEPER_FIRST
+ && info != null && info.hasOffloadContext()
+ && !info.getOffloadContext().getBookkeeperDeleted()) {
+ openFuture = bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
+ .withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
+
+ } else if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {
+
UUID uid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
// TODO: improve this to load ledger offloader by driver name recorded in metadata
Map<String, String> offloadDriverMetadata = OffloadUtils.getOffloadDriverMetadata(info);
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 8b1ecbf8..b7dc580 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -48,8 +48,8 @@ message ManagedLedgerInfo {
optional int64 timestamp = 4;
optional OffloadContext offloadContext = 5;
}
-
- repeated LedgerInfo ledgerInfo = 1;
+
+ repeated LedgerInfo ledgerInfo = 1;
// If present, it signals the managed ledger has been
// terminated and this was the position of the last
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index d3d24b2..69011cac 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -18,19 +18,20 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-
+import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-
import io.netty.buffer.ByteBuf;
-
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -39,7 +40,6 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -52,10 +52,12 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -69,53 +71,140 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
config.setLedgerOffloader(offloader);
- ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
for (int i = 0; i < 25; i++) {
String content = "entry-" + i;
ledger.addEntry(content.getBytes());
}
- Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
ledger.offloadPrefix(ledger.getLastConfirmedEntry());
- Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
- Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
- .filter(e -> e.getOffloadContext().getComplete()).count(), 2);
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
+ Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());
UUID firstLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(),
- ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
+ ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
- ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
+ ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
int i = 0;
for (Entry e : cursor.readEntries(10)) {
- Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(1))
.readOffloaded(anyLong(), any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap());
for (Entry e : cursor.readEntries(10)) {
- Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
for (Entry e : cursor.readEntries(5)) {
- Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), any(), anyMap());
+ }
+
+ @Test
+ public void testBookkeeperFirstOffloadRead() throws Exception {
+ MockLedgerOffloader offloader = spy(new MockLedgerOffloader());
+ MockClock clock = new MockClock();
+ offloader.getOffloadPolicies()
+ .setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST);
+ //delete after 5 minutes
+ offloader.getOffloadPolicies()
+ .setManagedLedgerOffloadDeletionLagInMillis(300000L);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(offloader);
+ config.setClock(clock);
+
+
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_bookkeeper_first_test_ledger", config);
+
+ for (int i = 0; i < 25; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+ ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+ assertEquals(ledger.getLedgersInfoAsList().stream()
+ .filter(e -> e.getOffloadContext().getComplete()).count(), 2);
+
+ LedgerInfo firstLedger = ledger.getLedgersInfoAsList().get(0);
+ Assert.assertTrue(firstLedger.getOffloadContext().getComplete());
+ LedgerInfo secondLedger;
+ secondLedger = ledger.getLedgersInfoAsList().get(1);
+ Assert.assertTrue(secondLedger.getOffloadContext().getComplete());
+
+ UUID firstLedgerUUID = new UUID(firstLedger.getOffloadContext().getUidMsb(),
+ firstLedger.getOffloadContext().getUidLsb());
+ UUID secondLedgerUUID = new UUID(secondLedger.getOffloadContext().getUidMsb(),
+ secondLedger.getOffloadContext().getUidLsb());
+
+ ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+ int i = 0;
+ for (Entry e : cursor.readEntries(10)) {
+ Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+ // Ledgers that are offloaded but not yet deleted from bookkeeper should be read from bookkeeper.
+ verify(offloader, never())
+ .readOffloaded(anyLong(), any(), anyMap());
+
+ // Delete offloaded messages from bookkeeper
+ assertEventuallyTrue(() -> bkc.getLedgers().contains(firstLedger.getLedgerId()));
+ assertEventuallyTrue(() -> bkc.getLedgers().contains(secondLedger.getLedgerId()));
+ clock.advance(6, TimeUnit.MINUTES);
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ ledger.internalTrimConsumedLedgers(promise);
+ promise.join();
+
+ // assert bk ledger is deleted
+ assertEventuallyTrue(() -> !bkc.getLedgers().contains(firstLedger.getLedgerId()));
+ assertEventuallyTrue(() -> !bkc.getLedgers().contains(secondLedger.getLedgerId()));
+ Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted());
+ Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted());
+
+ for (Entry e : cursor.readEntries(10)) {
+ Assert.assertEquals(new String(e.getData()), "entry-" + i++);
+ }
+
+ // Ledgers deleted from bookkeeper, now should read from offloader
+ verify(offloader, atLeastOnce())
+ .readOffloaded(anyLong(), any(), anyMap());
+ verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
+
}
+
static class MockLedgerOffloader implements LedgerOffloader {
ConcurrentHashMap<UUID, ReadHandle> offloads = new ConcurrentHashMap<UUID, ReadHandle>();
+
+ OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
+ null, null,
+ OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
+ OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
+ OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);
+
+
@Override
public String getOffloadDriverName() {
return "mock";
@@ -150,7 +239,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
@Override
public OffloadPolicies getOffloadPolicies() {
- return null;
+ return offloadPolicies;
}
@Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 81080db..740a4ab 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -23,22 +23,19 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.collect.ImmutableSet;
-
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
-
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
@@ -51,12 +48,10 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.zookeeper.MockZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.testng.annotations.Test;
public class OffloadPrefixTest extends MockedBookKeeperTestCase {
@@ -1002,7 +997,8 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
- OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS);
+ OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);
@Override
public String getOffloadDriverName() {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index dab89f7..584d5a8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -21,9 +21,7 @@ package org.apache.pulsar.broker;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import io.netty.util.internal.PlatformDependent;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -34,14 +32,15 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
-import org.apache.pulsar.common.policies.data.TopicType;
-import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
/**
@@ -1433,17 +1432,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "Of course, this may degrade consumption throughput. Default is 10ms.")
private int managedLedgerNewEntriesCheckDelayInMillis = 10;
+ @FieldContext(category = CATEGORY_STORAGE_ML,
+ doc = "Read priority when ledgers exists in both bookkeeper and the second layer storage.")
+ private String managedLedgerDataReadPriority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST
+ .getValue();
+
/*** --- Load balancer --- ****/
@FieldContext(
- category = CATEGORY_LOAD_BALANCER,
- doc = "Enable load balancer"
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Enable load balancer"
)
private boolean loadBalancerEnabled = true;
@Deprecated
@FieldContext(
- category = CATEGORY_LOAD_BALANCER,
- deprecated = true,
- doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
+ category = CATEGORY_LOAD_BALANCER,
+ deprecated = true,
+ doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
)
private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 7aa0279..6644d8c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -27,14 +27,11 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
-
import com.google.common.collect.Sets;
-
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -165,10 +162,11 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
String endpoint = "test-endpoint";
long offloadThresholdInBytes = 0;
long offloadDeletionLagInMillis = 100L;
+ OffloadPolicies.OffloadedReadPriority priority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST;
OffloadPolicies offload1 = OffloadPolicies.create(
driver, region, bucket, endpoint, null, null,
- 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis);
+ 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis, priority);
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
assertEquals(offload1, offload2);
@@ -214,7 +212,7 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
Thread.sleep(2000);
testOffload(true);
}
-
+
@Test
public void testTopicLevelOffloadNonPartitioned() throws Exception {
//wait for cache init
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index d51911a..a651ddc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -34,7 +34,6 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
@@ -1273,7 +1272,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
- pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+ pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
new Long(-1));
@@ -1288,7 +1288,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
- pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+ pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
new Long(100));
@@ -1302,7 +1303,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
- pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+ pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
new Long(-2));
@@ -1316,7 +1318,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
admin.namespaces().getOffloadThreshold(namespace),
- pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs()));
+ pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(),
+ OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY));
ledgerConf.setLedgerOffloader(offloader);
assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(),
new Long(-1));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 258c1234..bc9e4bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -31,7 +30,6 @@ import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Properties;
-
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
@@ -64,6 +62,7 @@ public class ServiceConfigurationTest {
assertEquals(config.getDefaultNamespaceBundleSplitAlgorithm(), "topic_count_equally_divide");
assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1);
assertEquals(config.getMaxMessagePublishBufferSizeInMB(), -1);
+ assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first");
}
@Test
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index 08f7a9c..a21d92d 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -74,8 +74,9 @@ managedLedgerMaxEntriesPerLedger=50000
managedLedgerMinLedgerRolloverTimeMinutes=10
managedLedgerMaxLedgerRolloverTimeMinutes=240
managedLedgerCursorMaxEntriesPerLedger=50000
-managedLedgerCursorRolloverTimeInSeconds=14400
-loadBalancerEnabled=false
+managedLedgerCursorRolloverTimeInSeconds = 14400
+managedLedgerDataReadPriority = bookkeeper-first
+loadBalancerEnabled = false
loadBalancerReportUpdateThresholdPercentage=10
loadBalancerReportUpdateMaxIntervalMinutes=15
loadBalancerHostUsageCheckIntervalMinutes=1
diff --git a/pulsar-broker/src/test/resources/logback.xml b/pulsar-broker/src/test/resources/logback.xml
index bf3daa8..f5735b0 100644
--- a/pulsar-broker/src/test/resources/logback.xml
+++ b/pulsar-broker/src/test/resources/logback.xml
@@ -19,7 +19,7 @@
-->
<configuration scan="true">
-<!--
+<!--
<logger name="org.apache" level="IN" />
<logger name="org.apache.bookkeeper.mledger" level="ERROR" />
-->
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 55dfdfd..35ef40b 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -550,11 +550,12 @@ public class PulsarAdminToolTest {
namespaces.run(split("clear-offload-deletion-lag myprop/clust/ns1"));
verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1");
- namespaces.run(split("set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s"));
+ namespaces.run(split(
+ "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s -orp tiered-storage-first"));
verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1",
OffloadPolicies.create("aws-s3", "test-region", "test-bucket",
"http://test.endpoint", null, null, 32 * 1024 * 1024, 5 * 1024 * 1024,
- 10 * 1024 * 1024L, 10000L));
+ 10 * 1024 * 1024L, 10000L, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST));
namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");
@@ -774,9 +775,10 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
- cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10"));
+ cmdTopics.run(split(
+ "set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", "region", "bucket"
- , "endpoint", null, null, 8, 9, 10L, null);
+ , "endpoint", null, null, 8, 9, 10L, null, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST);
verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies);
cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 418d255..0da4efc 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -23,15 +23,15 @@ import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.CommaParameterSplitter;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.admin.cli.utils.IOUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -42,11 +42,12 @@ import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -1662,7 +1663,7 @@ public class CmdNamespaces extends CmdBase {
@Parameter(
names = {"--driver", "-d"},
description = "Driver to use to offload old data to long term storage, " +
- "(Possible values: S3, aws-s3, google-cloud-storage)",
+ "(Possible values: S3, aws-s3, google-cloud-storage, filesystem, azureblob)",
required = true)
private String driver;
@@ -1722,17 +1723,24 @@ public class CmdNamespaces extends CmdBase {
required = false)
private String offloadAfterThresholdStr;
- private final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"};
+ @Parameter(
+ names = {"--offloadedReadPriority", "-orp"},
+ description = "read priority for offloaded messages",
+ required = false
+ )
+ private String offloadReadPriorityStr;
+
+ public final ImmutableList<String> DRIVER_NAMES = OffloadPolicies.DRIVER_NAMES;
public boolean driverSupported(String driver) {
- return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(driver));
+ return DRIVER_NAMES.stream().anyMatch(d -> d.equalsIgnoreCase(driver));
}
public boolean isS3Driver(String driver) {
if (StringUtils.isEmpty(driver)) {
return false;
}
- return driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1]);
+ return driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1));
}
public boolean positiveCheck(String paramName, long value) {
@@ -1756,7 +1764,7 @@ public class CmdNamespaces extends CmdBase {
if (!driverSupported(driver)) {
throw new ParameterException(
"The driver " + driver + " is not supported, " +
- "(Possible values: S3, aws-s3, google-cloud-storage).");
+ "(Possible values: " + String.join(",", DRIVER_NAMES) + ").");
}
if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
@@ -1800,10 +1808,24 @@ public class CmdNamespaces extends CmdBase {
offloadAfterThresholdInBytes = offloadAfterThreshold;
}
}
+ OffloadedReadPriority offloadedReadPriority = OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY;
+
+ if (this.offloadReadPriorityStr != null) {
+ try {
+ offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
+ } catch (Exception e) {
+ throw new ParameterException("--offloadedReadPriority parameter must be one of " +
+ Arrays.stream(OffloadedReadPriority.values())
+ .map(OffloadedReadPriority::toString)
+ .collect(Collectors.joining(","))
+ + " but got: " + this.offloadReadPriorityStr, e);
+ }
+ }
OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret,
maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes,
- offloadAfterElapsedInMillis);
+ offloadAfterElapsedInMillis, offloadedReadPriority);
+
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index b1c3cd2..a14d83f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -39,7 +39,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -54,6 +54,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -1402,11 +1403,34 @@ public class CmdTopics extends CmdBase {
, description = "ManagedLedger offload deletion lag in bytes")
private Long offloadDeletionLagInMillis;
+ @Parameter(
+ names = {"--offloadedReadPriority", "-orp"},
+ description = "read priority for offloaded messages",
+ required = false
+ )
+ private String offloadReadPriorityStr;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
+
+ OffloadedReadPriority offloadedReadPriority = OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY;
+
+ if (this.offloadReadPriorityStr != null) {
+ try {
+ offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
+ } catch (Exception e) {
+ throw new ParameterException("--offloadedReadPriority parameter must be one of " +
+ Arrays.stream(OffloadedReadPriority.values())
+ .map(OffloadedReadPriority::toString)
+ .collect(Collectors.joining(","))
+ + " but got: " + this.offloadReadPriorityStr, e);
+ }
+ }
+
OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes
- , readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis);
+ , readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority);
+
admin.topics().setOffloadPolicies(persistentTopic, offloadPolicies);
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
index f07a424..bb78ec1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.common.util.FieldParser.value;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -33,9 +34,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
+import java.util.stream.Collectors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
/**
* Definition of the offload policies.
@@ -44,9 +48,58 @@ import org.apache.commons.lang3.StringUtils;
@Data
public class OffloadPolicies implements Serializable {
+ @InterfaceAudience.Public
+ @InterfaceStability.Stable
+ public enum OffloadedReadPriority {
+ /**
+ * For offloaded messages, readers first try to read from BookKeeper;
+ * if the messages no longer exist in BookKeeper, they are read from offloaded storage.
+ */
+ BOOKKEEPER_FIRST("bookkeeper-first"),
+ /**
+ * For offloaded messages, readers will try to read from offloaded storage first,
+ * even if they still exist in BookKeeper.
+ */
+ TIERED_STORAGE_FIRST("tiered-storage-first");
+
+ private final String value;
+
+ OffloadedReadPriority(String value) {
+ this.value = value;
+ }
+
+ public boolean equalsName(String otherName) {
+ return value.equals(otherName);
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ public static OffloadedReadPriority fromString(String str) {
+ for (OffloadedReadPriority value : OffloadedReadPriority.values()) {
+ if (value.value.equals(str)) {
+ return value;
+ }
+ }
+
+ throw new IllegalArgumentException("--offloadedReadPriority parameter must be one of "
+ + Arrays.stream(OffloadedReadPriority.values())
+ .map(OffloadedReadPriority::toString)
+ .collect(Collectors.joining(","))
+ + " but got: " + str);
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
+
private final static long serialVersionUID = 0L;
private final static List<Field> CONFIGURATION_FIELDS;
+
static {
CONFIGURATION_FIELDS = new ArrayList<>();
Class<OffloadPolicies> clazz = OffloadPolicies.class;
@@ -62,7 +115,8 @@ public class OffloadPolicies implements Serializable {
public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB
public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2;
public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
- public final static String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob"};
+ public final static ImmutableList<String> DRIVER_NAMES = ImmutableList
+ .of("S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob");
public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null;
public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null;
@@ -70,6 +124,7 @@ public class OffloadPolicies implements Serializable {
public final static String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE =
"managedLedgerOffloadAutoTriggerSizeThresholdBytes";
public final static String DELETION_LAG_NAME_IN_CONF_FILE = "managedLedgerOffloadDeletionLagMs";
+ public final static OffloadedReadPriority DEFAULT_OFFLOADED_READ_PRIORITY = OffloadedReadPriority.TIERED_STORAGE_FIRST;
// common config
@Configuration
@@ -90,6 +145,9 @@ public class OffloadPolicies implements Serializable {
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
+ @Configuration
+ @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+ private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY;
// s3 config, set by service configuration or cli
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
@@ -161,7 +219,8 @@ public class OffloadPolicies implements Serializable {
public static OffloadPolicies create(String driver, String region, String bucket, String endpoint,
String credentialId, String credentialSecret,
Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes,
- Long offloadThresholdInBytes, Long offloadDeletionLagInMillis) {
+ Long offloadThresholdInBytes, Long offloadDeletionLagInMillis,
+ OffloadedReadPriority readPriority) {
OffloadPolicies offloadPolicies = new OffloadPolicies();
offloadPolicies.setManagedLedgerOffloadDriver(driver);
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(offloadThresholdInBytes);
@@ -172,8 +231,9 @@ public class OffloadPolicies implements Serializable {
offloadPolicies.setManagedLedgerOffloadServiceEndpoint(endpoint);
offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
+ offloadPolicies.setManagedLedgerOffloadedReadPriority(readPriority);
- if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1])) {
+ if (driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1))) {
if (credentialId != null) {
offloadPolicies.setS3ManagedLedgerOffloadRole(credentialId);
}
@@ -185,7 +245,7 @@ public class OffloadPolicies implements Serializable {
offloadPolicies.setS3ManagedLedgerOffloadServiceEndpoint(endpoint);
offloadPolicies.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
offloadPolicies.setS3ManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes);
- } else if (driver.equalsIgnoreCase(DRIVER_NAMES[2])) {
+ } else if (driver.equalsIgnoreCase(DRIVER_NAMES.get(2))) {
offloadPolicies.setGcsManagedLedgerOffloadRegion(region);
offloadPolicies.setGcsManagedLedgerOffloadBucket(bucket);
offloadPolicies.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes);
@@ -228,7 +288,7 @@ public class OffloadPolicies implements Serializable {
}
public boolean driverSupported() {
- return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(this.managedLedgerOffloadDriver));
+ return DRIVER_NAMES.stream().anyMatch(d -> d.equalsIgnoreCase(this.managedLedgerOffloadDriver));
}
public static String getSupportedDriverNames() {
@@ -239,22 +299,22 @@ public class OffloadPolicies implements Serializable {
if (managedLedgerOffloadDriver == null) {
return false;
}
- return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[0])
- || managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[1]);
+ return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(0))
+ || managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(1));
}
public boolean isGcsDriver() {
if (managedLedgerOffloadDriver == null) {
return false;
}
- return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[2]);
+ return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(2));
}
public boolean isFileSystemDriver() {
if (managedLedgerOffloadDriver == null) {
return false;
}
- return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[3]);
+ return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(3));
}
public boolean bucketValid() {
@@ -277,6 +337,7 @@ public class OffloadPolicies implements Serializable {
@Override
public int hashCode() {
return Objects.hash(
+ managedLedgerOffloadedReadPriority,
managedLedgerOffloadDriver,
managedLedgerOffloadMaxThreads,
managedLedgerOffloadPrefetchRounds,
@@ -312,17 +373,18 @@ public class OffloadPolicies implements Serializable {
return false;
}
OffloadPolicies other = (OffloadPolicies) obj;
- return Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver())
+ return Objects.equals(managedLedgerOffloadedReadPriority, other.getManagedLedgerOffloadedReadPriority())
+ && Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver())
&& Objects.equals(managedLedgerOffloadMaxThreads, other.getManagedLedgerOffloadMaxThreads())
&& Objects.equals(managedLedgerOffloadPrefetchRounds, other.getManagedLedgerOffloadPrefetchRounds())
&& Objects.equals(managedLedgerOffloadThresholdInBytes,
- other.getManagedLedgerOffloadThresholdInBytes())
+ other.getManagedLedgerOffloadThresholdInBytes())
&& Objects.equals(managedLedgerOffloadDeletionLagInMillis,
- other.getManagedLedgerOffloadDeletionLagInMillis())
+ other.getManagedLedgerOffloadDeletionLagInMillis())
&& Objects.equals(s3ManagedLedgerOffloadRegion, other.getS3ManagedLedgerOffloadRegion())
&& Objects.equals(s3ManagedLedgerOffloadBucket, other.getS3ManagedLedgerOffloadBucket())
&& Objects.equals(s3ManagedLedgerOffloadServiceEndpoint,
- other.getS3ManagedLedgerOffloadServiceEndpoint())
+ other.getS3ManagedLedgerOffloadServiceEndpoint())
&& Objects.equals(s3ManagedLedgerOffloadMaxBlockSizeInBytes,
other.getS3ManagedLedgerOffloadMaxBlockSizeInBytes())
&& Objects.equals(s3ManagedLedgerOffloadReadBufferSizeInBytes,
@@ -352,6 +414,7 @@ public class OffloadPolicies implements Serializable {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
+ .add("managedLedgerOffloadedReadPriority", managedLedgerOffloadedReadPriority)
.add("managedLedgerOffloadDriver", managedLedgerOffloadDriver)
.add("managedLedgerOffloadMaxThreads", managedLedgerOffloadMaxThreads)
.add("managedLedgerOffloadPrefetchRounds", managedLedgerOffloadPrefetchRounds)
@@ -382,7 +445,7 @@ public class OffloadPolicies implements Serializable {
public Properties toProperties() {
Properties properties = new Properties();
-
+ setProperty(properties, "managedLedgerOffloadedReadPriority", this.getManagedLedgerOffloadedReadPriority());
setProperty(properties, "offloadersDirectory", this.getOffloadersDirectory());
setProperty(properties, "managedLedgerOffloadDriver", this.getManagedLedgerOffloadDriver());
setProperty(properties, "managedLedgerOffloadMaxThreads",
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
index d87887d..a89409c 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.common.policies.data;
import java.util.Properties;
+import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -59,7 +60,8 @@ public class OffloadPoliciesTest {
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
- offloadDeletionLagInMillis
+ offloadDeletionLagInMillis,
+ OffloadedReadPriority.TIERED_STORAGE_FIRST
);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), driver);
@@ -86,6 +88,7 @@ public class OffloadPoliciesTest {
final Integer readBufferSizeInBytes = 2 * M;
final Long offloadThresholdInBytes = 0L;
final Long offloadDeletionLagInMillis = 5 * MIN;
+ final OffloadedReadPriority readPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST;
OffloadPolicies offloadPolicies = OffloadPolicies.create(
driver,
@@ -97,7 +100,8 @@ public class OffloadPoliciesTest {
maxBlockSizeInBytes,
readBufferSizeInBytes,
offloadThresholdInBytes,
- offloadDeletionLagInMillis
+ offloadDeletionLagInMillis,
+ readPriority
);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), driver);
@@ -107,6 +111,7 @@ public class OffloadPoliciesTest {
Assert.assertEquals(offloadPolicies.getGcsManagedLedgerOffloadReadBufferSizeInBytes(), readBufferSizeInBytes);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), offloadThresholdInBytes);
Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), offloadDeletionLagInMillis);
+ Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority(), readPriority);
}
@Test
diff --git a/site2/docs/cookbooks-tiered-storage.md b/site2/docs/cookbooks-tiered-storage.md
index 6dd2803..0263ec2 100644
--- a/site2/docs/cookbooks-tiered-storage.md
+++ b/site2/docs/cookbooks-tiered-storage.md
@@ -262,6 +262,16 @@ $ bin/pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-name
> Automatic offload runs when a new segment is added to a topic log. If you set the threshold on a namespace, but few messages are being produced to the topic, offload will not until the current segment is full.
+## Configuring read priority for offloaded messages
+
+By default, once messages have been offloaded to long term storage, brokers read them from long term storage, although the messages still exist in BookKeeper for a period that depends on the administrator's configuration. For
+messages that exist in both BookKeeper and long term storage, if you prefer that brokers read them from BookKeeper, you can use the following commands to change this configuration.
+
+```bash
+# default value for -orp is tiered-storage-first
+$ bin/pulsar-admin namespaces set-offload-policies my-tenant/my-namespace -orp bookkeeper-first
+$ bin/pulsar-admin topics set-offload-policies my-tenant/my-namespace/topic1 -orp bookkeeper-first
+```
## Triggering offload manually