You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/19 01:04:37 UTC

[pulsar] branch branch-2.8 updated (5c3ddc2 -> bd5f75f)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 5c3ddc2  [feature][pulsar-broker]: Add additional servlet support to broker (#11498)
     new d30c5d1  [pulsar-admin] allow create functions with package URL (#11666)
     new bd5f75f  KeyShared dispatcher on non-persistent topics was not respecting consumer flow-control (#11692)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...istentStickyKeyDispatcherMultipleConsumers.java | 25 +++++-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 34 ++++++++
 .../management/core/common/PackageType.java        |  0
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  | 93 +++++++++++++++++++++-
 .../org/apache/pulsar/common/functions/Utils.java  | 10 ++-
 .../functions/worker/rest/api/FunctionsImpl.java   |  8 +-
 .../functions/worker/rest/api/SinksImpl.java       |  8 +-
 .../functions/worker/rest/api/SourcesImpl.java     |  8 +-
 .../tests/integration/cli/PackagesCliTest.java     | 12 +--
 9 files changed, 166 insertions(+), 32 deletions(-)
 rename {pulsar-package-management/core => pulsar-client-admin-api}/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java (100%)

[pulsar] 01/02: [pulsar-admin] allow create functions with package URL (#11666)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d30c5d16525d7cb34a303a9f36915f262714e7f7
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Tue Aug 17 12:59:08 2021 +0800

    [pulsar-admin] allow create functions with package URL (#11666)
    
    Fix https://github.com/apache/pulsar/issues/11665
    
    ### Motivation
    
    Allow user to create function with package URL with pulsar-admin.
    
    ### Modifications
    
    - allow passing valid package URL from pulsar-admin functions
    - added tests
    
    (cherry picked from commit de86f4f615adbe0ce8ac1f5a0d9077153bc38bdb)
---
 .../management/core/common/PackageType.java        |  0
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  | 93 +++++++++++++++++++++-
 .../org/apache/pulsar/common/functions/Utils.java  | 10 ++-
 .../functions/worker/rest/api/FunctionsImpl.java   |  8 +-
 .../functions/worker/rest/api/SinksImpl.java       |  8 +-
 .../functions/worker/rest/api/SourcesImpl.java     |  8 +-
 .../tests/integration/cli/PackagesCliTest.java     | 12 +--
 7 files changed, 111 insertions(+), 28 deletions(-)

diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java
similarity index 100%
rename from pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java
rename to pulsar-client-admin-api/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index d7f68a4..3e32961 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -82,14 +82,18 @@ public class CmdFunctionsTest {
     private static final String JAR_NAME = CmdFunctionsTest.class.getClassLoader().getResource("dummyexamples.jar").getFile();
     private static final String GO_EXEC_FILE_NAME = "test-go-function-with-url";
     private static final String PYTHON_FILE_NAME = "test-go-function-with-url";
-    private static final String URL ="file:" + JAR_NAME;
-    private static final String URL_WITH_GO ="file:" + GO_EXEC_FILE_NAME;
-    private static final String URL_WITH_PY ="file:" + PYTHON_FILE_NAME;
+    private static final String URL = "file:" + JAR_NAME;
+    private static final String URL_WITH_GO = "file:" + GO_EXEC_FILE_NAME;
+    private static final String URL_WITH_PY = "file:" + PYTHON_FILE_NAME;
     private static final String FN_NAME = TEST_NAME + "-function";
     private static final String INPUT_TOPIC_NAME = TEST_NAME + "-input-topic";
     private static final String OUTPUT_TOPIC_NAME = TEST_NAME + "-output-topic";
     private static final String TENANT = TEST_NAME + "-tenant";
     private static final String NAMESPACE = TEST_NAME + "-namespace";
+    private static final String PACKAGE_URL = "function://sample/ns1/jardummyexamples@1";
+    private static final String PACKAGE_GO_URL = "function://sample/ns1/godummyexamples@1";
+    private static final String PACKAGE_PY_URL = "function://sample/ns1/pydummyexamples@1";
+    private static final String PACKAGE_INVALID_URL = "functionsample.jar";
 
     private PulsarAdmin admin;
     private Functions functions;
@@ -362,6 +366,89 @@ public class CmdFunctionsTest {
     }
 
     @Test
+    public void testCreateFunctionWithPackageUrl() throws Exception {
+        cmd.run(new String[] {
+                "create",
+                "--name", FN_NAME,
+                "--inputs", INPUT_TOPIC_NAME,
+                "--output", OUTPUT_TOPIC_NAME,
+                "--jar", PACKAGE_URL,
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        });
+
+        CreateFunction creater = cmd.getCreater();
+
+        assertEquals(FN_NAME, creater.getFunctionName());
+        assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+        assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+        verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+    }
+
+    @Test
+    public void testCreateGoFunctionWithPackageUrl() throws Exception {
+        cmd.run(new String[] {
+                "create",
+                "--name", "test-go-function",
+                "--inputs", INPUT_TOPIC_NAME,
+                "--output", OUTPUT_TOPIC_NAME,
+                "--go", PACKAGE_GO_URL,
+                "--tenant", "sample",
+                "--namespace", "ns1",
+        });
+
+        CreateFunction creater = cmd.getCreater();
+
+        assertEquals("test-go-function", creater.getFunctionName());
+        assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+        assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+        verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+    }
+
+    @Test
+    public void testCreatePyFunctionWithPackageUrl() throws Exception {
+        cmd.run(new String[] {
+                "create",
+                "--name", "test-py-function",
+                "--inputs", INPUT_TOPIC_NAME,
+                "--output", OUTPUT_TOPIC_NAME,
+                "--py", PACKAGE_PY_URL,
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", "process_python_function",
+        });
+
+        CreateFunction creater = cmd.getCreater();
+
+        assertEquals("test-py-function", creater.getFunctionName());
+        assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+        assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+        verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+    }
+
+    @Test
+    public void testCreateFunctionWithInvalidPackageUrl() throws Exception {
+        cmd.run(new String[] {
+                "create",
+                "--name", FN_NAME,
+                "--inputs", INPUT_TOPIC_NAME,
+                "--output", OUTPUT_TOPIC_NAME,
+                "--jar", PACKAGE_INVALID_URL,
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        });
+
+        CreateFunction creater = cmd.getCreater();
+
+        assertEquals(FN_NAME, creater.getFunctionName());
+        assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
+        assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
+        verify(functions, times(0)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
+    }
+
+    @Test
     public void testCreateFunctionWithoutBasicArguments() throws Exception {
         cmd.run(new String[] {
                 "create",
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
index 9629aa8..abc601a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.common.functions;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import java.util.Arrays;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.packages.management.core.common.PackageType;
 
 /**
  * Helper class to work with configuration.
@@ -34,7 +36,13 @@ public class Utils {
 
     public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) {
         return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP)
-                || functionPkgUrl.startsWith(FILE));
+                || functionPkgUrl.startsWith(FILE)
+                || hasPackageTypePrefix(functionPkgUrl));
+    }
+
+    public static boolean hasPackageTypePrefix(String destPkgUrl) {
+        return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString())
+                && destPkgUrl.contains("://"));
     }
 
     public static void inferMissingFunctionName(FunctionConfig functionConfig) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index c656954..dba3645 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -154,7 +154,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
             // validate parameters
             try {
                 if (isPkgUrlProvided) {
-                    if (hasPackageTypePrefix(functionPkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
                         componentPackageFile = downloadPackageFile(functionPkgUrl);
                     } else {
                         if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
@@ -323,7 +323,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
             // validate parameters
             try {
                 if (isNotBlank(functionPkgUrl)) {
-                    if (hasPackageTypePrefix(functionPkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
                         componentPackageFile = downloadPackageFile(functionName);
                     } else {
                         try {
@@ -759,10 +759,6 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
 
     }
 
-    private static boolean hasPackageTypePrefix(String destPkgUrl) {
-        return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
-    }
-
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
         return downloadPackageFile(worker(), packageName);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index d0d2ed3..dd9e990 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -152,7 +152,7 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             // validate parameters
             try {
                 if (isPkgUrlProvided) {
-                    if (hasPackageTypePrefix(sinkPkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
                         componentPackageFile = downloadPackageFile(sinkPkgUrl);
                     } else {
                         if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
@@ -322,7 +322,7 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             // validate parameters
             try {
                 if (isNotBlank(sinkPkgUrl)) {
-                    if (hasPackageTypePrefix(sinkPkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
                         componentPackageFile = downloadPackageFile(sinkPkgUrl);
                     } else {
                         try {
@@ -742,10 +742,6 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
         return SinkConfigUtils.convert(sinkConfig, sinkDetails);
     }
 
-    private static boolean hasPackageTypePrefix(String destPkgUrl) {
-        return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
-    }
-
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
         return FunctionsImpl.downloadPackageFile(worker(), packageName);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 47c379e..2e3295d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -152,7 +152,7 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             // validate parameters
             try {
                 if (isPkgUrlProvided) {
-                    if (hasPackageTypePrefix(sourcePkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
                         componentPackageFile = downloadPackageFile(sourcePkgUrl);
                     } else {
                         if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
@@ -320,7 +320,7 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             // validate parameters
             try {
                 if (isNotBlank(sourcePkgUrl)) {
-                    if (hasPackageTypePrefix(sourcePkgUrl)) {
+                    if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
                         componentPackageFile = downloadPackageFile(sourcePkgUrl);
                     } else {
                         try {
@@ -739,10 +739,6 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
         return SourceConfigUtils.convert(sourceConfig, sourceDetails);
     }
 
-    private static boolean hasPackageTypePrefix(String destPkgUrl) {
-        return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
-    }
-
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
         return FunctionsImpl.downloadPackageFile(worker(), packageName);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java
index 3d8c6f1..df4b48c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java
@@ -45,9 +45,9 @@ public class PackagesCliTest extends TestRetrySupport {
     public final void setup() throws Exception {
         incrementSetupNumber();
         PulsarClusterSpec spec = PulsarClusterSpec.builder()
-            .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
-            .brokerEnvs(getPackagesManagementServiceEnvs())
-            .build();
+                .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
+                .brokerEnvs(getPackagesManagementServiceEnvs())
+                .build();
         pulsarCluster = PulsarCluster.forSpec(spec);
         pulsarCluster.start();
     }
@@ -88,13 +88,13 @@ public class PackagesCliTest extends TestRetrySupport {
     public void testPackagesOperationsWithUploadingPackages() throws Exception {
         String testPackageName = "function://public/default/test@v1";
         ContainerExecResult result = runPackagesCommand("upload", "--description", "a test package",
-            "--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
+                "--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
         assertEquals(result.getExitCode(), 0);
 
         BrokerContainer container = pulsarCluster.getBroker(0);
         String downloadFile = "tmp-file-" + RandomStringUtils.randomAlphabetic(8);
         String[] downloadCmd = new String[]{PulsarCluster.ADMIN_SCRIPT, "packages", "download",
-            "--path", downloadFile, testPackageName};
+                "--path", downloadFile, testPackageName};
         result = container.execCmd(downloadCmd);
         assertEquals(result.getExitCode(), 0);
 
@@ -119,7 +119,7 @@ public class PackagesCliTest extends TestRetrySupport {
 
         String contact = "test@apache.org";
         result = runPackagesCommand("update-metadata", "--description", "a test package",
-            "--contact", contact, "-PpropertyA=A", testPackageName);
+                "--contact", contact, "-PpropertyA=A", testPackageName);
         assertEquals(result.getExitCode(), 0);
 
         result = runPackagesCommand("get-metadata", testPackageName);

[pulsar] 02/02: KeyShared dispatcher on non-persistent topics was not respecting consumer flow-control (#11692)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bd5f75f6f351bb0ac0dfeb50fec1dbac501fe748
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Aug 17 18:22:11 2021 -0700

    KeyShared dispatcher on non-persistent topics was not respecting consumer flow-control (#11692)
    
    ### Motivation
    
    Fixes #10734
    
    The KeyShared dispatcher for non-persistent topics is not taking the flow control advertised by consumers to apply back pressure from consumer. That results in broker in pushing messages to consumer without restriction, causing memory issue in consumers.
    
    (cherry picked from commit 5835fd23e5347dd73ff073c371366a6b4ba8d6c4)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 25 +++++++++++++---
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 34 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index dc0d8a6..704fd93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.protocol.Commands;
 
 public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
 
@@ -89,7 +90,11 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
 
         for (Entry entry : entries) {
             Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer()));
-            groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
+            if (consumer != null) {
+                groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
+            } else {
+                entry.release();
+            }
         }
 
         for (Map.Entry<Consumer, List<Entry>> entriesByConsumer : groupedEntries.entrySet()) {
@@ -99,9 +104,21 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size());
             filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null, false);
-            consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(),
-                    sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
-            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
+
+            if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) {
+                consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(),
+                        sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
+                        getRedeliveryTracker());
+                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
+            } else {
+                entriesForConsumer.forEach(e -> {
+                    int totalMsgs = Commands.getNumberOfMessagesInBatch(e.getDataBuffer(), subscription.toString(), -1);
+                    if (totalMsgs > 0) {
+                        msgDrop.recordEvent(totalMsgs);
+                    }
+                    e.release();
+                });
+            }
         }
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index 49dac2a..990bd8f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -58,6 +58,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
@@ -118,6 +119,8 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
     @Test(timeOut = 10000)
     public void testSendMessage() throws BrokerServiceException {
         Consumer consumerMock = mock(Consumer.class);
+        when(consumerMock.getAvailablePermits()).thenReturn(1000);
+        when(consumerMock.isWritable()).thenReturn(true);
         nonpersistentDispatcher.addConsumer(consumerMock);
 
         List<Entry> entries = new ArrayList<>();
@@ -146,6 +149,37 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
                 eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
     }
 
+    @Test(timeOut = 10000)
+    public void testSendMessageRespectFlowControl() throws BrokerServiceException {
+        Consumer consumerMock = mock(Consumer.class);
+        nonpersistentDispatcher.addConsumer(consumerMock);
+
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
+        entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+        doAnswer(invocationOnMock -> {
+            ChannelPromise mockPromise = mock(ChannelPromise.class);
+            List<Entry> receivedEntries = invocationOnMock.getArgument(0, List.class);
+            for (int index = 1; index <= receivedEntries.size(); index++) {
+                Entry entry = receivedEntries.get(index - 1);
+                assertEquals(entry.getLedgerId(), 1);
+                assertEquals(entry.getEntryId(), index);
+                ByteBuf byteBuf = entry.getDataBuffer();
+                MessageMetadata messageMetadata = Commands.parseMessageMetadata(byteBuf);
+                assertEquals(byteBuf.toString(UTF_8), "message" + index);
+            };
+            return mockPromise;
+        }).when(consumerMock).sendMessages(any(List.class), any(EntryBatchSizes.class), any(),
+                anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+        try {
+            nonpersistentDispatcher.sendMessages(entries);
+        } catch (Exception e) {
+            fail("Failed to sendMessages.", e);
+        }
+        verify(consumerMock, times(0)).sendMessages(any(List.class), any(EntryBatchSizes.class),
+                eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+    }
+
     private ByteBuf createMessage(String message, int sequenceId) {
         return createMessage(message, sequenceId, "testKey");
     }