You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/23 02:35:43 UTC
[pulsar] branch master updated: Use ExecutorService.execute() instead of submit() when possible (#16183)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bcd40e6cf65 Use ExecutorService.execute() instead of submit() when possible (#16183)
bcd40e6cf65 is described below
commit bcd40e6cf65ec5035d40b991181a364f8bbfae06
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jun 22 19:35:36 2022 -0700
Use ExecutorService.execute() instead of submit() when possible (#16183)
* Use ExecutorService.execute() instead of submit() when possible
* Fixed compilation
* Fixed compilation
---
.../pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java | 2 +-
.../java/org/apache/pulsar/broker/service/BrokerService.java | 10 +++++-----
.../broker/service/persistent/PersistentSubscription.java | 2 +-
.../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 2 +-
.../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 2 +-
.../pulsar/functions/source/batch/BatchSourceExecutor.java | 2 +-
.../io/batchdatagenerator/BatchDataGeneratorPushSource.java | 2 +-
.../io/elasticsearch/client/elastic/ElasticBulkProcessor.java | 2 +-
.../org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java | 2 +-
.../src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java | 2 +-
.../src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java | 2 +-
.../main/java/org/apache/pulsar/io/redis/sink/RedisSink.java | 2 +-
.../metadata/bookkeeper/AbstractHierarchicalLedgerManager.java | 2 +-
.../apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java | 2 +-
.../org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java | 9 ++++-----
.../java/org/apache/pulsar/proxy/server/ProxyConnection.java | 2 +-
.../offload/filesystem/impl/FileStoreBackedReadHandleImpl.java | 5 +++--
.../filesystem/impl/FileSystemManagedLedgerOffloader.java | 6 +++---
.../offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java | 4 ++--
.../offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java | 4 ++--
.../offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java | 10 +++++-----
21 files changed, 38 insertions(+), 38 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 9b618346e61..b6e7a7dde19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -287,7 +287,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
});
try {
- scheduler.submit(ModularLoadManagerImpl.this::updateAll);
+ scheduler.execute(ModularLoadManagerImpl.this::updateAll);
} catch (RejectedExecutionException e) {
// Executor is shutting down
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5dadf1266e0..9951fb25e52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2096,7 +2096,7 @@ public class BrokerService implements Closeable {
bundles.getBundles().forEach(bundle -> {
pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist -> {
if (isExist) {
- this.pulsar().getExecutor().submit(() -> {
+ this.pulsar().getExecutor().execute(() -> {
try {
pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(),
bundle.getBundleRange());
@@ -2319,7 +2319,7 @@ public class BrokerService implements Closeable {
}
private void updateMaxPublishRatePerTopicInMessages() {
- this.pulsar().getExecutor().submit(() ->
+ this.pulsar().getExecutor().execute(() ->
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerPublishRate();
@@ -2329,7 +2329,7 @@ public class BrokerService implements Closeable {
}
private void updateSubscribeRate() {
- this.pulsar().getExecutor().submit(() ->
+ this.pulsar().getExecutor().execute(() ->
forEachTopic(topic -> {
if (topic instanceof PersistentTopic) {
((PersistentTopic) topic).updateBrokerSubscribeRate();
@@ -2392,7 +2392,7 @@ public class BrokerService implements Closeable {
}
private void updateSubscriptionMessageDispatchRate() {
- this.pulsar().getExecutor().submit(() -> {
+ this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic subscription
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
@@ -2409,7 +2409,7 @@ public class BrokerService implements Closeable {
}
private void updateReplicatorMessageDispatchRate() {
- this.pulsar().getExecutor().submit(() -> {
+ this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic Replicator in Geo-replication
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index dbb5346fe66..2dd503616fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -329,7 +329,7 @@ public class PersistentSubscription implements Subscription {
// when topic closes: it iterates through concurrent-subscription map to close each subscription. so,
// topic.remove again try to access same map which creates deadlock. so, execute it in different thread.
- topic.getBrokerService().pulsar().getExecutor().submit(() ->{
+ topic.getBrokerService().pulsar().getExecutor().execute(() ->{
topic.removeSubscription(subName);
// Also need remove the cursor here, otherwise the data deletion will not work well.
// Because data deletion depends on the mark delete position of all cursors.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index e0eee02e957..d52955ccf0b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -642,7 +642,7 @@ public class ClientCnx extends PulsarHandler {
if (firstOneWaiting != null) {
maxLookupRequestSemaphore.release();
// schedule a new lookup in.
- eventLoopGroup.submit(() -> {
+ eventLoopGroup.execute(() -> {
long newId = firstOneWaiting.getLeft();
TimedCompletableFuture<LookupDataResult> newFuture = firstOneWaiting.getRight().getRight();
addPendingLookupRequests(newId, newFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 32bdb44ac8c..6cd02f2698f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1124,7 +1124,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
Throwable error,
CompletableFuture<Void> subscribeFuture) {
log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage());
- client.externalExecutorProvider().getExecutor().submit(() -> {
+ client.externalExecutorProvider().getExecutor().execute(() -> {
AtomicInteger toCloseNum = new AtomicInteger(0);
consumers.values().stream().filter(consumer1 -> {
String consumerTopicName = consumer1.getTopic();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
index 2fc5b720564..3e09bb64237 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/batch/BatchSourceExecutor.java
@@ -167,7 +167,7 @@ public class BatchSourceExecutor<T> implements Source<T> {
discoverInProgress = true;
}
// Run this code asynchronous so it doesn't block processing of the tasks
- discoveryThread.submit(() -> {
+ discoveryThread.execute(() -> {
try {
batchSource.discover(task -> taskEater(discoveredEvent, task));
} catch (Exception e) {
diff --git a/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java b/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
index 7a8b5fd6c2a..de4654e5f5f 100644
--- a/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
+++ b/pulsar-io/batch-data-generator/src/main/java/org/apache/pulsar/io/batchdatagenerator/BatchDataGeneratorPushSource.java
@@ -62,7 +62,7 @@ public class BatchDataGeneratorPushSource extends BatchPushSource<Person> implem
public void prepare(byte[] instanceSplit) throws Exception {
log.info("Instance " + sourceContext.getInstanceId() + " got a new discovered task {}",
new String(instanceSplit, StandardCharsets.UTF_8));
- executor.submit(this);
+ executor.execute(this);
}
@Override
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
index 3754c0c7acb..fd4dc757bd3 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
@@ -297,7 +297,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
promise.completeExceptionally(ex);
}
};
- internalExecutorService.submit(responseCallable);
+ internalExecutorService.execute(responseCallable);
CompletableFuture<Void> listenerCalledPromise = new CompletableFuture();
diff --git a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
index c84e87b16a4..d8e6c92254e 100644
--- a/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
+++ b/pulsar-io/hbase/src/main/java/org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.java
@@ -120,7 +120,7 @@ public abstract class HbaseAbstractSink<T> implements Sink<T> {
}
if (number == batchSize) {
- flushExecutor.submit(() -> flush());
+ flushExecutor.execute(() -> flush());
}
}
diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java
index 0c14b648c18..359c0909ca6 100644
--- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java
+++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/BatchSink.java
@@ -62,7 +62,7 @@ public abstract class BatchSink<T, R> implements Sink<R> {
}
if (currentSize >= batchSize) {
- flushExecutor.submit(this::flush);
+ flushExecutor.execute(this::flush);
}
}
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 3923ec4c8d2..18c630952d6 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -120,7 +120,7 @@ public class MongoSink implements Sink<byte[]> {
}
if (currentSize == mongoConfig.getBatchSize()) {
- flushExecutor.submit(() -> flush());
+ flushExecutor.execute(() -> flush());
}
}
diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
index 0c04891c421..29535e6cbe7 100644
--- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
+++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java
@@ -90,7 +90,7 @@ public class RedisSink implements Sink<byte[]> {
currentSize = incomingList.size();
}
if (currentSize == batchSize) {
- flushExecutor.submit(this::flush);
+ flushExecutor.execute(this::flush);
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
index 5e7fef285c5..cbfa68692ae 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
@@ -146,7 +146,7 @@ abstract class AbstractHierarchicalLedgerManager {
}
final T dataToProcess = data.get(next);
final AsyncCallback.VoidCallback stub = this;
- scheduler.submit(() -> processor.process(dataToProcess, stub));
+ scheduler.execute(() -> processor.process(dataToProcess, stub));
}
});
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
index fcb5ee285c8..91a938cc2e1 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
@@ -377,7 +377,7 @@ public class PulsarLedgerManager implements LedgerManager {
if (log.isDebugEnabled()) {
log.debug("Ledger metadata is changed for {} : {}.", ledgerId, result);
}
- scheduler.submit(() -> {
+ scheduler.execute(() -> {
synchronized (listenerSet) {
for (BookkeeperInternalCallbacks.LedgerMetadataListener listener : listenerSet) {
listener.onChanged(ledgerId, result);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
index 404ede1ae79..ab4f59fa507 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
@@ -105,12 +105,12 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
private final OpStatsLogger syncStats;
private final OpStatsLogger createClientStats;
- private final Callable<ZooKeeper> clientCreator = new Callable<ZooKeeper>() {
+ private final Runnable clientCreator = new Runnable() {
@Override
- public ZooKeeper call() throws Exception {
+ public void run() {
try {
- return ZooWorker.syncCallWithRetries(null, new ZooWorker.ZooCallable<ZooKeeper>() {
+ ZooWorker.syncCallWithRetries(null, new ZooWorker.ZooCallable<ZooKeeper>() {
@Override
public ZooKeeper call() throws KeeperException, InterruptedException {
@@ -140,7 +140,6 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
} catch (Exception e) {
log.error("Gave up reconnecting to ZooKeeper : ", e);
Runtime.getRuntime().exit(-1);
- return null;
}
}
@@ -356,7 +355,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
log.info("ZooKeeper session {} is expired from {}.",
Long.toHexString(getSessionId()), connectString);
try {
- connectExecutor.submit(clientCreator);
+ connectExecutor.execute(clientCreator);
} catch (RejectedExecutionException ree) {
if (!closed.get()) {
log.error("ZooKeeper reconnect task is rejected : ", ree);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index c97416280c3..d3c83198773 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -405,7 +405,7 @@ public class ProxyConnection extends PulsarHandler {
public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnected connected) {
try {
final CommandConnected finalConnected = new CommandConnected().copyFrom(connected);
- ctx.executor().submit(() -> handleBrokerConnected(directProxyHandler, finalConnected));
+ ctx.executor().execute(() -> handleBrokerConnected(directProxyHandler, finalConnected));
} catch (RejectedExecutionException e) {
LOG.error("Event loop was already closed. Closing broker connection.", e);
directProxyHandler.close();
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
index ecc0cfc23b3..5d10b63de34 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -90,9 +90,10 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> promise = new CompletableFuture<>();
- executor.submit(() -> {
+ executor.execute(() -> {
try {
reader.close();
+ promise.complete(null);
} catch (IOException t) {
promise.completeExceptionally(t);
}
@@ -106,7 +107,7 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
}
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
- executor.submit(() -> {
+ executor.execute(() -> {
if (firstEntry > lastEntry
|| firstEntry < 0
|| lastEntry > getLastAddConfirmed()) {
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index d91d84b476b..8e87c230adb 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -149,7 +149,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
@Override
public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- scheduler.chooseThread(readHandle.getId()).submit(
+ scheduler.chooseThread(readHandle.getId()).execute(
new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration,
assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds(),
this.offloaderStats));
@@ -228,7 +228,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
semaphore.acquire();
countDownLatch = new CountDownLatch(1);
assignmentScheduler.chooseThread(ledgerId)
- .submit(FileSystemWriter.create(ledgerEntriesOnce,
+ .execute(FileSystemWriter.create(ledgerEntriesOnce,
dataWriter, semaphore, countDownLatch, haveOffloadEntryNumber, this));
needToOffloadFirstEntryNumber = end + 1;
} while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed()
@@ -339,7 +339,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String storagePath = getStoragePath(storageBasePath, ledgerName);
String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
- scheduler.chooseThread(ledgerId).submit(() -> {
+ scheduler.chooseThread(ledgerId).execute(() -> {
try {
MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath),
configuration);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 998912a30c4..d99702e37c9 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -86,7 +86,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> promise = new CompletableFuture<>();
- executor.submit(() -> {
+ executor.execute(() -> {
try {
index.close();
inputStream.close();
@@ -103,7 +103,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
- executor.submit(() -> {
+ executor.execute(() -> {
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
boolean seeked = false;
try {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index e1a8296a20e..b896f38d390 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -115,7 +115,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> promise = new CompletableFuture<>();
- executor.submit(() -> {
+ executor.execute(() -> {
try {
for (OffloadIndexBlockV2 indexBlock : indices) {
indexBlock.close();
@@ -141,7 +141,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
promise.completeExceptionally(new IllegalArgumentException());
return promise;
}
- executor.submit(() -> {
+ executor.execute(() -> {
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
List<GroupedReader> groupedReaders = null;
try {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 1fdc0983b0e..606934b5f0e 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -181,7 +181,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata,
config.getBlobStoreLocation(), writeBlobStore);
CompletableFuture<Void> promise = new CompletableFuture<>();
- scheduler.chooseThread(readHandle.getId()).submit(() -> {
+ scheduler.chooseThread(readHandle.getId()).execute(() -> {
if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
promise.completeExceptionally(
new IllegalArgumentException("An empty or open ledger should never be offloaded"));
@@ -539,7 +539,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
- scheduler.chooseThread(ledgerId).submit(() -> {
+ scheduler.chooseThread(ledgerId).execute(() -> {
try {
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
readBlobstore,
@@ -573,7 +573,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
indexKeys.add(indexKey);
});
- scheduler.chooseThread(ledgerId).submit(() -> {
+ scheduler.chooseThread(ledgerId).execute(() -> {
try {
promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
readBlobstore,
@@ -597,7 +597,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
CompletableFuture<Void> promise = new CompletableFuture<>();
- scheduler.chooseThread(ledgerId).submit(() -> {
+ scheduler.chooseThread(ledgerId).execute(() -> {
try {
readBlobstore.removeBlobs(readBucket,
ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid),
@@ -623,7 +623,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
CompletableFuture<Void> promise = new CompletableFuture<>();
- scheduler.submit(() -> {
+ scheduler.execute(() -> {
try {
readBlobstore.removeBlobs(readBucket,
ImmutableList.of(uid.toString(),