You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2022/06/21 06:47:05 UTC

[GitHub] [bookkeeper] eolivelli commented on a diff in pull request #3348: Fix the queue size in writeThreadPool exceeds the configured size

eolivelli commented on code in PR #3348:
URL: https://github.com/apache/bookkeeper/pull/3348#discussion_r902219872


##########
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java:
##########
@@ -59,78 +59,102 @@ protected ListeningExecutorService delegate() {
 
     @Override
     public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-        this.checkQueue(1);
-        return this.thread.schedule(command, delay, unit);
+        synchronized (this) {
+            this.checkQueue(1);
+            return this.thread.schedule(command, delay, unit);
+        }
     }
 
     @Override
     public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-        this.checkQueue(1);
-        return this.thread.schedule(callable, delay, unit);
+        synchronized (this) {
+            this.checkQueue(1);
+            return this.thread.schedule(callable, delay, unit);
+        }
     }
 
     @Override
     public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                             long initialDelay, long period, TimeUnit unit) {
-        this.checkQueue(1);
-        return this.thread.scheduleAtFixedRate(command, initialDelay, period, unit);
+        synchronized (this) {
+            this.checkQueue(1);
+            return this.thread.scheduleAtFixedRate(command, initialDelay, period, unit);
+        }
     }
 
     @Override
     public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                                long initialDelay, long delay, TimeUnit unit) {
-        this.checkQueue(1);
-        return this.thread.scheduleAtFixedRate(command, initialDelay, delay, unit);
+        synchronized (this) {
+            this.checkQueue(1);
+            return this.thread.scheduleAtFixedRate(command, initialDelay, delay, unit);
+        }
     }
 
     @Override
     public <T> ListenableFuture<T> submit(Callable<T> task) {
-        this.checkQueue(1);
-        return super.submit(task);
+        synchronized (this) {
+            this.checkQueue(1);
+            return super.submit(task);
+        }
     }
 
     @Override
     public ListenableFuture<?> submit(Runnable task) {
-        this.checkQueue(1);
-        return super.submit(task);
+        synchronized (this) {
+            this.checkQueue(1);
+            return super.submit(task);
+        }
     }
 
     @Override
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
-        this.checkQueue(tasks.size());
-        return super.invokeAll(tasks);
+        synchronized (this) {
+            this.checkQueue(tasks.size());
+            return super.invokeAll(tasks);
+        }
     }
 
     @Override
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                          long timeout, TimeUnit unit) throws InterruptedException {
-        this.checkQueue(tasks.size());
-        return super.invokeAll(tasks, timeout, unit);
+        synchronized (this) {
+            this.checkQueue(tasks.size());
+            return super.invokeAll(tasks, timeout, unit);
+        }
     }
 
     @Override
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
-        this.checkQueue(tasks.size());
-        return super.invokeAny(tasks);
+        synchronized (this) {
+            this.checkQueue(tasks.size());
+            return super.invokeAny(tasks);
+        }
     }
 
     @Override
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
                            TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-        this.checkQueue(tasks.size());
-        return super.invokeAny(tasks, timeout, unit);
+        synchronized (this) {
+            this.checkQueue(tasks.size());
+            return super.invokeAny(tasks, timeout, unit);
+        }
     }
 
     @Override
     public <T> ListenableFuture<T> submit(Runnable task, T result) {
-        this.checkQueue(1);
-        return super.submit(task, result);
+        synchronized (this) {
+            this.checkQueue(1);
+            return super.submit(task, result);
+        }
     }
 
     @Override
     public void execute(Runnable command) {
-        this.checkQueue(1);
-        super.execute(command);
+        synchronized (this) {

Review Comment:
   A style note: you can move `synchronized` to the method signature instead of wrapping the entire method body in a `synchronized (this)` block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org