You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/07 06:47:10 UTC
[pulsar] branch master updated: Support get topic applied policy for offloader (#9505)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a22eb82 Support get topic applied policy for offloader (#9505)
a22eb82 is described below
commit a22eb82d4a816b0f821f35e885b747e6f8ffb154
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sun Feb 7 14:46:44 2021 +0800
Support get topic applied policy for offloader (#9505)
Master Issue: #9216
### Modifications
Add applied API for offloadPolicies
---
.../broker/admin/impl/PersistentTopicsBase.java | 11 +++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 20 +++++----
.../pulsar/broker/admin/AdminApiOffloadTest.java | 47 ++++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 15 +++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 15 ++++++-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 5 ++-
7 files changed, 106 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4d6d0c3..482e085 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -846,6 +846,17 @@ public class PersistentTopicsBase extends AdminResource {
return CompletableFuture.completedFuture(delayedDeliveryPolicies);
}
+ protected CompletableFuture<OffloadPolicies> internalGetOffloadPolicies(boolean applied) {
+ OffloadPolicies offloadPolicies =
+ getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null);
+ if (applied) {
+ OffloadPolicies namespacePolicy = getNamespacePolicies(namespaceName).offload_policies;
+ offloadPolicies = OffloadPolicies.mergeConfiguration(offloadPolicies
+ , namespacePolicy, pulsar().getConfiguration().getProperties());
+ }
+ return CompletableFuture.completedFuture(offloadPolicies);
+ }
+
protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPolicies offloadPolicies) {
TopicPolicies topicPolicies = null;
try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index e4f0311..0175365 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -276,14 +276,20 @@ public class PersistentTopics extends PersistentTopicsBase {
public void getOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic) {
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
- TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
- if (topicPolicies.isOffloadPoliciesSet()) {
- asyncResponse.resume(topicPolicies.getOffloadPolicies());
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
+ internalGetOffloadPolicies(applied).whenComplete((res, ex) -> {
+ if (ex instanceof RestException) {
+ log.error("Failed get offloadPolicies", ex);
+ asyncResponse.resume(ex);
+ } else if (ex != null) {
+ log.error("Failed get offloadPolicies", ex);
+ asyncResponse.resume(new RestException(ex));
+ } else {
+ asyncResponse.resume(res);
+ }
+ });
}
@POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 92ade37..3dd1703 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -203,6 +203,53 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
}
@Test
+ public void testOffloadPoliciesAppliedApi() throws Exception {
+ final String topicName = testTopic + UUID.randomUUID().toString();
+ admin.topics().createPartitionedTopic(topicName, 3);
+ pulsarClient.newProducer().topic(topicName).create().close();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
+ OffloadPolicies offloadPolicies = admin.topics().getOffloadPolicies(topicName, true);
+ OffloadPolicies brokerPolicies = OffloadPolicies
+ .mergeConfiguration(null,null, pulsar.getConfiguration().getProperties());
+
+ assertEquals(offloadPolicies, brokerPolicies);
+ // No real offloader is configured in this test, so mock one to avoid exceptions
+ LedgerOffloader topicOffloaded = mock(LedgerOffloader.class);
+ when(topicOffloaded.getOffloadDriverName()).thenReturn("mock");
+ doReturn(topicOffloaded).when(pulsar).createManagedLedgerOffloader(any());
+
+ OffloadPolicies namespacePolicies = new OffloadPolicies();
+ namespacePolicies.setManagedLedgerOffloadThresholdInBytes(100L);
+ namespacePolicies.setManagedLedgerOffloadDeletionLagInMillis(200L);
+ namespacePolicies.setManagedLedgerOffloadDriver("s3");
+ namespacePolicies.setManagedLedgerOffloadBucket("buck");
+ admin.namespaces().setOffloadPolicies(myNamespace, namespacePolicies);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin.namespaces().getOffloadPolicies(myNamespace), namespacePolicies));
+ assertEquals(
+ admin.topics().getOffloadPolicies(topicName, true), namespacePolicies);
+
+ OffloadPolicies topicPolicies = new OffloadPolicies();
+ topicPolicies.setManagedLedgerOffloadThresholdInBytes(200L);
+ topicPolicies.setManagedLedgerOffloadDeletionLagInMillis(400L);
+ topicPolicies.setManagedLedgerOffloadDriver("s3");
+ topicPolicies.setManagedLedgerOffloadBucket("buck2");
+
+ admin.topics().setOffloadPolicies(topicName, topicPolicies);
+ Awaitility.await().untilAsserted(()
+ -> assertEquals(admin.topics().getOffloadPolicies(topicName, true), topicPolicies));
+
+ admin.topics().removeOffloadPolicies(topicName);
+ Awaitility.await().untilAsserted(()
+ -> assertEquals(admin.topics().getOffloadPolicies(topicName, true), namespacePolicies));
+
+ admin.namespaces().removeOffloadPolicies(myNamespace);
+ Awaitility.await().untilAsserted(()
+ -> assertEquals(admin.topics().getOffloadPolicies(topicName, true), brokerPolicies));
+ }
+
+ @Test
public void testTopicLevelOffloadPartitioned() throws Exception {
testOffload(true);
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index d553559..f03c53c 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1922,6 +1922,21 @@ public interface Topics {
CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic);
/**
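+ * Get the offload policies of a topic; if {@code applied} is true, the topic policy is merged with the namespace policy and broker configuration.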
+ * @param topic
+ * @return
+ * @throws PulsarAdminException
+ */
+ OffloadPolicies getOffloadPolicies(String topic, boolean applied) throws PulsarAdminException;
+
+ /**
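+ * Asynchronously get the offload policies of a topic; if {@code applied} is true, the topic policy is merged with the namespace policy and broker configuration.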
+ * @param topic
+ * @return
+ */
+ CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic, boolean applied);
+
+ /**
* set offload policies of a topic.
* @param topic
* @param offloadPolicies
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 50f8923..5282ec5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1858,8 +1858,18 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public OffloadPolicies getOffloadPolicies(String topic) throws PulsarAdminException {
+ return getOffloadPolicies(topic, false);
+ }
+
+ @Override
+ public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic) {
+ return getOffloadPoliciesAsync(topic, false);
+ }
+
+ @Override
+ public OffloadPolicies getOffloadPolicies(String topic, boolean applied) throws PulsarAdminException {
try {
- return getOffloadPoliciesAsync(topic).
+ return getOffloadPoliciesAsync(topic, applied).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
@@ -1872,9 +1882,10 @@ public class TopicsImpl extends BaseResource implements Topics {
}
@Override
- public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic) {
+ public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "offloadPolicies");
+ path = path.queryParam("applied", applied);
final CompletableFuture<OffloadPolicies> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<OffloadPolicies>() {
@Override
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 0421a3a..679a5c9 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -798,7 +798,7 @@ public class PulsarAdminToolTest {
verify(mockTopics, times(2)).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("get-offload-policies persistent://myprop/clust/ns1/ds1"));
- verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1");
+ verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
@@ -934,6 +934,8 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getMessageTTL("persistent://myprop/clust/ns1/ds1", true);
+ cmdTopics.run(split("get-offload-policies persistent://myprop/clust/ns1/ds1 -ap"));
+ verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", true);
cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", true);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index da38148..b07d487 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1362,10 +1362,13 @@ public class CmdTopics extends CmdBase {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;
+ @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+ private boolean applied = false;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
- print(getAdmin().topics().getOffloadPolicies(persistentTopic));
+ print(getAdmin().topics().getOffloadPolicies(persistentTopic, applied));
}
}