You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fa...@apache.org on 2020/01/25 17:48:44 UTC

[zookeeper] branch master updated: ZOOKEEPER-3575: Moving sending packets in Learner to a separate thread

This is an automated email from the ASF dual-hosted git repository.

fangmin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c1251d  ZOOKEEPER-3575: Moving sending packets in Learner to a separate thread
7c1251d is described below

commit 7c1251dbcf6a314466024f71ae5757bde34bb3fd
Author: Jie Huang <ji...@fb.com>
AuthorDate: Sat Jan 25 09:48:36 2020 -0800

    ZOOKEEPER-3575: Moving sending packets in Learner to a separate thread
    
    Author: Jie Huang <ji...@fb.com>
    
    Reviewers: eolivelli@apache.org, fangmin@apache.org
    
    Closes #1116 from jhuan31/ZOOKEEPER-3575
---
 .../apache/zookeeper/server/quorum/Learner.java    | 50 ++++++++++++-
 .../zookeeper/server/quorum/LearnerSender.java     | 82 ++++++++++++++++++++++
 .../server/quorum/LearnerMetricsTest.java          | 34 +++++++++
 .../quorum/ReconfigDuringLeaderSyncTest.java       | 32 ++++++++-
 4 files changed, 194 insertions(+), 4 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 2fd2db4..e34ab3c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -95,6 +95,7 @@ public class Learner {
         return sock;
     }
 
+    LearnerSender sender = null;
     protected InputArchive leaderIs;
     protected OutputArchive leaderOs;
     /** the protocol version of the leader */
