You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/12 23:37:55 UTC

[pulsar] branch master updated: Support set MaxUnackMessagesPerSubscription on topic level (#7802)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c694e5  Support set MaxUnackMessagesPerSubscription on topic level (#7802)
9c694e5 is described below

commit 9c694e5a9bfb25b427c22883ae1dd7d4477863e6
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Aug 13 07:37:28 2020 +0800

    Support set MaxUnackMessagesPerSubscription on topic level (#7802)
    
    Motivation
    Support setting MaxUnackedMessagesPerSubscription at the topic level.
    
    Modifications
    Support set/get/remove MaxUnackMessagesPerSubscription policy on topic level.
    
    Verifying this change
    Added unit tests to verify that setting, getting, and removing the MaxUnackedMessagesPerSubscription policy at the topic level works as expected when the topic-level policy is enabled or disabled.
    
    org.apache.pulsar.broker.admin.MaxUnackedMessagesTest#testMaxUnackedMessagesOnSubscriptionApi
    org.apache.pulsar.broker.admin.MaxUnackedMessagesTest#testMaxUnackedMessagesOnSubscription
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  15 ++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  65 +++++++
 .../PersistentDispatcherMultipleConsumers.java     |   4 +-
 .../broker/service/persistent/PersistentTopic.java |  11 +-
 .../broker/admin/AdminApiMaxUnackedMessages.java   |   2 -
 .../broker/admin/MaxUnackedMessagesTest.java       | 205 +++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  45 +++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  78 ++++++++
 .../pulsar/common/policies/data/TopicPolicies.java |   5 +
 9 files changed, 425 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index e66d29c..eda5104 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -800,6 +800,21 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    /** Sets the topic-level max unacked messages per subscription ({@code null} clears it) and persists
+     *  the topic policies asynchronously; throws PRECONDITION_FAILED if the policies cache is not ready. */
+    protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) {
+        TopicPolicies existing;
+        try {
+            existing = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName, e);
+            throw new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init");
+        }
+        TopicPolicies topicPolicies = existing != null ? existing : new TopicPolicies();
+        topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
     private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
         Topic topic;
         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 0d48c5a..3139b9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -251,6 +251,71 @@ public class PersistentTopics extends PersistentTopicsBase {
     }
 
     @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
+    @ApiOperation(value = "Get max unacked messages per subscription config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public void getMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        // Reply with the topic-level value when one is set; otherwise answer 204 No Content.
+        Object reply = getTopicPolicies(topicName)
+                .filter(TopicPolicies::isMaxUnackedMessagesOnSubscriptionSet)
+                .map(policies -> (Object) policies.getMaxUnackedMessagesOnSubscription())
+                .orElseGet(() -> Response.noContent().build());
+        asyncResponse.resume(reply);
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
+    @ApiOperation(value = "Set max unacked messages per subscription config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void setMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
+                                                    @PathParam("tenant") String tenant,
+                                                    @PathParam("namespace") String namespace,
+                                                    @PathParam("topic") @Encoded String encodedTopic,
+                                                    @ApiParam(value = "Max unacked messages on subscription policies for the specified topic")
+                                                            Integer maxUnackedNum) {
+        // Every check below runs synchronously, before the asynchronous policy update is started.
+        validateTopicName(tenant, namespace, encodedTopic);
+        validateAdminAccessForTenant(tenant);
+        validatePoliciesReadOnlyAccess();
+        checkTopicLevelPolicyEnable();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum).whenComplete((ignored, failure) -> {
+            if (failure == null) {
+                asyncResponse.resume(Response.noContent().build());
+                return;
+            }
+            log.error("Failed set MaxUnackedMessagesOnSubscription", failure);
+            // A RestException is passed through untouched; any other failure is wrapped in one.
+            Throwable reply = failure instanceof RestException ? failure : new RestException(failure);
+            asyncResponse.resume(reply);
+        });
+    }
+
+
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
+    @ApiOperation(value = "Delete max unacked messages per subscription config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
+    public void deleteMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
+                                                       @PathParam("tenant") String tenant,
+                                                       @PathParam("namespace") String namespace,
+                                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        setMaxUnackedMessagesOnSubscription(asyncResponse, tenant, namespace, encodedTopic, null); // null = remove
+    }
+
+    @GET
     @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
     @ApiOperation(value = "Get delayed delivery messages config on a topic.")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 67c2df9..a6d6582 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -350,7 +350,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 }
             } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                 log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
