You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/02 09:29:17 UTC

[pulsar] branch branch-2.7 updated: Support topic-level max message size (#8732) (#8748)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new addaac7  Support topic-level max message size (#8732) (#8748)
addaac7 is described below

commit addaac7a3cc9409467a0883dfc1491723a9ae3bf
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Dec 2 17:24:20 2020 +0800

    Support topic-level max message size (#8732) (#8748)
    
    fix https://github.com/streamnative/pulsar/issues/1723
    
    ### Motivation
    The current policy to control the size of the message is at the broker level(maxMessageSize). It becomes easier to plan resource quotas for client allocation if the max message size pushed can be given at the topic level.
    
    ### Modifications
    
    Now the broker-level `maxMessageSize` is returned by the broker to the client, when the broker handles `handleConnected`. The client will cached `maxMessageSize` locally. An exception will be thrown if it exceeds the limit.
    
    Topic-level cannot be implemented like this, because:
    1) When `handleConnected`, the command received by the broker does not contain specific topic information, so it is not known which topic policy to return to the client.
    2) The client cannot cache topic-level policy. Unlike the broker-level policy, which will not change, the topic-level policy will change dynamically, which will involve cache consistency issues.
    
    I think the best way to handle this is to let the broker determine whether it exceeds the limit, and return an exception if it exceeds the limit, and handle the exception by the client's `handleSendError`.
    
    ### Verifying this change
    TopicPoliciesTest.java
    
    (cherry picked from commit 9c28378aea7da83164938c84fa6b55d0474fbbd8)
    
    Co-authored-by: feynmanlin <fe...@tencent.com>
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 29 +++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 84 +++++++++++++++++++
 .../pulsar/broker/service/AbstractTopic.java       | 15 ++++
 .../org/apache/pulsar/broker/service/Producer.java | 15 +++-
 .../service/nonpersistent/NonPersistentTopic.java  |  5 ++
 .../broker/service/persistent/PersistentTopic.java | 12 +++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 97 ++++++++++++++++++++--
 .../org/apache/pulsar/client/admin/Topics.java     | 51 ++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 76 +++++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  7 ++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 43 ++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  3 +
 .../apache/pulsar/client/impl/ProducerImpl.java    | 19 +++++
 .../pulsar/common/policies/data/TopicPolicies.java |  5 ++
 14 files changed, 452 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7fd98c4..fb2e7a1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2613,6 +2613,35 @@ public class PersistentTopicsBase extends AdminResource {
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
     }
 
+    protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSize) {
+        if (maxMessageSize != null && (maxMessageSize < 0 || maxMessageSize > config().getMaxMessageSize())) {
+            throw new RestException(Status.PRECONDITION_FAILED
+                    , "topic-level maxMessageSize must be greater than or equal to 0 " +
+                    "and must be smaller than that in the broker-level");
+        }
+
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
+        topicPolicies.setMaxMessageSize(maxMessageSize);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
+    protected Optional<Integer> internalGetMaxMessageSize() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        return getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
+    }
+
     protected Optional<Integer> internalGetMaxProducers() {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9021abd..d26b9eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1919,6 +1919,90 @@ public class PersistentTopics extends PersistentTopicsBase {
         });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
+    @ApiOperation(value = "Get maxMessageSize config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
+                                  @PathParam("tenant") String tenant,
+                                  @PathParam("namespace") String namespace,
+                                  @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<Integer> policies = internalGetMaxMessageSize();
+            if (policies.isPresent()) {
+                asyncResponse.resume(policies.get());
+            } else {
+                asyncResponse.resume(Response.noContent().build());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
+    @ApiOperation(value = "Set maxMessageSize config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Invalid value of maxConsumers")})
+    public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
+                                  @PathParam("tenant") String tenant,
+                                  @PathParam("namespace") String namespace,
+                                  @PathParam("topic") @Encoded String encodedTopic,
+                                  @ApiParam(value = "The max message size of the topic") int maxMessageSize) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMaxMessageSize(maxMessageSize).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed updated persistence policies", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed updated persistence policies", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully set max message size: namespace={}, topic={}, maxMessageSiz={}",
+                        clientAppId(),
+                        namespaceName,
+                        topicName.getLocalName(),
+                        maxMessageSize);
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
+    @ApiOperation(value = "Remove maxMessageSize config for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetMaxMessageSize(null).whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove maxMessageSize", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove max message size: namespace={}, topic={}",
+                        clientAppId(),
+                        namespaceName,
+                        topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
 
     @POST
     @Path("/{tenant}/{namespace}/{topic}/terminate")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f9761e6..dde9ad6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -572,6 +572,21 @@ public abstract class AbstractTopic implements Topic {
         }
     }
 
