Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/17 09:03:23 UTC

[GitHub] [pulsar] nodece edited a comment on pull request #14320: [Broker] Increase default numHttpServerThreads value to 200 to prevent Admin API unavailability

nodece edited a comment on pull request #14320:
URL: https://github.com/apache/pulsar/pull/14320#issuecomment-1042715597


   The Admin API unavailability is not caused by the number of HTTP server threads; the root cause is that the ZK callback thread is blocked.
   
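   When an Admin API handler calls the ZK metadata store API, it receives the ZK data through a `CompletableFuture`. Note that we do not use an executor to run `CompletableFuture#complete()` in [ZKMetadataStore.java#L171](https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java#L171). As a result, the future is completed on the ZK callback thread, and any dependent stages already attached to it run on that thread too. If one of those stages converts an async call into a sync call, for example `metadata.getAsync().get(30, TimeUnit.SECONDS)`, the ZK callback thread is blocked. The ZK client delivers all async callbacks on this single thread, so the response to the nested request cannot be delivered until the blocking call times out.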
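
   The same thread-affinity problem can be shown without ZooKeeper. The following is a minimal sketch that assumes a single-thread executor stands in for the ZK client's callback thread. `CallbackThreadDemo`, `asyncGet`, and the thread name `fake-zk-callback` are hypothetical names used only for illustration. A future completed directly inside the "callback" runs its attached stages on that thread. A blocking `get()` in such a stage therefore waits for a reply that is queued behind the very task it is running in.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   class CallbackThreadDemo {
       // Hypothetical stand-in for the ZK client's single callback thread
       static final ExecutorService CALLBACK_THREAD = Executors.newSingleThreadExecutor(r -> {
           Thread t = new Thread(r, "fake-zk-callback");
           t.setDaemon(true); // let the JVM exit even though the executor is never shut down
           return t;
       });

       // Hypothetical async read: the future is completed directly on the
       // callback thread, without handing off to another executor
       static CompletableFuture<String> asyncGet(String path) {
           CompletableFuture<String> future = new CompletableFuture<>();
           CALLBACK_THREAD.execute(() -> future.complete("data of " + path));
           return future;
       }

       // Returns "<thread that ran the dependent stage> <outcome of the nested sync get>"
       static String nestedSyncGet() {
           CompletableFuture<String> first = new CompletableFuture<>();
           // The stage is attached before `first` completes, so it runs on the
           // thread that completes `first`, i.e. the callback thread
           CompletableFuture<String> outcome = first.thenApply(data -> {
               String thread = Thread.currentThread().getName();
               try {
                   // The reply to this request is queued behind the task we are running in
                   asyncGet("/1").get(200, TimeUnit.MILLISECONDS);
                   return thread + " got response";
               } catch (TimeoutException e) {
                   return thread + " timed out";
               } catch (Exception e) {
                   throw new RuntimeException(e);
               }
           });
           CALLBACK_THREAD.execute(() -> first.complete("data of /"));
           return outcome.join();
       }
   }
   ```

   Calling `CallbackThreadDemo.nestedSyncGet()` returns `fake-zk-callback timed out`. The stage ran on the callback thread, and the nested reply could not be delivered while that thread was blocked.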
   
   How to solve this problem?
   1. In the ZK callback, use an executor to complete the future that passes the data back to Pulsar, so that dependent stages do not run on the ZK callback thread.
   2. Don't convert async calls into blocking sync calls.
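
   Both fixes can be sketched with the same kind of stand-in. This is a minimal illustration under stated assumptions, not Pulsar's actual change:
   - `CALLBACK_THREAD` plays the ZK callback thread.
   - `WORKERS` is a hypothetical cached thread pool that receives completions handed off by the callback (fix 1).
   - `composedGet` chains the second read with `thenCompose` instead of blocking (fix 2).

   With fix 1, a stage that still blocks only moves the waiting onto the worker pool. A single-thread or saturated pool could stall the same way, so fix 2 is the more robust of the two.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ThreadFactory;
   import java.util.concurrent.TimeUnit;

   class CallbackFixDemo {
       // Hypothetical stand-in for the ZK client's single callback thread
       static final ExecutorService CALLBACK_THREAD = Executors.newSingleThreadExecutor(daemon("fake-zk-callback"));
       // Hypothetical pool that receives completions handed off by the callback thread
       static final ExecutorService WORKERS = Executors.newCachedThreadPool(daemon("metadata-worker"));

       // Daemon threads, so the JVM can exit without shutting the executors down
       static ThreadFactory daemon(String name) {
           return r -> {
               Thread t = new Thread(r, name);
               t.setDaemon(true);
               return t;
           };
       }

       // Original behavior: the future is completed directly on the callback thread
       static CompletableFuture<String> getInline(String path) {
           CompletableFuture<String> future = new CompletableFuture<>();
           CALLBACK_THREAD.execute(() -> future.complete("data of " + path));
           return future;
       }

       // Fix 1: the callback only hands the completion off to WORKERS
       static CompletableFuture<String> getViaExecutor(String path) {
           CompletableFuture<String> future = new CompletableFuture<>();
           CALLBACK_THREAD.execute(() -> WORKERS.execute(() -> future.complete("data of " + path)));
           return future;
       }

       // Fix 1 in action: the blocking get() runs on a worker (or the caller), never on
       // CALLBACK_THREAD, so the callback thread stays free to deliver the nested reply
       static String nestedSyncGetWithExecutor() {
           return getViaExecutor("/").thenApply(data -> {
               try {
                   return getViaExecutor("/1").get(1, TimeUnit.SECONDS);
               } catch (Exception e) {
                   throw new CompletionException(e);
               }
           }).join();
       }

       // Fix 2: stay asynchronous; thenCompose chains the second read without blocking,
       // even though both futures are still completed inline on the callback thread
       static String composedGet() {
           return getInline("/").thenCompose(data -> getInline("/1")).join();
       }
   }
   ```

   Both `CallbackFixDemo.nestedSyncGetWithExecutor()` and `CallbackFixDemo.composedGet()` return `data of /1`.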
   
   
   How to reproduce the blocked ZK callback thread:
   ```
   docker run -d -p 2181:2181 --name test-zookeeper zookeeper
   ```
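   ```java
   import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
   import com.github.benmanes.caffeine.cache.Caffeine;
   import java.util.Collections;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import org.apache.zookeeper.Op;
   import org.apache.zookeeper.Watcher;
   import org.apache.zookeeper.ZooKeeper;
   
   public class Main {
       private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
   
       public static void printThread(String name) {
           System.out.println(name + " thread name -> " + Thread.currentThread().getName());
       }
   
       public static void main(String[] args) throws Exception {
           System.out.println("Check the zk connect");
           // Wait for SyncConnected through the default watcher instead of busy-polling getState()
           CountDownLatch zkLatch = new CountDownLatch(1);
           ZooKeeper zkc = new ZooKeeper("localhost:2181", 60_000, event -> {
               if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                   zkLatch.countDown();
               }
           });
           if (!zkLatch.await(5, TimeUnit.SECONDS)) {
               zkc.close();
               throw new Exception("zk connect failed");
           }
   
           // The loader completes the future directly inside the ZK callback (no executor),
           // so stages attached to it run on the ZK client's callback thread
           AsyncLoadingCache<String, byte[]> objCache = Caffeine.newBuilder()
                   .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
                   .buildAsync((key, executor) -> {
                       CompletableFuture<byte[]> future = new CompletableFuture<>();
                       zkc.multi(Collections.singletonList(Op.getData("/")), (rc, path, ctx, opResults) -> {
                           printThread("zk callback");
                           future.complete(null); // the value is irrelevant for this reproduction
                       }, null);
                       return future;
                   });
   
           CountDownLatch countDownLatch = new CountDownLatch(1);
   
           // Reproduce the blocked ZK callback thread
           System.out.println("async get start");
           objCache.get("/").whenComplete((unused, ignored) -> {
               // Normally runs on the ZK callback thread, because that thread completes the future
               printThread("async get done");
               try {
                   System.out.println("zk thread will be blocked after sync get");
                   System.out.println("sync get start");
                   // The reply to this request must be delivered by the thread that is blocked
                   // right here, so get() fails with a TimeoutException after 5 seconds
                   objCache.get("/1").get(5, TimeUnit.SECONDS);
                   printThread("sync get done");
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
                   countDownLatch.countDown();
               }
           });
   
           countDownLatch.await();
           zkc.close();
       }
   }
   ```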
   
   
   


-- 
This is an automated message from the Apache Git Service.