-                        totalUnackedMessages, topic.maxUnackedMessagesOnSubscription);
+                        totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
             } else if (!havePendingRead) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
@@ -694,7 +694,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     @Override
     public void addUnAckedMessages(int numberOfMessages) {
-        int maxUnackedMessages = topic.maxUnackedMessagesOnSubscription;
+        int maxUnackedMessages = topic.getMaxUnackedMessagesOnSubscription();
         if (maxUnackedMessages == -1) {
             maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
                     .getMaxUnackedMessagesPerSubscription();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5af4b30..a310716 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -186,7 +186,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private volatile double lastUpdatedAvgPublishRateInMsg = 0;
     private volatile double lastUpdatedAvgPublishRateInByte = 0;
 
-    public volatile int maxUnackedMessagesOnSubscription = -1;
+    private volatile int maxUnackedMessagesOnSubscription = -1;
     private volatile boolean isClosingOrDeleting = false;
 
     private static class TopicStatsHelper {
@@ -2325,4 +2325,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
         return delayedDeliveryEnabled;
     }
+
+    /** Returns the max unacked messages per subscription that currently applies to this topic. */
+    public int getMaxUnackedMessagesOnSubscription() {
+        final TopicPolicies policies = getTopicPolicies(TopicName.get(topic));
+        final boolean topicLevelSet = policies != null && policies.isMaxUnackedMessagesOnSubscriptionSet();
+        // Topic level setting has higher priority than namespace level.
+        return topicLevelSet ? policies.getMaxUnackedMessagesOnSubscription()
+                : maxUnackedMessagesOnSubscription;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java
index d72b050..077e608 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMaxUnackedMessages.java
@@ -44,8 +44,6 @@ import static org.testng.Assert.*;
 @Slf4j
 public class AdminApiMaxUnackedMessages extends MockedPulsarServiceBaseTest {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AdminApiMaxUnackedMessages.class);
-
     @BeforeMethod
     @Override
     public void setup() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
new file mode 100644
index 0000000..c30e03a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Maps;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.testng.Assert.fail;
+
+public class MaxUnackedMessagesTest extends ProducerConsumerBase {
+    private final String testTenant = "public";
+    private final String testNamespace = "default";
+    private final String myNamespace = testTenant + "/" + testNamespace;
+    private final String testTopic = "persistent://" + myNamespace + "/max-unacked-";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setSystemTopicEnabled(true);
+        this.conf.setTopicLevelPoliciesEnabled(true);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 10000)
+    public void testMaxUnackedMessagesOnSubscriptionApi() throws Exception {
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        admin.topics().createPartitionedTopic(topicName, 3);
+        waitCacheInit(topicName);
+        // Nothing has been configured on the fresh topic, so the getter must report null.
+        assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName));
+        // Each change is polled for (up to 50 x 100 ms) before asserting, since it may not be visible at once.
+        admin.topics().setMaxUnackedMessagesOnSubscription(topicName, 2048);
+        for (int attempt = 0; attempt < 50; attempt++) {
+            if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        assertEquals(2048, admin.topics().getMaxUnackedMessagesOnSubscription(topicName).intValue());
+        admin.topics().removeMaxUnackedMessagesOnSubscription(topicName);
+        for (int attempt = 0; attempt < 50; attempt++) {
+            if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+        assertNull(admin.topics().getMaxUnackedMessagesOnSubscription(topicName));
+    }
+
+    // See https://github.com/apache/pulsar/issues/5438
+    @Test(timeOut = 20000)
+    public void testMaxUnackedMessagesOnSubscription() throws Exception {
+        final String topicName = testTopic + System.currentTimeMillis();
+        final String subscriberName = "test-sub" + System.currentTimeMillis();
+        final int unackMsgAllowed = 100;
+        final int receiverQueueSize = 10;
+        final int totalProducedMsgs = 200;
+
+        pulsar.getConfiguration().setMaxUnackedMessagesPerSubscription(unackMsgAllowed);
+        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
+                .subscriptionType(SubscriptionType.Shared);
+        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
+        Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+        Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
+        List<Consumer<?>> consumers = Lists.newArrayList(consumer1, consumer2, consumer3);
+        waitCacheInit(topicName);
+        admin.topics().setMaxUnackedMessagesOnSubscription(topicName, unackMsgAllowed);
+        for (int i = 0; i < 50; i++) {
+            if (admin.topics().getMaxUnackedMessagesOnSubscription(topicName) != null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+
+        // (1) Produce totalProducedMsgs messages
+        for (int i = 0; i < totalProducedMsgs; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // (2) Try to consume everything; only about unackMsgAllowed messages are expected to arrive
+        Message<?> msg = null;
+        Map<Message<?>, Consumer<?>> messages = Maps.newHashMap();
+        for (int i = 0; i < 3; i++) {
+            for (int j = 0; j < totalProducedMsgs; j++) {
+                msg = consumers.get(i).receive(500, TimeUnit.MILLISECONDS);
+                if (msg != null) {
+                    messages.put(msg, consumers.get(i));
+                } else {
+                    break;
+                }
+            }
+        }
+
+        // The consumers must have received roughly unackMsgAllowed messages rather than all produced messages;
+        // the delta allows for 3 consumers with receiverQueueSize = 10.
+        Assert.assertEquals(messages.size(), unackMsgAllowed, receiverQueueSize * 3);
+
+        // Acknowledge every message received so far
+        messages.forEach((m, c) -> {
+            try {
+                c.acknowledge(m);
+            } catch (PulsarClientException e) {
+                fail("ack failed", e);
+            }
+        });
+
+        // Try to consume the remaining messages: the broker may take time to deliver them, so queue
+        // totalProducedMsgs asynchronous receives on every consumer.
+        Set<MessageId> result = ConcurrentHashMap.newKeySet();
+        // Number of produced messages that have not been received yet
+        int expectedRemainingMessages = totalProducedMsgs - messages.size();
+        CountDownLatch latch = new CountDownLatch(expectedRemainingMessages);
+        for (int i = 0; i < consumers.size(); i++) {
+            final int consumerCount = i;
+            for (int j = 0; j < totalProducedMsgs; j++) {
+                consumers.get(i).receiveAsync().thenAccept(m -> {
+                    result.add(m.getMessageId());
+                    try {
+                        consumers.get(consumerCount).acknowledge(m);
+                    } catch (PulsarClientException e) {
+                        fail("failed to ack msg", e);
+                    }
+                    latch.countDown();
+                });
+            }
+        }
+
+        latch.await(10, TimeUnit.SECONDS);
+
+        // The distinct message ids received now must equal the remaining count (the set drops duplicates)
+        Assert.assertEquals(result.size(), expectedRemainingMessages);
+
+        producer.close();
+        consumers.forEach(c -> {
+            try {
+                c.close();
+            } catch (PulsarClientException e) { // close failures are ignored; all assertions have already run
+            }
+        });
+    }
+
+    /** Polls the get-policy admin call (at most 50 tries, 100 ms apart) until it stops throwing. */
+    private void waitCacheInit(String topicName) throws Exception {
+        Exception lastFailure = null;
+        for (int attempt = 0; attempt < 50; attempt++) {
+            try {
+                admin.topics().getMaxUnackedMessagesOnSubscription(topicName);
+                return;
+            } catch (Exception e) {
+                lastFailure = e; // kept so that a timeout reports why the final attempt failed
+                Thread.sleep(100);
+            }
+        }
+        throw new RuntimeException("Waiting for cache initialization has timed out", lastFailure);
+    }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index a40dcc1..025a62f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1663,4 +1663,49 @@ public interface Topics {
      *            Topic name
      */
     CompletableFuture<Void> removeRetentionAsync(String topic);
+
+    /**
+     * Get the max unacked messages per subscription configured on a topic.
+     * @param topic Topic name
+     * @return the configured value, or {@code null} when no topic-level value is set
+     * @throws PulsarAdminException if the request fails, is interrupted, or times out
+     */
+    Integer getMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException;
+
+    /**
+     * Get the max unacked messages per subscription configured on a topic asynchronously.
+     * @param topic Topic name
+     * @return a future completed with the configured value, or {@code null} when none is set
+     */
+    CompletableFuture<Integer> getMaxUnackedMessagesOnSubscriptionAsync(String topic);
+
+    /**
+     * Set the max unacked messages per subscription on a topic.
+     * @param topic Topic name
+     * @param maxNum Max number of unacked messages allowed per subscription
+     * @throws PulsarAdminException if the request fails, is interrupted, or times out
+     */
+    void setMaxUnackedMessagesOnSubscription(String topic, int maxNum) throws PulsarAdminException;
+
+    /**
+     * Set the max unacked messages per subscription on a topic asynchronously.
+     * @param topic Topic name
+     * @param maxNum Max number of unacked messages allowed per subscription
+     * @return a future completed when the request has finished
+     */
+    CompletableFuture<Void> setMaxUnackedMessagesOnSubscriptionAsync(String topic, int maxNum);
+
+    /**
+     * Remove the max unacked messages per subscription setting from a topic.
+     * @param topic Topic name
+     * @throws PulsarAdminException if the request fails, is interrupted, or times out
+     */
+    void removeMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove the max unacked messages per subscription setting from a topic asynchronously.
+     * @param topic Topic name
+     * @return a future completed when the request has finished
+     */
+    CompletableFuture<Void> removeMaxUnackedMessagesOnSubscriptionAsync(String topic);
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 6b04e60..391b092 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1511,6 +1511,84 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
+    public Integer getMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
+        try { // blocking wrapper around the async variant, bounded by readTimeoutMs
+            return getMaxUnackedMessagesOnSubscriptionAsync(topic).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause(); // NOTE(review): assumes a PulsarAdminException cause - confirm
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // re-assert the interrupt before rethrowing
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Integer> getMaxUnackedMessagesOnSubscriptionAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "maxUnackedMessagesOnSubscription");
+        final CompletableFuture<Integer> future = new CompletableFuture<>(); // completed by the callback below
+        asyncGetRequest(path, new InvocationCallback<Integer>() {
+            @Override
+            public void completed(Integer maxNum) {
+                future.complete(maxNum); // presumably null when the broker answers 204 (nothing set) - confirm
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public void setMaxUnackedMessagesOnSubscription(String topic, int maxNum) throws PulsarAdminException {
+        try { // blocking wrapper around the async variant, bounded by readTimeoutMs
+            setMaxUnackedMessagesOnSubscriptionAsync(topic, maxNum).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause(); // NOTE(review): assumes a PulsarAdminException cause - confirm
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // re-assert the interrupt before rethrowing
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setMaxUnackedMessagesOnSubscriptionAsync(String topic, int maxNum) {
+        // POST the new limit as a JSON entity to the topic's maxUnackedMessagesOnSubscription path.
+        final WebTarget target = topicPath(validateTopic(topic), "maxUnackedMessagesOnSubscription");
+        return asyncPostRequest(target, Entity.entity(maxNum, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
+        try { // blocking wrapper around the async variant, bounded by readTimeoutMs
+            removeMaxUnackedMessagesOnSubscriptionAsync(topic).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause(); // NOTE(review): assumes a PulsarAdminException cause - confirm
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // re-assert the interrupt before rethrowing
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxUnackedMessagesOnSubscriptionAsync(String topic) {
+        // Issue a DELETE against the topic's maxUnackedMessagesOnSubscription path.
+        final WebTarget target = topicPath(validateTopic(topic), "maxUnackedMessagesOnSubscription");
+        return asyncDeleteRequest(target);
+    }
+
+    @Override
     public void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException {
         try {
             TopicName topicName = validateTopic(topic);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 796ff4a..8be7312 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -54,6 +54,11 @@ public class TopicPolicies {
     public boolean isDelayedDeliveryEnabledSet(){
         return delayedDeliveryEnabled != null;
     }
+    private Integer maxUnackedMessagesOnSubscription = null; // null means no topic-level value is set
+
+    public boolean isMaxUnackedMessagesOnSubscriptionSet() { // true once a non-null value has been stored
+        return maxUnackedMessagesOnSubscription != null;
+    }
 
     public boolean isBacklogQuotaSet() {
         return !backLogQuotaMap.isEmpty();