You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/09/21 11:09:57 UTC

[bookkeeper] 04/07: Fix the deadlock when only using io thread to handle request (#3480)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit b876ebc353e6d658ede2903e45b23bb0df7d4210
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Sep 12 18:49:32 2022 +0800

    Fix the deadlock when only using io thread to handle request (#3480)
    
    * Fix the deadlock when only using io thread to handle request
    ---
    
    *Motivation*
    
    If user don't configure the ReadWorker thread pool, the reqeust will
    process with io thread. We cannot call await() from an IO thread,
    if the socket buffer is full, that blocking call would cause a deadlock.
    
    *Modification*
    
    - only wait the promise when the thread is not io thread
    
    * Fix the style issue
    
    (cherry picked from commit 70bbc3830f07691cbdb87fe03958196fafc0effb)
---
 .../bookkeeper/proto/PacketProcessorBase.java      |  8 ++-
 .../client/BookieRecoveryUseIOThreadTest.java      | 77 ++++++++++++++++++++++
 2 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 7121579862..07954d746a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -18,9 +18,8 @@
 package org.apache.bookkeeper.proto;
 
 import io.netty.channel.Channel;
-
+import io.netty.channel.ChannelFuture;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.bookkeeper.proto.BookieProtocol.Request;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
@@ -138,7 +137,10 @@ abstract class PacketProcessorBase<T extends Request> extends SafeRunnable {
      */
     protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsLogger) {
         try {
-            channel.writeAndFlush(response).await();
+            ChannelFuture future = channel.writeAndFlush(response);
+            if (!channel.eventLoop().inEventLoop()) {
+                future.await();
+            }
         } catch (InterruptedException e) {
             return;
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryUseIOThreadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryUseIOThreadTest.java
new file mode 100644
index 0000000000..fa1dba0cda
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryUseIOThreadTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BookieRecoveryUseIOThreadTest extends BookKeeperClusterTestCase {
+
+    public BookieRecoveryUseIOThreadTest() {
+        super(1);
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        baseConf.setNumAddWorkerThreads(0);
+        baseConf.setNumReadWorkerThreads(0);
+        baseConf.setNumHighPriorityWorkerThreads(0);
+        super.setUp();
+    }
+
+    @Test
+    public void testRecoveryClosedLedger() throws BKException, IOException, InterruptedException {
+        // test the v2 protocol when using IO thread to handle the request
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        conf.setUseV2WireProtocol(true);
+        AtomicInteger finalRc = new AtomicInteger(Integer.MAX_VALUE);
+        CountDownLatch latch = new CountDownLatch(1);
+        try (BookKeeper bkc = new BookKeeper(conf)) {
+            bkc.asyncCreateLedger(1, 1, BookKeeper.DigestType.CRC32, "".getBytes(),
+                new AsyncCallback.CreateCallback() {
+                    @Override
+                    public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+                        lh.asyncAddEntry("hello".getBytes(), new AsyncCallback.AddCallback() {
+                            @Override
+                            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                                if (rc == BKException.Code.OK) {
+                                    bkc.asyncOpenLedger(lh.ledgerId, BookKeeper.DigestType.CRC32, "".getBytes(),
+                                        new AsyncCallback.OpenCallback() {
+                                            @Override
+                                            public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+                                                finalRc.set(rc);
+                                                latch.countDown();
+                                            }
+                                        }, null);
+                                }
+                            }
+                        }, null);
+                    }
+                }, null);
+            latch.await();
+        }
+        Assert.assertEquals(finalRc.get(), org.apache.bookkeeper.client.api.BKException.Code.OK);
+    }
+}