You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/11 12:54:24 UTC

[pulsar] branch branch-2.11 updated (5009bebf230 -> fe34738bcbd)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 5009bebf230 [improve][admin] Not allow to terminate system topic. (#17006)
     new 3529547c00b [fix][test] Stop worker services when tearing down PulsarFunctionTlsTest. (#17054)
     new 690a19cb37e [improve][functions][admin] Improve the package download process (#16365)
     new f9d569b6e94 [fix][broker] Increment topic stats outbound message counters after messages have been written to the TCP/IP connection (#17043)
     new fe34738bcbd [fix][flaky-test]TxnLogBufferedWriterTest.testMainProcess (#16999)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/service/Consumer.java | 21 +++--
 .../broker/admin/v3/PackagesApiNotEnabledTest.java | 17 +++-
 .../pulsar/broker/admin/v3/PackagesApiTest.java    | 18 +++-
 .../functions/worker/PulsarFunctionTlsTest.java    |  8 +-
 .../pulsar/client/admin/internal/PackagesImpl.java | 98 ++++++++++++++++------
 .../coordinator/impl/TxnLogBufferedWriterTest.java | 29 +++----
 6 files changed, 139 insertions(+), 52 deletions(-)


[pulsar] 04/04: [fix][flaky-test]TxnLogBufferedWriterTest.testMainProcess (#16999)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe34738bcbd1c0e220b3911aa17979824387719f
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Aug 11 16:53:32 2022 +0800

    [fix][flaky-test]TxnLogBufferedWriterTest.testMainProcess (#16999)
---
 .../coordinator/impl/TxnLogBufferedWriterTest.java | 29 +++++++++++-----------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
index 1220c92fd55..23fc04fda93 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -179,9 +180,9 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
                     dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize,
                     batchedWriteMaxDelayInMillis, batchEnabled);
         // Store the param-context, param-position, param-exception of callback function and complete-count for verify.
-        ArrayList<Integer> contextArrayOfCallback = new ArrayList<>();
-        ArrayList<ManagedLedgerException> exceptionArrayOfCallback = new ArrayList<>();
-        LinkedHashMap<PositionImpl, ArrayList<Position>> positionsOfCallback = new LinkedHashMap<>();
+        List<Integer> contextArrayOfCallback = Collections.synchronizedList(new ArrayList<>());
+        List<ManagedLedgerException> exceptionArrayOfCallback = Collections.synchronizedList(new ArrayList<>());
+        Map<PositionImpl, List<Position>> positionsOfCallback = Collections.synchronizedMap(new LinkedHashMap<>());
         AtomicBoolean anyFlushCompleted = new AtomicBoolean();
         TxnLogBufferedWriter.AddDataCallback callback = new TxnLogBufferedWriter.AddDataCallback(){
             @Override
@@ -192,7 +193,8 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
                 }
                 contextArrayOfCallback.add((int)ctx);
                 PositionImpl lightPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId());
-                positionsOfCallback.computeIfAbsent(lightPosition, p -> new ArrayList<>());
+                positionsOfCallback.computeIfAbsent(lightPosition,
+                        p -> Collections.synchronizedList(new ArrayList<>()));
                 positionsOfCallback.get(lightPosition).add(position);
             }
             @Override
@@ -234,7 +236,8 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
         Awaitility.await().atMost(maxWaitSeconds, TimeUnit.SECONDS)
                 .until(() -> contextArrayOfCallback.size() == writeCmdExecuteCount);
         // Assert callback param-context, verify that all callbacks are executed in strict order.
-        if (closeBufferedWriter){
+        // If an exception occurs, the failure callback may be executed earlier, so sort contextArrayOfCallback.
+        if (closeBufferedWriter || bookieErrorType == BookieErrorType.SOMETIMES_ERROR){
             Collections.sort(contextArrayOfCallback);
         }
         Assert.assertEquals(contextArrayOfCallback.size(), writeCmdExecuteCount);
@@ -246,8 +249,6 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
         int exceptionCallbackCount = exceptionArrayOfCallback.size();
         int positionCallbackCount = (int) positionsOfCallback.values().stream().flatMap(l -> l.stream()).count();
         if (BookieErrorType.SOMETIMES_ERROR == bookieErrorType ||  closeBufferedWriter){
-            Assert.assertTrue(exceptionCallbackCount > 0);
-            Assert.assertTrue(positionCallbackCount > 0);
             Assert.assertEquals(exceptionCallbackCount + positionCallbackCount, writeCmdExecuteCount);
         } else if (BookieErrorType.NO_ERROR == bookieErrorType){
             Assert.assertEquals(positionCallbackCount, writeCmdExecuteCount);
@@ -256,13 +257,13 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
         }
         // if enabled batch-feature, will verify the attributes (batchSize, batchIndex) of callback param-position.
         if (exactlyBatched && BookieErrorType.ALWAYS_ERROR != bookieErrorType){
-            Iterator<ArrayList<Position>> callbackPositionIterator = positionsOfCallback.values().iterator();
+            Iterator<List<Position>> callbackPositionIterator = positionsOfCallback.values().iterator();
             List<String> exactlyFlushedDataArray = dataSerializer.getGeneratedJsonArray();
             for (int batchedEntryIndex = 0; batchedEntryIndex < exactlyFlushedDataArray.size() - exceptionCallbackCount;
                  batchedEntryIndex++) {
                 String json = exactlyFlushedDataArray.get(batchedEntryIndex);
                 List<Integer> batchedData = JsonDataSerializer.deserializeMergedData(json);
-                ArrayList<Position> innerPositions = callbackPositionIterator.next();
+                List<Position> innerPositions = callbackPositionIterator.next();
                 for (int i = 0; i < batchedData.size(); i++) {
                     TxnBatchedPositionImpl innerPosition =
                             (TxnBatchedPositionImpl) innerPositions.get(i);
@@ -368,7 +369,7 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
                 1, TimeUnit.MILLISECONDS);
         SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
         // Cache the data flush to Bookie for Asserts.
-        List<Integer> dataArrayFlushedToBookie = new ArrayList<>();
+        List<Integer> dataArrayFlushedToBookie = Collections.synchronizedList(new ArrayList<>());
         Mockito.doAnswer(new Answer() {
             @Override
             public Object answer(InvocationOnMock invocation) throws Throwable {
@@ -531,10 +532,10 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
 
         private static ObjectMapper objectMapper = new ObjectMapper();
 
-        private ArrayList<ByteBuf> generatedByteBufArray = new ArrayList<>();
+        private List<ByteBuf> generatedByteBufArray = Collections.synchronizedList(new ArrayList<>());
 
         @Getter
-        private ArrayList<String> generatedJsonArray = new ArrayList<>();
+        private List<String> generatedJsonArray = Collections.synchronizedList(new ArrayList<>());
 
         private int eachDataBytesLen = 4;
 
@@ -590,8 +591,8 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
 
         protected void cleanup(){
             // Just for GC.
-            generatedByteBufArray = new ArrayList<>();
-            generatedJsonArray = new ArrayList<>();
+            generatedByteBufArray = Collections.synchronizedList(new ArrayList<>());
+            generatedJsonArray = Collections.synchronizedList(new ArrayList<>());
         }
 
         protected void assertAllByteBufHasBeenReleased(){


[pulsar] 02/04: [improve][functions][admin] Improve the package download process (#16365)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 690a19cb37e389c351c755635146b5b61dbdb22f
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Aug 11 11:05:44 2022 +0800

    [improve][functions][admin] Improve the package download process (#16365)
    
    * Improve the package download process
    ---
    
    *Motivation*
    
    Improve the package download process to handle the download
    body more efficiently.
---
 .../broker/admin/v3/PackagesApiNotEnabledTest.java | 17 +++-
 .../pulsar/broker/admin/v3/PackagesApiTest.java    | 18 +++-
 .../pulsar/client/admin/internal/PackagesImpl.java | 98 ++++++++++++++++------
 3 files changed, 105 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
index 3fd39d30d2d..2c39fbbaf8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.broker.admin.v3;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.packages.management.core.common.PackageMetadata;
@@ -45,14 +49,23 @@ public class PackagesApiNotEnabledTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test(timeOut = 60000)
-    public void testPackagesOperationsWithoutPackagesServiceEnabled() {
+    public void testPackagesOperationsWithoutPackagesServiceEnabled() throws Exception {
         // download package api should return 503 Service Unavailable exception
         String unknownPackageName = "function://public/default/unknown@v1";
+        Path tmp = Files.createTempDirectory("package-test-tmp");
         try {
-            admin.packages().download(unknownPackageName, "/test/unknown");
+            admin.packages().download(unknownPackageName, tmp.toAbsolutePath().toString() + "/unknown");
             fail("should throw 503 error");
         } catch (PulsarAdminException e) {
             assertEquals(503, e.getStatusCode());
+        } finally {
+            Files.walk(tmp).sorted(Comparator.reverseOrder()).forEach(p -> {
+                try {
+                    Files.delete(p);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
         }
 
         // get metadata api should return 503 Service Unavailable exception
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
index 69331c02c7d..dd082681b23 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
@@ -32,7 +32,11 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 @Test(groups = "broker-admin")
@@ -101,14 +105,24 @@ public class PackagesApiTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test(timeOut = 60000)
-    public void testPackagesOperationsFailed() {
+    public void testPackagesOperationsFailed() throws IOException {
         // download a non-existent package should return not found exception
         String unknownPackageName = "function://public/default/unknown@v1";
+
+        Path tmp = Files.createTempDirectory("package-test-tmp");
         try {
-            admin.packages().download(unknownPackageName, "/test/unknown");
+            admin.packages().download(unknownPackageName, tmp.toAbsolutePath() + "/unknown");
             fail("should throw 404 error");
         } catch (PulsarAdminException e) {
             assertEquals(404, e.getStatusCode());
+        } finally {
+            Files.walk(tmp).sorted(Comparator.reverseOrder()).forEach(p -> {
+                try {
+                    Files.delete(p);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
         }
 
         // get the metadata of a non-existent package should return not found exception
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
index 1ed3e5da367..885e39c1ce6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
@@ -18,18 +18,19 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import static org.asynchttpclient.Dsl.get;
 import com.google.gson.Gson;
+import io.netty.handler.codec.http.HttpHeaders;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -39,8 +40,11 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.packages.management.core.common.PackageMetadata;
 import org.apache.pulsar.packages.management.core.common.PackageName;
+import org.asynchttpclient.AsyncHandler;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.Dsl;
+import org.asynchttpclient.HttpResponseBodyPart;
+import org.asynchttpclient.HttpResponseStatus;
 import org.asynchttpclient.RequestBuilder;
 import org.asynchttpclient.request.body.multipart.FilePart;
 import org.asynchttpclient.request.body.multipart.StringPart;
@@ -125,30 +129,76 @@ public class PackagesImpl extends ComponentResource implements Packages {
     public CompletableFuture<Void> downloadAsync(String packageName, String path) {
         WebTarget webTarget = packages.path(PackageName.get(packageName).toRestPath());
         final CompletableFuture<Void> future = new CompletableFuture<>();
-        asyncGetRequest(webTarget, new InvocationCallback<Response>(){
-            @Override
-            public void completed(Response response) {
-                if (response.getStatus() == Response.Status.OK.getStatusCode()) {
-                    try (InputStream inputStream = response.readEntity(InputStream.class)) {
-                        Path destinyPath = Paths.get(path);
-                        if (destinyPath.getParent() != null) {
-                            Files.createDirectories(destinyPath.getParent());
+        try {
+            Path destinyPath = Paths.get(path);
+            if (destinyPath.getParent() != null) {
+                Files.createDirectories(destinyPath.getParent());
+            }
+
+            FileChannel os = new FileOutputStream(destinyPath.toFile()).getChannel();
+            RequestBuilder builder = get(webTarget.getUri().toASCIIString());
+
+            CompletableFuture<HttpResponseStatus> statusFuture =
+                httpClient.executeRequest(addAuthHeaders(webTarget, builder).build(),
+                    new AsyncHandler<HttpResponseStatus>() {
+                        private HttpResponseStatus status;
+
+                        @Override
+                        public State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
+                            status = httpResponseStatus;
+                            if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
+                                return State.ABORT;
+                            }
+                            return State.CONTINUE;
                         }
-                        Files.copy(inputStream, destinyPath, StandardCopyOption.REPLACE_EXISTING);
-                        future.complete(null);
+
+                        @Override
+                        public State onHeadersReceived(HttpHeaders httpHeaders) throws Exception {
+                            return State.CONTINUE;
+                        }
+
+                        @Override
+                        public State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
+                            os.write(httpResponseBodyPart.getBodyByteBuffer());
+                            return State.CONTINUE;
+                        }
+
+                        @Override
+                        public void onThrowable(Throwable throwable) {
+                            // We don't need to handle the throwable here; the returned future is used to handle it.
+                        }
+
+                        @Override
+                        public HttpResponseStatus onCompleted() throws Exception {
+                            return status;
+                        }
+                    }).toCompletableFuture();
+            statusFuture
+                .whenComplete((status, throwable) -> {
+                    try {
+                        os.close();
                     } catch (IOException e) {
-                        future.completeExceptionally(e);
+                        future.completeExceptionally(getApiException(throwable));
                     }
-                } else {
-                    future.completeExceptionally(getApiException(response));
-                }
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(throwable);
-            }
-        });
+                })
+                .thenAccept(status -> {
+                    if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
+                        future.completeExceptionally(
+                            getApiException(Response
+                                .status(status.getStatusCode())
+                                .entity(status.getStatusText())
+                                .build()));
+                    } else {
+                        future.complete(null);
+                    }
+                })
+                .exceptionally(throwable -> {
+                    future.completeExceptionally(getApiException(throwable));
+                    return null;
+                });
+        } catch (Exception e) {
+            future.completeExceptionally(getApiException(e));
+        }
         return future;
     }
 


[pulsar] 03/04: [fix][broker] Increment topic stats outbound message counters after messages have been written to the TCP/IP connection (#17043)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f9d569b6e94c7aa1a8d836766ad618407979e30e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Aug 11 07:12:21 2022 +0300

    [fix][broker] Increment topic stats outbound message counters after messages have been written to the TCP/IP connection (#17043)
---
 .../org/apache/pulsar/broker/service/Consumer.java  | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 37dfe087e7f..5c7646921fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -332,14 +332,19 @@ public class Consumer {
                    topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get());
         }
         incrementUnackedMessages(unackedMessages);
-        msgOut.recordMultipleEvents(totalMessages, totalBytes);
-        msgOutCounter.add(totalMessages);
-        bytesOutCounter.add(totalBytes);
-        chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
-
-
-        return cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
-                entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
+        Future<Void> writeAndFlushPromise =
+                cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
+                        entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
+        writeAndFlushPromise.addListener(status -> {
+            // only increment counters after the messages have been successfully written to the TCP/IP connection
+            if (status.isSuccess()) {
+                msgOut.recordMultipleEvents(totalMessages, totalBytes);
+                msgOutCounter.add(totalMessages);
+                bytesOutCounter.add(totalBytes);
+                chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);
+            }
+        });
+        return writeAndFlushPromise;
     }
 
     private void incrementUnackedMessages(int ackedMessages) {


[pulsar] 01/04: [fix][test] Stop worker services when tearing down PulsarFunctionTlsTest. (#17054)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3529547c00bc6a4d96f25d87db5ea7bddfc0f7f1
Author: Zike Yang <zi...@apache.org>
AuthorDate: Thu Aug 11 20:48:09 2022 +0800

    [fix][test] Stop worker services when tearing down PulsarFunctionTlsTest. (#17054)
---
 .../org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index a58a6494d2b..e7c6dddc9b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -71,6 +71,7 @@ public class PulsarFunctionTlsTest {
     protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
     protected PulsarService leaderPulsar;
     protected PulsarAdmin leaderAdmin;
+    protected WorkerService[] fnWorkerServices = new WorkerService[BROKER_COUNT];
     protected String testCluster = "my-cluster";
     protected String testTenant = "my-tenant";
     protected String testNamespace = testTenant + "/my-ns";
@@ -137,12 +138,12 @@ public class PulsarFunctionTlsTest {
             workerConfig.setBrokerClientAuthenticationEnabled(true);
             workerConfig.setTlsEnabled(true);
             workerConfig.setUseTls(true);
-            WorkerService fnWorkerService = WorkerServiceLoader.load(workerConfig);
+            fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);
 
             configurations[i] = config;
 
             pulsarServices[i] = new PulsarService(
-                config, workerConfig, Optional.of(fnWorkerService), code -> {});
+                config, workerConfig, Optional.of(fnWorkerServices[i]), code -> {});
             pulsarServices[i].start();
 
             // Sleep until pulsarServices[0] becomes leader, this way we can spy namespace bundle assignment easily.
@@ -181,6 +182,9 @@ public class PulsarFunctionTlsTest {
                 if (pulsarAdmins[i] != null) {
                     pulsarAdmins[i].close();
                 }
+                if (fnWorkerServices[i] != null) {
+                    fnWorkerServices[i].stop();
+                }
                 if (pulsarServices[i] != null) {
                     pulsarServices[i].close();
                 }