You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/23 02:35:43 UTC

[pulsar] branch master updated: Use ExecutorService.execute() instead of submit() when possible (#16183)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bcd40e6cf65 Use ExecutorService.execute() instead of submit() when possible (#16183)
bcd40e6cf65 is described below

commit bcd40e6cf65ec5035d40b991181a364f8bbfae06
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jun 22 19:35:36 2022 -0700

    Use ExecutorService.execute() instead of submit() when possible (#16183)
    
    * Use ExecutorService.execute() instead of submit() when possible
    
    * Fixed compilation
    
    * Fixed compilation
---
 .../pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java |  2 +-
 .../java/org/apache/pulsar/broker/service/BrokerService.java   | 10 +++++-----
 .../broker/service/persistent/PersistentSubscription.java      |  2 +-
 .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java |  2 +-
 .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java |  2 +-
 .../pulsar/functions/source/batch/BatchSourceExecutor.java     |  2 +-
 .../io/batchdatagenerator/BatchDataGeneratorPushSource.java    |  2 +-
 .../io/elasticsearch/client/elastic/ElasticBulkProcessor.java  |  2 +-
 .../org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java     |  2 +-
 .../src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java |  2 +-
 .../src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java  |  2 +-
 .../main/java/org/apache/pulsar/io/redis/sink/RedisSink.java   |  2 +-
 .../metadata/bookkeeper/AbstractHierarchicalLedgerManager.java |  2 +-
 .../apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java |  2 +-
 .../org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java |  9 ++++-----
 .../java/org/apache/pulsar/proxy/server/ProxyConnection.java   |  2 +-
 .../offload/filesystem/impl/FileStoreBackedReadHandleImpl.java |  5 +++--
 .../filesystem/impl/FileSystemManagedLedgerOffloader.java      |  6 +++---
 .../offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java     |  4 ++--
 .../offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java   |  4 ++--
 .../offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java   | 10 +++++-----
 21 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 9b618346e61..b6e7a7dde19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -287,7 +287,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
                     });
 
             try {
-                scheduler.submit(ModularLoadManagerImpl.this::updateAll);
+                scheduler.execute(ModularLoadManagerImpl.this::updateAll);
             } catch (RejectedExecutionException e) {
                 // Executor is shutting down
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5dadf1266e0..9951fb25e52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2096,7 +2096,7 @@ public class BrokerService implements Closeable {
                 bundles.getBundles().forEach(bundle -> {
                     pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist -> {
                         if (isExist) {
-                            this.pulsar().getExecutor().submit(() -> {
+                            this.pulsar().getExecutor().execute(() -> {
                                 try {
                                     pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(),
                                             bundle.getBundleRange());
@@ -2319,7 +2319,7 @@ public class BrokerService implements Closeable {
     }
 
     private void updateMaxPublishRatePerTopicInMessages() {
-        this.pulsar().getExecutor().submit(() ->
+        this.pulsar().getExecutor().execute(() ->
             forEachTopic(topic -> {
                 if (topic instanceof AbstractTopic) {
                     ((AbstractTopic) topic).updateBrokerPublishRate();
@@ -2329,7 +2329,7 @@ public class BrokerService implements Closeable {
     }
 
     private void updateSubscribeRate() {
-        this.pulsar().getExecutor().submit(() ->
+        this.pulsar().getExecutor().execute(() ->
             forEachTopic(topic -> {
                 if (topic instanceof PersistentTopic) {
                     ((PersistentTopic) topic).updateBrokerSubscribeRate();
@@ -2392,7 +2392,7 @@ public class BrokerService implements Closeable {
     }
 
     private void updateSubscriptionMessageDispatchRate() {
-        this.pulsar().getExecutor().submit(() -> {
+        this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic subscription
             forEachTopic(topic -> {
                 if (topic instanceof AbstractTopic) {
@@ -2409,7 +2409,7 @@ public class BrokerService implements Closeable {
     }
 
     private void updateReplicatorMessageDispatchRate() {
-        this.pulsar().getExecutor().submit(() -> {
+        this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic Replicator in Geo-replication
             forEachTopic(topic -> {
                     if (topic instanceof AbstractTopic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index dbb5346fe66..2dd503616fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -329,7 +329,7 @@ public class PersistentSubscription implements Subscription {
 
                 // when topic closes: it iterates through concurrent-subscription map to close each subscription. so,
                 // topic.remove again try to access same map which creates deadlock. so, execute it in different thread.
-                topic.getBrokerService().pulsar().getExecutor().submit(() ->{
+                topic.getBrokerService().pulsar().getExecutor().execute(() ->{
                     topic.removeSubscription(subName);
                     // Also need remove the cursor here, otherwise the data deletion will not work well.
                     // Because data deletion depends on the mark delete position of all cursors.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index e0eee02e957..d52955ccf0b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -642,7 +642,7 @@ public class ClientCnx extends PulsarHandler {
             if (firstOneWaiting != null) {
                 maxLookupRequestSemaphore.release();
                 // schedule a new lookup in.
-                eventLoopGroup.submit(() -> {
+                eventLoopGroup.execute(() -> {
                     long newId = firstOneWaiting.getLeft();
                     TimedCompletableFuture<LookupDataResult> newFuture = firstOneWaiting.getRight().getRight();
                     addPendingLookupRequests(newId, newFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 32bdb44ac8c..6cd02f2698f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1124,7 +1124,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                                               Throwable error,
                                               CompletableFuture<Void> subscribeFuture) {
         log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage());
-        client.externalExecutorProvider().getExecutor().submit(() -> {
+        client.externalExecutorProvider().getExecutor().execute(() -> {
             AtomicInteger toCloseNum = new AtomicInteger(0);
             consumers.values().stream().filter(consumer1 -> {
                 String consumerTopicName = consumer1.getTopic();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
index 2fc5b720564..3e09bb64237 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
@@ -167,7 +167,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
       discoverInProgress = true;
     }
     // Run this code asynchronous so it doesn't block processing of the tasks
-    discoveryThread.submit(() -> {
+    discoveryThread.execute(() -> {
       try {
         batchSource.discover(task -> taskEater(discoveredEvent, task));
       } catch (Exception e) {
diff --git a/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java b/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
index 7a8b5fd6c2a..de4654e5f5f 100644
--- a/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
+++ b/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
@@ -62,7 +62,7 @@ public class BatchDataGeneratorPushSource extends BatchPushSource<Person> implem
   public void prepare(byte[] instanceSplit) throws Exception {
     log.info("Instance " + sourceContext.getInstanceId() + " got a new discovered task {}",
             new String(instanceSplit, StandardCharsets.UTF_8));
-    executor.submit(this);
+    executor.execute(this);
   }
 
   @Override
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
index 3754c0c7acb..fd4dc757bd3 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
@@ -297,7 +297,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
                     promise.completeExceptionally(ex);
                 }
             };
-            internalExecutorService.submit(responseCallable);
+            internalExecutorService.execute(responseCallable);
 
             CompletableFuture<Void> listenerCalledPromise = new CompletableFuture();
 
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
index c84e87b16a4..d8e6c92254e 100644
--- a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
@@ -120,7 +120,7 @@ public abstract class HbaseAbstractSink<T> implements Sink<T> {
         }
 
         if (number == batchSize) {
-            flushExecutor.submit(() -> flush());
+            flushExecutor.execute(() -> flush());
         }
     }
 
diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java
index 0c14b648c18..359c0909ca6 100644
--- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java
+++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java
@@ -62,7 +62,7 @@ public abstract class BatchSink<T, R> implements Sink<R> {
         }
 
         if (currentSize >= batchSize) {
-            flushExecutor.submit(this::flush);
+            flushExecutor.execute(this::flush);
         }
     }
 
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 3923ec4c8d2..18c630952d6 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -120,7 +120,7 @@ public class MongoSink implements Sink<byte[]> {
         }
 
         if (currentSize == mongoConfig.getBatchSize()) {
-            flushExecutor.submit(() -> flush());
+            flushExecutor.execute(() -> flush());
         }
     }
 
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
index 0c04891c421..29535e6cbe7 100644
--- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
@@ -90,7 +90,7 @@ public class RedisSink implements Sink<byte[]> {
             currentSize = incomingList.size();
         }
         if (currentSize == batchSize) {
-            flushExecutor.submit(this::flush);
+            flushExecutor.execute(this::flush);
         }
     }
 
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
index 5e7fef285c5..cbfa68692ae 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
@@ -146,7 +146,7 @@ abstract class AbstractHierarchicalLedgerManager {
                     }
                     final T dataToProcess = data.get(next);
                     final AsyncCallback.VoidCallback stub = this;
-                    scheduler.submit(() -> processor.process(dataToProcess, stub));
+                    scheduler.execute(() -> processor.process(dataToProcess, stub));
                 }
             });
         }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
index fcb5ee285c8..91a938cc2e1 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
@@ -377,7 +377,7 @@ public class PulsarLedgerManager implements LedgerManager {
                     if (log.isDebugEnabled()) {
                         log.debug("Ledger metadata is changed for {} : {}.", ledgerId, result);
                     }
-                    scheduler.submit(() -> {
+                    scheduler.execute(() -> {
                         synchronized (listenerSet) {
                             for (BookkeeperInternalCallbacks.LedgerMetadataListener listener : listenerSet) {
                                 listener.onChanged(ledgerId, result);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
index 404ede1ae79..ab4f59fa507 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
@@ -105,12 +105,12 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
     private final OpStatsLogger syncStats;
     private final OpStatsLogger createClientStats;
 
-    private final Callable<ZooKeeper> clientCreator = new Callable<ZooKeeper>() {
+    private final Runnable clientCreator = new Runnable() {
 
         @Override
-        public ZooKeeper call() throws Exception {
+        public void run() {
             try {
-                return ZooWorker.syncCallWithRetries(null, new ZooWorker.ZooCallable<ZooKeeper>() {
+                ZooWorker.syncCallWithRetries(null, new ZooWorker.ZooCallable<ZooKeeper>() {
 
                     @Override
                     public ZooKeeper call() throws KeeperException, InterruptedException {
@@ -140,7 +140,6 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
             } catch (Exception e) {
                 log.error("Gave up reconnecting to ZooKeeper : ", e);
                 Runtime.getRuntime().exit(-1);
-                return null;
             }
         }
 
@@ -356,7 +355,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
         log.info("ZooKeeper session {} is expired from {}.",
                 Long.toHexString(getSessionId()), connectString);
         try {
-            connectExecutor.submit(clientCreator);
+            connectExecutor.execute(clientCreator);
         } catch (RejectedExecutionException ree) {
             if (!closed.get()) {
                 log.error("ZooKeeper reconnect task is rejected : ", ree);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index c97416280c3..d3c83198773 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -405,7 +405,7 @@ public class ProxyConnection extends PulsarHandler {
     public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
         try {
             final CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
-            ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+            ctx.executor().execute(() -> handleBrokerConnected(directProxyHandler, finalConnected));
         } catch (RejectedExecutionException e) {
             LOG.error("Event loop was already closed. Closing broker connection.", e);
             directProxyHandler.close();
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index ecc0cfc23b3..5d10b63de34 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -90,9 +90,10 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
     @Override
     public CompletableFuture<Void> closeAsync() {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        executor.submit(() -> {
+        executor.execute(() -> {
                 try {
                     reader.close();
+                    promise.complete(null);
                 } catch (IOException t) {
                     promise.completeExceptionally(t);
                 }
@@ -106,7 +107,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
             log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
         }
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
-        executor.submit(() -> {
+        executor.execute(() -> {
             if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index d91d84b476b..8e87c230adb 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -149,7 +149,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
     @Override
     public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.chooseThread(readHandle.getId()).submit(
+        scheduler.chooseThread(readHandle.getId()).execute(
                 new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration,
                         assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds(),
                         this.offloaderStats));
@@ -228,7 +228,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                     semaphore.acquire();
                     countDownLatch = new CountDownLatch(1);
                     assignmentScheduler.chooseThread(ledgerId)
-                            .submit(FileSystemWriter.create(ledgerEntriesOnce,
+                            .execute(FileSystemWriter.create(ledgerEntriesOnce,
                                     dataWriter, semaphore, countDownLatch, haveOffloadEntryNumber, this));
                     needToOffloadFirstEntryNumber = end + 1;
                 } while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed()
@@ -339,7 +339,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         String storagePath = getStoragePath(storageBasePath, ledgerName);
         String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
-        scheduler.chooseThread(ledgerId).submit(() -> {
+        scheduler.chooseThread(ledgerId).execute(() -> {
             try {
                 MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath),
                         configuration);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 998912a30c4..d99702e37c9 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -86,7 +86,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
     @Override
     public CompletableFuture<Void> closeAsync() {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        executor.submit(() -> {
+        executor.execute(() -> {
                 try {
                     index.close();
                     inputStream.close();
@@ -103,7 +103,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
     public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
-        executor.submit(() -> {
+        executor.execute(() -> {
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             boolean seeked = false;
             try {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index e1a8296a20e..b896f38d390 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -115,7 +115,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
     @Override
     public CompletableFuture<Void> closeAsync() {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        executor.submit(() -> {
+        executor.execute(() -> {
             try {
                 for (OffloadIndexBlockV2 indexBlock : indices) {
                     indexBlock.close();
@@ -141,7 +141,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
             promise.completeExceptionally(new IllegalArgumentException());
             return promise;
         }
-        executor.submit(() -> {
+        executor.execute(() -> {
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             List<GroupedReader> groupedReaders = null;
             try {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 1fdc0983b0e..606934b5f0e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -181,7 +181,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata,
                 config.getBlobStoreLocation(), writeBlobStore);
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.chooseThread(readHandle.getId()).submit(() -> {
+        scheduler.chooseThread(readHandle.getId()).execute(() -> {
             if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
                 promise.completeExceptionally(
                         new IllegalArgumentException("An empty or open ledger should never be offloaded"));
@@ -539,7 +539,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
         String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
-        scheduler.chooseThread(ledgerId).submit(() -> {
+        scheduler.chooseThread(ledgerId).execute(() -> {
             try {
                 promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
                         readBlobstore,
@@ -573,7 +573,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
             indexKeys.add(indexKey);
         });
 
-        scheduler.chooseThread(ledgerId).submit(() -> {
+        scheduler.chooseThread(ledgerId).execute(() -> {
             try {
                 promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
                         readBlobstore,
@@ -597,7 +597,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
 
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.chooseThread(ledgerId).submit(() -> {
+        scheduler.chooseThread(ledgerId).execute(() -> {
             try {
                 readBlobstore.removeBlobs(readBucket,
                     ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid),
@@ -623,7 +623,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
 
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        scheduler.submit(() -> {
+        scheduler.execute(() -> {
             try {
                 readBlobstore.removeBlobs(readBucket,
                         ImmutableList.of(uid.toString(),