You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/07 06:47:10 UTC

[pulsar] branch master updated: Support get topic applied policy for offloader (#9505)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a22eb82  Support get topic applied policy for offloader (#9505)
a22eb82 is described below

commit a22eb82d4a816b0f821f35e885b747e6f8ffb154
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sun Feb 7 14:46:44 2021 +0800

    Support get topic applied policy for offloader (#9505)
    
    Master Issue: #9216
    
    ### Modifications
    Add applied API for offloadPolicies
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 11 +++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 20 +++++----
 .../pulsar/broker/admin/AdminApiOffloadTest.java   | 47 ++++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 15 +++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 15 ++++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  4 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  5 ++-
 7 files changed, 106 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4d6d0c3..482e085 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -846,6 +846,17 @@ public class PersistentTopicsBase extends AdminResource {
         return CompletableFuture.completedFuture(delayedDeliveryPolicies);
     }
 
+    protected CompletableFuture<OffloadPolicies> internalGetOffloadPolicies(boolean applied) {
+        OffloadPolicies offloadPolicies =
+                getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null);
+        if (applied) {
+            OffloadPolicies namespacePolicy = getNamespacePolicies(namespaceName).offload_policies;
+            offloadPolicies = OffloadPolicies.mergeConfiguration(offloadPolicies
+                    , namespacePolicy, pulsar().getConfiguration().getProperties());
+        }
+        return CompletableFuture.completedFuture(offloadPolicies);
+    }
+
     protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPolicies offloadPolicies) {
         TopicPolicies topicPolicies = null;
         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index e4f0311..0175365 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -276,14 +276,20 @@ public class PersistentTopics extends PersistentTopicsBase {
     public void getOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
                                                     @PathParam("tenant") String tenant,
                                                     @PathParam("namespace") String namespace,
-                                                    @PathParam("topic") @Encoded String encodedTopic) {
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @QueryParam("applied") boolean applied) {
         validateTopicName(tenant, namespace, encodedTopic);
-        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
-        if (topicPolicies.isOffloadPoliciesSet()) {
-            asyncResponse.resume(topicPolicies.getOffloadPolicies());
-        } else {
-            asyncResponse.resume(Response.noContent().build());
-        }
+        internalGetOffloadPolicies(applied).whenComplete((res, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed get offloadPolicies", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed get offloadPolicies", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                asyncResponse.resume(res);
+            }
+        });
     }
 
     @POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index 92ade37..3dd1703 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -203,6 +203,53 @@ public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testOffloadPoliciesAppliedApi() throws Exception {
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        admin.topics().createPartitionedTopic(topicName, 3);
+        pulsarClient.newProducer().topic(topicName).create().close();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));
+        OffloadPolicies offloadPolicies = admin.topics().getOffloadPolicies(topicName, true);
+        OffloadPolicies brokerPolicies = OffloadPolicies
+                .mergeConfiguration(null,null, pulsar.getConfiguration().getProperties());
+
+        assertEquals(offloadPolicies, brokerPolicies);
+        //Since off loader is not really set, avoid code exceptions
+        LedgerOffloader topicOffloaded = mock(LedgerOffloader.class);
+        when(topicOffloaded.getOffloadDriverName()).thenReturn("mock");
+        doReturn(topicOffloaded).when(pulsar).createManagedLedgerOffloader(any());
+
+        OffloadPolicies namespacePolicies = new OffloadPolicies();
+        namespacePolicies.setManagedLedgerOffloadThresholdInBytes(100L);
+        namespacePolicies.setManagedLedgerOffloadDeletionLagInMillis(200L);
+        namespacePolicies.setManagedLedgerOffloadDriver("s3");
+        namespacePolicies.setManagedLedgerOffloadBucket("buck");
+        admin.namespaces().setOffloadPolicies(myNamespace, namespacePolicies);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin.namespaces().getOffloadPolicies(myNamespace), namespacePolicies));
+        assertEquals(
+                admin.topics().getOffloadPolicies(topicName, true), namespacePolicies);
+
+        OffloadPolicies topicPolicies = new OffloadPolicies();
+        topicPolicies.setManagedLedgerOffloadThresholdInBytes(200L);
+        topicPolicies.setManagedLedgerOffloadDeletionLagInMillis(400L);
+        topicPolicies.setManagedLedgerOffloadDriver("s3");
+        topicPolicies.setManagedLedgerOffloadBucket("buck2");
+
+        admin.topics().setOffloadPolicies(topicName, topicPolicies);
+        Awaitility.await().untilAsserted(()
+                -> assertEquals(admin.topics().getOffloadPolicies(topicName, true), topicPolicies));
+
+        admin.topics().removeOffloadPolicies(topicName);
+        Awaitility.await().untilAsserted(()
+                -> assertEquals(admin.topics().getOffloadPolicies(topicName, true), namespacePolicies));
+
+        admin.namespaces().removeOffloadPolicies(myNamespace);
+        Awaitility.await().untilAsserted(()
+                -> assertEquals(admin.topics().getOffloadPolicies(topicName, true), brokerPolicies));
+    }
+
+    @Test
     public void testTopicLevelOffloadPartitioned() throws Exception {
         testOffload(true);
     }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index d553559..f03c53c 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1922,6 +1922,21 @@ public interface Topics {
     CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic);
 
     /**
+     * get applied offload policies of a topic.
+     * @param topic
+     * @return
+     * @throws PulsarAdminException
+     */
+    OffloadPolicies getOffloadPolicies(String topic, boolean applied) throws PulsarAdminException;
+
+    /**
+     * get applied offload policies of a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic, boolean applied);
+
+    /**
      * set offload policies of a topic.
      * @param topic
      * @param offloadPolicies
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 50f8923..5282ec5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1858,8 +1858,18 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public OffloadPolicies getOffloadPolicies(String topic) throws PulsarAdminException {
+        return getOffloadPolicies(topic, false);
+    }
+
+    @Override
+    public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic) {
+        return getOffloadPoliciesAsync(topic, false);
+    }
+
+    @Override
+    public OffloadPolicies getOffloadPolicies(String topic, boolean applied) throws PulsarAdminException {
         try {
-            return getOffloadPoliciesAsync(topic).
+            return getOffloadPoliciesAsync(topic, applied).
                     get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
@@ -1872,9 +1882,10 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
-    public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic) {
+    public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic, boolean applied) {
         TopicName topicName = validateTopic(topic);
         WebTarget path = topicPath(topicName, "offloadPolicies");
+        path = path.queryParam("applied", applied);
         final CompletableFuture<OffloadPolicies> future = new CompletableFuture<>();
         asyncGetRequest(path, new InvocationCallback<OffloadPolicies>() {
             @Override
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 0421a3a..679a5c9 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -798,7 +798,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics, times(2)).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("get-offload-policies persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", false);
 
         cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
@@ -934,6 +934,8 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -ap"));
         verify(mockTopics).getMessageTTL("persistent://myprop/clust/ns1/ds1", true);
 
+        cmdTopics.run(split("get-offload-policies persistent://myprop/clust/ns1/ds1 -ap"));
+        verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", true);
 
         cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -ap"));
         verify(mockTopics).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1", true);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index da38148..b07d487 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1362,10 +1362,13 @@ public class CmdTopics extends CmdBase {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+        private boolean applied = false;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            print(getAdmin().topics().getOffloadPolicies(persistentTopic));
+            print(getAdmin().topics().getOffloadPolicies(persistentTopic, applied));
         }
     }