@@ -113,9 +114,12 @@ public class Learner {
 
     private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
 
+    public static final String LEARNER_ASYNC_SENDING = "learner.asyncSending";
+    private static volatile boolean asyncSending = Boolean.getBoolean(LEARNER_ASYNC_SENDING);
     static {
         LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
         LOG.info("TCP NoDelay set to: {}", nodelay);
+        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
     }
 
     final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
@@ -124,6 +128,15 @@ public class Learner {
         return pendingRevalidations.size();
     }
 
+    // test hook: overrides the mode initialized from the learner.asyncSending system property
+    protected static void setAsyncSending(boolean newMode) {
+        asyncSending = newMode;
+        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+    }
+
+    protected static boolean getAsyncSending() { // test hook: current sending mode
+        return asyncSending;
+    }
     /**
      * validate a session for a client
      *
@@ -152,13 +165,27 @@ public class Learner {
     }
 
     /**
-     * write a packet to the leader
+     * write a packet to the leader.
+     *
+     * This method is called by multiple threads, so only one thread at a time may write to leaderOs.
+     * In synchronous mode the write happens inside a block synchronized on leaderOs (see writePacketNow()).
+     * In asynchronous mode sender.queuePacket() appends the packet to a thread-safe BlockingQueue, and only the
+     * learner sender thread drains that queue and writes to leaderOs. A null packet is handled differently:
+     * LearnerSender.queuePacket() flushes bufferedOutput directly on the calling thread.
      *
      * @param pp
      *                the proposal packet to be sent to the leader
      * @throws IOException
      */
     void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+        if (sender != null) { // async mode is fixed once startSendingThread() has created the sender
+            sender.queuePacket(pp);
+        } else {
+            writePacketNow(pp, flush);
+        }
+    }
+
+    void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
         synchronized (leaderOs) {
             if (pp != null) {
                 messageTracker.trackSent(pp.getType());
@@ -171,6 +198,14 @@ public class Learner {
     }
 
     /**
+     * Create and start the LearnerSender thread, which writes the queued packets to the leader.
+     */
+    protected void startSendingThread() {
+        sender = new LearnerSender(this);
+        sender.start();
+    }
+
+    /**
      * read a packet from the leader
      *
      * @param pp
@@ -303,6 +338,9 @@ public class Learner {
         leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
         bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
         leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+        if (asyncSending) {
+            startSendingThread();
+        }
     }
 
     class LeaderConnector implements Runnable {
@@ -774,8 +812,9 @@ public class Learner {
             dos.writeLong(entry.getKey());
             dos.writeInt(entry.getValue());
         }
-        qp.setData(bos.toByteArray());
-        writePacket(qp, true);
+
+        QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
+        writePacket(pingReply, true);
     }
 
     /**
@@ -785,6 +824,11 @@ public class Learner {
         self.setZooKeeperServer(null);
         self.closeAllConnections();
         self.adminServer.setZooKeeperServer(null);
+
+        if (sender != null) {
+            sender.shutdown();
+        }
+
         closeSocket();
         // shutdown previous zookeeper
         if (zk != null) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java
new file mode 100644
index 0000000..cbb7d69
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.zookeeper.server.ZooKeeperCriticalThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Thread that writes packets queued via {@link #queuePacket} to the leader, flushing whenever the queue drains. */
+public class LearnerSender extends ZooKeeperCriticalThread {
+    private static final Logger LOG = LoggerFactory.getLogger(LearnerSender.class);
+
+    private final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
+    private final QuorumPacket proposalOfDeath = new QuorumPacket(); // sentinel queued by shutdown()
+
+    Learner learner;
+
+    public LearnerSender(Learner learner) {
+        super("LearnerSender:" + learner.zk.getServerId(), learner.zk.getZooKeeperServerListener());
+        this.learner = learner;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                QuorumPacket p = queuedPackets.poll();
+                if (p == null) {
+                    learner.bufferedOutput.flush(); // queue drained: flush before blocking in take()
+                    p = queuedPackets.take();
+                }
+                if (p == proposalOfDeath) {
+                    break; // packet of death queued by shutdown()
+                }
+                learner.messageTracker.trackSent(p.getType());
+                learner.leaderOs.writeRecord(p, "packet");
+            } catch (IOException e) {
+                handleException(this.getName(), e);
+                break;
+            } catch (InterruptedException e) {
+                handleException(this.getName(), e);
+                Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it
+                break;
+            }
+        }
+        LOG.info("LearnerSender exited");
+    }
+
+    /** Queue a packet for the sender thread; a null packet flushes the buffered output on the caller's thread. */
+    public void queuePacket(QuorumPacket pp) throws IOException {
+        if (pp == null) {
+            learner.bufferedOutput.flush();
+        } else {
+            queuedPackets.add(pp);
+        }
+    }
+
+    /** Drop any pending packets and queue the packet of death so that run() exits. */
+    public void shutdown() {
+        LOG.info("Shutting down LearnerSender");
+        queuedPackets.clear();
+        queuedPackets.add(proposalOfDeath);
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
index c6de02f..659ba31 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
@@ -20,6 +20,8 @@ package org.apache.zookeeper.server.quorum;
 
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.zookeeper.CreateMode;
@@ -31,14 +33,46 @@ import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.test.ClientBase;
 import org.hamcrest.Matcher;
 import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class LearnerMetricsTest extends QuorumPeerTestBase {
 
     private static final int TIMEOUT_SECONDS = 30;
     private static final int SERVER_COUNT = 4; // 1 observer, 3 participants
     private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
     private ZooKeeper zk_client;
+    private final boolean asyncSending;
+    private static boolean bakAsyncSending;
+
+    public LearnerMetricsTest(boolean asyncSending) {
+        this.asyncSending = asyncSending;
+    }
+
+    @Parameterized.Parameters(name = "asyncSending={0}")
+    public static Collection<Object[]> sendingModes() {
+        return Arrays.asList(new Object[][]{{true}, {false}});
+    }
+
+    @BeforeClass
+    public static void saveAsyncSendingFlag() {
+        bakAsyncSending = Learner.getAsyncSending(); // mode in effect before this class ran
+    }
+
+    @Before
+    public void setAsyncSendingFlag() {
+        Learner.setAsyncSending(asyncSending); // sending mode under test for this parameter set
+    }
+
+    @AfterClass
+    public static void resetAsyncSendingFlag() {
+        Learner.setAsyncSending(bakAsyncSending); // restore the saved mode once all parameter sets ran
+    }
 
     @Test
     public void testLearnerMetricsTest() throws Exception {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
index bff6cbf..ee51baf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.PortAssignment;
@@ -38,23 +40,51 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
     private static int SERVER_COUNT = 3;
     private MainThread[] mt;
+    private static boolean bakAsyncSending; // saved before the class runs, restored afterwards
+
+    private final boolean asyncSending;
+
+    public ReconfigDuringLeaderSyncTest(boolean asyncSending) {
+        this.asyncSending = asyncSending;
+    }
+
+    @Parameterized.Parameters(name = "asyncSending={0}")
+    public static Collection<Object[]> sendingModes() {
+        return Arrays.asList(new Object[][]{{true}, {false}});
+    }
 
     @Before
     public void setup() {
         System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
+        Learner.setAsyncSending(asyncSending); // sending mode for this parameterized run
         QuorumPeerConfig.setReconfigEnabled(true);
     }
 
+    @BeforeClass
+    public static void saveAsyncSendingFlag() {
+        bakAsyncSending = Learner.getAsyncSending(); // remember the mode in effect before this class
+    }
+
+    @AfterClass
+    public static void resetAsyncSendingFlag() {
+        Learner.setAsyncSending(bakAsyncSending); // restore the remembered mode
+    }
+
     /**
      * <pre>
      * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.