+    protected boolean isExceedMaximumMessageSize(int size) {
+        Integer maxMessageSize = null;
+        TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
+        if (topicPolicies != null && topicPolicies.isMaxMessageSizeSet()) {
+            maxMessageSize = topicPolicies.getMaxMessageSize();
+        }
+        if (maxMessageSize != null) {
+            if (maxMessageSize == 0) {
+                return false;
+            }
+            return size > maxMessageSize;
+        }
+        return false;
+    }
+
     /**
      * update topic publish dispatcher for this topic.
      */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 8eebaa3..8e70a0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -358,8 +358,7 @@ public class Producer {
         @Override
         public void completed(Exception exception, long ledgerId, long entryId) {
             if (exception != null) {
-                ServerError serverError = (exception instanceof TopicTerminatedException)
-                        ? ServerError.TopicTerminatedError : ServerError.PersistenceError;
+                final ServerError serverError = getServerError(exception);
 
                 producer.cnx.execute(() -> {
                     if (!(exception instanceof TopicClosedException)) {
@@ -385,6 +384,18 @@ public class Producer {
             }
         }
 
+        private ServerError getServerError(Exception exception) {
+            ServerError serverError;
+            if (exception instanceof TopicTerminatedException) {
+                serverError = ServerError.TopicTerminatedError;
+            } else if (exception instanceof BrokerServiceException.NotAllowedException) {
+                serverError = ServerError.NotAllowedError;
+            } else {
+                serverError = ServerError.PersistenceError;
+            }
+            return serverError;
+        }
+
         /**
          * Executed from I/O thread when sending receipt back to client
          */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 2138885..b2a065c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -164,6 +164,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
     @Override
     public void publishMessage(ByteBuf data, PublishContext callback) {
+        if (isExceedMaximumMessageSize(data.readableBytes())) {
+            callback.completed(new NotAllowedException("Exceed maximum message size")
+                    , -1, -1);
+            return;
+        }
         callback.completed(null, 0L, 0L);
         ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cd2ba17..33b49ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -340,6 +340,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             decrementPendingWriteOpsAndCheck();
             return;
         }
+        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+            publishContext.completed(new NotAllowedException("Exceed maximum message size")
+                    , -1, -1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
 
         MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
         switch (status) {
@@ -2455,6 +2461,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             decrementPendingWriteOpsAndCheck();
             return;
         }
+        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+            publishContext.completed(new NotAllowedException("Exceed maximum message size")
+                    , -1, -1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
 
         MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
         switch (status) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index e2165d2..8fe2be2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -18,32 +18,31 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
-import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
-import org.apache.pulsar.common.policies.data.SubscribeRate;
-import static org.testng.Assert.assertEquals;
-
 import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.BacklogQuotaManager;
+import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -55,6 +54,12 @@ import java.lang.reflect.Field;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 @Slf4j
 public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
 
@@ -91,6 +96,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
     @Override
     public void cleanup() throws Exception {
         super.internalCleanup();
+        this.resetConfig();
     }
 
     @Test
@@ -1160,4 +1166,81 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5);
         Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L);
     }
+
+    @Test(timeOut = 20000)
+    public void testTopicMaxMessageSizeApi() throws Exception{
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic)));
+        assertNull(admin.topics().getMaxMessageSize(persistenceTopic));
+
+        admin.topics().setMaxMessageSize(persistenceTopic,10);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic)) != null);
+        assertEquals(admin.topics().getMaxMessageSize(persistenceTopic).intValue(),10);
+
+        admin.topics().removeMaxMessageSize(persistenceTopic);
+        assertNull(admin.topics().getMaxMessageSize(persistenceTopic));
+
+        try {
+            admin.topics().setMaxMessageSize(persistenceTopic,Integer.MAX_VALUE);
+            fail("should fail");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(),412);
+        }
+        try {
+            admin.topics().setMaxMessageSize(persistenceTopic, -1);
+            fail("should fail");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(),412);
+        }
+    }
+
+    @Test(timeOut = 20000)
+    public void testTopicMaxMessageSize() throws Exception{
+        doTestTopicMaxMessageSize(true);
+        doTestTopicMaxMessageSize(false);
+    }
+
+    private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception {
+        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
+        if (isPartitioned) {
+            admin.topics().createPartitionedTopic(topic, 3);
+        }
+        // init cache
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        assertNull(admin.topics().getMaxMessageSize(topic));
+        // set msg size
+        admin.topics().setMaxMessageSize(topic, 10);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
+        assertEquals(admin.topics().getMaxMessageSize(topic).intValue(), 10);
+
+        try {
+            producer.send(new byte[1024]);
+        } catch (PulsarClientException e) {
+            assertTrue(e instanceof PulsarClientException.NotAllowedException);
+        }
+
+        admin.topics().removeMaxMessageSize(topic);
+        assertNull(admin.topics().getMaxMessageSize(topic));
+
+        try {
+            admin.topics().setMaxMessageSize(topic, Integer.MAX_VALUE);
+            fail("should fail");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 412);
+        }
+        try {
+            admin.topics().setMaxMessageSize(topic, -1);
+            fail("should fail");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 412);
+        }
+
+        MessageId messageId = producer.send(new byte[1024]);
+        assertNotNull(messageId);
+        producer.close();
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 0ae525f..4c8a8ffa 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2455,6 +2455,57 @@ public interface Topics {
      * @param topic Topic name
      */
     CompletableFuture<Void> removeMaxProducersAsync(String topic);
+    /**
+     * Get the max message size for specified topic.
+     *
+     * @param topic Topic name
+     * @return Configuration of bookkeeper persistence policies
+     * @throws PulsarAdminException Unexpected error
+     */
+    Integer getMaxMessageSize(String topic) throws PulsarAdminException;
+
+    /**
+     * Get the max message size for specified topic asynchronously.
+     *
+     * @param topic Topic name
+     * @return Configuration of bookkeeper persistence policies
+     * @throws PulsarAdminException Unexpected error
+     */
+    CompletableFuture<Integer> getMaxMessageSizeAsync(String topic);
+
+
+    /**
+     * Set the max message size for specified topic.
+     *
+     * @param topic Topic name
+     * @param maxMessageSize Max message size of producer
+     * @throws PulsarAdminException Unexpected error
+     */
+    void setMaxMessageSize(String topic, int maxMessageSize) throws PulsarAdminException;
+
+    /**
+     * Set the max message size for specified topic asynchronously.0 disables.
+     *
+     * @param topic Topic name
+     * @param maxMessageSize Max message size of topic
+     * @throws PulsarAdminException Unexpected error
+     */
+    CompletableFuture<Void> setMaxMessageSizeAsync(String topic, int maxMessageSize);
+
+    /**
+     * Remove the max message size for specified topic.
+     *
+     * @param topic Topic name
+     * @throws PulsarAdminException Unexpected error
+     */
+    void removeMaxMessageSize(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove the max message size for specified topic asynchronously.
+     *
+     * @param topic Topic name
+     */
+    CompletableFuture<Void> removeMaxMessageSizeAsync(String topic);
 
     /**
      * Get the max number of consumer for specified topic.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 8b1f83f..eca4c5f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2637,6 +2637,82 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
+    public Integer getMaxMessageSize(String topic) throws PulsarAdminException {
+        try {
+            return getMaxMessageSizeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Integer> getMaxMessageSizeAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "maxMessageSize");
+        final CompletableFuture<Integer> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Integer>() {
+                    @Override
+                    public void completed(Integer maxMessageSize) {
+                        future.complete(maxMessageSize);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setMaxMessageSize(String topic, int maxMessageSize) throws PulsarAdminException {
+        try {
+            setMaxMessageSizeAsync(topic, maxMessageSize).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setMaxMessageSizeAsync(String topic, int maxMessageSize) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "maxMessageSize");
+        return asyncPostRequest(path, Entity.entity(maxMessageSize, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeMaxMessageSize(String topic) throws PulsarAdminException {
+        try {
+            removeMaxMessageSizeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxMessageSizeAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "maxMessageSize");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Integer getMaxConsumers(String topic) throws PulsarAdminException {
         try {
             return getMaxConsumersAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 5453a86..927ee73 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -778,6 +778,13 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
         verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);
 
+        cmdTopics.run(split("get-max-message-size persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).getMaxMessageSize("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("remove-max-message-size persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).removeMaxMessageSize("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-max-message-size persistent://myprop/clust/ns1/ds1 -m 99"));
+        verify(mockTopics).setMaxMessageSize("persistent://myprop/clust/ns1/ds1", 99);
+
         cmdTopics.run(split("get-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).getDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("remove-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index c5ed51d..4519568 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -165,6 +165,10 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("set-maxProducers", new SetMaxProducers());
         jcommander.addCommand("remove-maxProducers", new RemoveMaxProducers());
 
+        jcommander.addCommand("get-max-message-size", new GetMaxMessageSize());
+        jcommander.addCommand("set-max-message-size", new SetMaxMessageSize());
+        jcommander.addCommand("remove-max-message-size", new RemoveMaxMessageSize());
+
         jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
         jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
         jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
@@ -1692,6 +1696,45 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get max message size for a topic")
+    private class GetMaxMessageSize extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getMaxMessageSize(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set max message size for a topic")
+    private class SetMaxMessageSize extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--max-message-size", "-m"}, description = "Max message size for a topic", required = true)
+        private int maxMessageSize;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setMaxMessageSize(persistentTopic, maxMessageSize);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max message size for a topic")
+    private class RemoveMaxMessageSize extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeMaxMessageSize(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get max consumers per subscription for a topic")
     private class GetMaxConsumersPerSubscription extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 44a93b4..bbcba68 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -605,6 +605,9 @@ public class ClientCnx extends PulsarHandler {
         case TopicTerminatedError:
             producers.get(producerId).terminated(this);
             break;
+        case NotAllowedError:
+            producers.get(producerId).recoverNotAllowedError(sequenceId);
+            break;
 
         default:
             // By default, for transient error, let the reconnection logic
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 1efc0c9..3ab4207 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1039,6 +1039,25 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         resendMessages(cnx);
     }
 
+    protected synchronized void recoverNotAllowedError(long sequenceId) {
+        OpSendMsg op = pendingMessages.peek();
+        if(op != null && sequenceId == getHighestSequenceId(op)){
+            pendingMessages.remove();
+            releaseSemaphoreForSendOp(op);
+            try {
+                op.callback.sendComplete(
+                        new PulsarClientException.NotAllowedException(
+                                format("The size of the message which is produced by producer %s to the topic " +
+                                        "%s is not allowed", producerName, topic)));
+            } catch (Throwable t) {
+                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
+                        producerName, sequenceId, t);
+            }
+            ReferenceCountUtil.safeRelease(op.cmd);
+            op.recycle();
+        }
+    }
+
     /**
      * Computes checksum again and verifies it against existing checksum. If checksum doesn't match it means that
      * message is corrupt.
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index eb5098d..69d9fa3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -57,6 +57,11 @@ public class TopicPolicies {
     private PublishRate publishRate = null;
     private SubscribeRate subscribeRate = null;
     private Integer deduplicationSnapshotIntervalSeconds = null;
+    private Integer maxMessageSize = null;
+
+    public boolean isMaxMessageSizeSet() {
+        return maxMessageSize != null;
+    }
 
     public boolean isDeduplicationSnapshotIntervalSecondsSet(){
         return deduplicationSnapshotIntervalSeconds != null;