You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ar...@apache.org on 2021/03/29 16:17:56 UTC

[zookeeper] branch branch-3.6 updated: ZOOKEEPER-4260: Backport ZOOKEEPER-3575 to branch-3.6

This is an automated email from the ASF dual-hosted git repository.

arshad pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.6 by this push:
     new 992abc8  ZOOKEEPER-4260: Backport ZOOKEEPER-3575 to branch-3.6
992abc8 is described below

commit 992abc8ce224183ef3d9e01909644f4cb489a9fc
Author: functioner <oi...@gmail.com>
AuthorDate: Mon Mar 29 21:48:56 2021 +0530

    ZOOKEEPER-4260: Backport ZOOKEEPER-3575 to branch-3.6
    
    (cherry picked from commit 7c1251dbcf6a314466024f71ae5757bde34bb3fd)
    
    Author: functioner <oi...@gmail.com>
    Author: Ayush Mantri <aa...@gmail.com>
    Author: Jie Huang <ji...@fb.com>
    
    Reviewers: Mohammad Arshad <ar...@apache.org>
    
    Closes #1653 from functioner/ZOOKEEPER-4260 and squashes the following commits:
    
    0a19eb18d [functioner] remove useless import
    5d827decd [Ayush Mantri] cherry-pick ZOOKEEPER-4257 and fix conflicts
    469d971d6 [functioner] Merge branch 'branch-3.6' of https://github.com/apache/zookeeper into ZOOKEEPER-4260
    e6cb76734 [Jie Huang] ZOOKEEPER-3575: Moving sending packets in Learner to a separate thread
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  6 ++
 .../apache/zookeeper/server/quorum/Learner.java    | 52 +++++++++++++-
 .../zookeeper/server/quorum/LearnerSender.java     | 82 ++++++++++++++++++++++
 .../apache/zookeeper/server/util/ConfigUtils.java  | 25 +++++++
 .../server/quorum/LearnerMetricsTest.java          | 34 +++++++++
 .../quorum/ReconfigDuringLeaderSyncTest.java       | 32 ++++++++-
 .../zookeeper/server/util/ConfigUtilsTest.java     | 42 +++++++++++
 7 files changed, 269 insertions(+), 4 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index d9e6ea5..2b78564 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1080,6 +1080,12 @@ property, when available, is noted below.
     effect due to TLS handshake timeout when there are too many in-flight TLS 
     handshakes. Set it to something like 250 is good enough to avoid herd effect.
 
+* *learner.asyncSending*
+  (Java system property: **zookeeper.learner.asyncSending**)
+  (Java system property: **learner.asyncSending**) (added for backward compatibility)
+  **New in 3.7.0:**
+  By default, the Learner sends and receives packets synchronously within a critical section, so an untimely network issue can cause followers to hang (see [ZOOKEEPER-3575](https://issues.apache.org/jira/browse/ZOOKEEPER-3575) and [ZOOKEEPER-4074](https://issues.apache.org/jira/browse/ZOOKEEPER-4074)). When this property (learner.asyncSending) is set to true, the Learner instead sends packets asynchronously from a separate thread.
+  The default is false.
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index fa7cc59..d281acf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -54,6 +54,7 @@ import org.apache.zookeeper.server.TxnLogEntry;
 import org.apache.zookeeper.server.ZooTrace;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.ConfigUtils;
 import org.apache.zookeeper.server.util.MessageTracker;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
@@ -95,6 +96,7 @@ public class Learner {
         return sock;
     }
 
+    LearnerSender sender = null;
     protected InputArchive leaderIs;
     protected OutputArchive leaderOs;
     /** the protocol version of the leader */
@@ -113,9 +115,13 @@ public class Learner {
 
     private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
 
+    public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
+    private static boolean asyncSending =
+        Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
     static {
         LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
         LOG.info("TCP NoDelay set to: {}", nodelay);
+        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
     }
 
     final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
@@ -124,6 +130,15 @@ public class Learner {
         return pendingRevalidations.size();
     }
 
+    // for testing
+    protected static void setAsyncSending(boolean newMode) {
+        asyncSending = newMode;
+        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+
+    }
+    protected static boolean getAsyncSending() {
+        return asyncSending;
+    }
     /**
      * validate a session for a client
      *
@@ -152,13 +167,27 @@ public class Learner {
     }
 
     /**
-     * write a packet to the leader
+     * write a packet to the leader.
+     *
+     * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
+     * When packets are sent synchronously, writing is done within a synchronization block.
+     * When packets are sent asynchronously, sender.queuePacket() is called, which adds non-null packets to a thread-safe BlockingQueue (a null packet only flushes the output stream).
+     * Only the learner sender thread reads from this BlockingQueue and writes the queued packets to leaderOs.
+     * So we have only one thread writing to leaderOs at a time in either case.
      *
      * @param pp
      *                the proposal packet to be sent to the leader
      * @throws IOException
      */
     void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+        if (asyncSending) {
+            sender.queuePacket(pp);
+        } else {
+            writePacketNow(pp, flush);
+        }
+    }
+
+    void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
         synchronized (leaderOs) {
             if (pp != null) {
                 messageTracker.trackSent(pp.getType());
@@ -171,6 +200,14 @@ public class Learner {
     }
 
     /**
+     * Start thread that will forward any packet in the queue to the leader
+     */
+    protected void startSendingThread() {
+        sender = new LearnerSender(this);
+        sender.start();
+    }
+
+    /**
      * read a packet from the leader
      *
      * @param pp
@@ -305,6 +342,9 @@ public class Learner {
         leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
         bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
         leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+        if (asyncSending) {
+            startSendingThread();
+        }
     }
 
     class LeaderConnector implements Runnable {
@@ -788,8 +828,9 @@ public class Learner {
             dos.writeLong(entry.getKey());
             dos.writeInt(entry.getValue());
         }
-        qp.setData(bos.toByteArray());
-        writePacket(qp, true);
+
+        QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
+        writePacket(pingReply, true);
     }
 
     /**
@@ -799,6 +840,11 @@ public class Learner {
         self.setZooKeeperServer(null);
         self.closeAllConnections();
         self.adminServer.setZooKeeperServer(null);
+
+        if (sender != null) {
+            sender.shutdown();
+        }
+
         closeSocket();
         // shutdown previous zookeeper
         if (zk != null) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java
new file mode 100644
index 0000000..cbb7d69
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.zookeeper.server.ZooKeeperCriticalThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LearnerSender extends ZooKeeperCriticalThread {
+    private static final Logger LOG = LoggerFactory.getLogger(LearnerSender.class);
+
+    private final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
+    private final QuorumPacket proposalOfDeath = new QuorumPacket();
+
+    Learner learner;
+
+    public LearnerSender(Learner learner) {
+        super("LearnerSender:" + learner.zk.getServerId(), learner.zk.getZooKeeperServerListener());
+        this.learner = learner;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                QuorumPacket p = queuedPackets.poll();
+                if (p == null) {
+                    learner.bufferedOutput.flush();
+                    p = queuedPackets.take();
+                }
+
+                if (p == proposalOfDeath) {
+                    // Packet of death!
+                    break;
+                }
+
+                learner.messageTracker.trackSent(p.getType());
+                learner.leaderOs.writeRecord(p, "packet");
+            } catch (IOException e) {
+                handleException(this.getName(), e);
+                break;
+            } catch (InterruptedException e) {
+                handleException(this.getName(), e);
+                break;
+            }
+        }
+
+        LOG.info("LearnerSender exited");
+    }
+
+    public void queuePacket(QuorumPacket pp) throws IOException {
+        if (pp == null) {
+            learner.bufferedOutput.flush();
+        } else {
+            queuedPackets.add(pp);
+        }
+    }
+
+    public void shutdown() {
+        LOG.info("Shutting down LearnerSender");
+        queuedPackets.clear();
+        queuedPackets.add(proposalOfDeath);
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java
index 508dc11..d6f7572 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java
@@ -95,4 +95,29 @@ public class ConfigUtils {
         }
     }
 
+    /**
+     * Some old configuration properties cannot be set in the ZooKeeper configuration file
+     * zoo.cfg. To make them configurable in zoo.cfg, the old names are prefixed with
+     * "zookeeper."; for example, prop.x.y.z becomes zookeeper.prop.x.y.z. For backward
+     * compatibility, both prop.x.y.z and zookeeper.prop.x.y.z must be supported.
+     * This method returns the value of the new property if it is set; otherwise it falls
+     * back to the old property (the new key with "zookeeper." removed).
+     *
+     * @param newPropertyKey new property key, which starts with "zookeeper."
+     * @return the trimmed value of the new or old system property, or null if neither is set
+     */
+    public static String getPropertyBackwardCompatibleWay(String newPropertyKey) {
+        String newKeyValue = System.getProperty(newPropertyKey);
+        if (newKeyValue != null) {
+            return newKeyValue.trim();
+        }
+        String oldPropertyKey = newPropertyKey.replace("zookeeper.", "");
+        String oldKeyValue = System.getProperty(oldPropertyKey);
+
+        if (oldKeyValue != null) {
+            return oldKeyValue.trim();
+        }
+        return null;
+    }
+
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
index c6de02f..659ba31 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
@@ -20,6 +20,8 @@ package org.apache.zookeeper.server.quorum;
 
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.zookeeper.CreateMode;
@@ -31,14 +33,46 @@ import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.test.ClientBase;
 import org.hamcrest.Matcher;
 import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class LearnerMetricsTest extends QuorumPeerTestBase {
 
     private static final int TIMEOUT_SECONDS = 30;
     private static final int SERVER_COUNT = 4; // 1 observer, 3 participants
     private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
     private ZooKeeper zk_client;
+    private boolean asyncSending;
+    private static boolean bakAsyncSending;
+
+    public LearnerMetricsTest(boolean asyncSending) {
+        this.asyncSending = asyncSending;
+    }
+
+    @Parameterized.Parameters
+    public static Collection sendingModes() {
+        return Arrays.asList(new Object[][]{{true}, {false}});
+    }
+
+    @Before
+    public void setAsyncSendingFlag() {
+        Learner.setAsyncSending(asyncSending);
+    }
+
+    @BeforeClass
+    public static void saveAsyncSendingFlag() {
+        bakAsyncSending = Learner.getAsyncSending();
+    }
+
+    @AfterClass
+    public static void resetAsyncSendingFlag() {
+        Learner.setAsyncSending(bakAsyncSending);
+    }
 
     @Test
     public void testLearnerMetricsTest() throws Exception {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
index bff6cbf..ee51baf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.PortAssignment;
@@ -38,23 +40,51 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
     private static int SERVER_COUNT = 3;
     private MainThread[] mt;
+    private static boolean bakAsyncSending;
+
+    private boolean asyncSending;
+
+    public ReconfigDuringLeaderSyncTest(boolean asyncSending) {
+        this.asyncSending = asyncSending;
+    }
+
+    @Parameterized.Parameters
+    public static Collection sendingModes() {
+        return Arrays.asList(new Object[][]{{true}, {false}});
+    }
 
     @Before
     public void setup() {
         System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
+        Learner.setAsyncSending(asyncSending);
         QuorumPeerConfig.setReconfigEnabled(true);
     }
 
+    @BeforeClass
+    public static void saveAsyncSendingFlag() {
+        bakAsyncSending = Learner.getAsyncSending();
+    }
+
+    @AfterClass
+    public static void resetAsyncSendingFlag() {
+        Learner.setAsyncSending(bakAsyncSending);
+    }
+
     /**
      * <pre>
      * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java
index ba68b22..d259a5d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server.util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.junit.Test;
 
@@ -69,4 +70,45 @@ public class ConfigUtilsTest {
         assertEquals(nsa.length, 1);
     }
 
+    @Test
+    public void testGetPropertyBackwardCompatibleWay() throws ConfigException {
+        String newProp = "zookeeper.prop.x.y.z";
+        String oldProp = "prop.x.y.z";
+
+        // Null as both properties are not set
+        String result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertNull(result);
+
+        // Return old property value when only old property is set
+        String oldPropValue = "oldPropertyValue";
+        System.setProperty(oldProp, oldPropValue);
+        result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertEquals(oldPropValue, result);
+
+        // Return new property value when both properties are set
+        String newPropValue = "newPropertyValue";
+        System.setProperty(newProp, newPropValue);
+        result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertEquals(newPropValue, result);
+
+        // cleanUp
+        clearProp(newProp, oldProp);
+
+        // Return trimmed value
+        System.setProperty(oldProp, oldPropValue + "  ");
+        result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertEquals(oldPropValue, result);
+
+        System.setProperty(newProp, "  " + newPropValue);
+        result = ConfigUtils.getPropertyBackwardCompatibleWay(newProp);
+        assertEquals(newPropValue, result);
+
+        // cleanUp
+        clearProp(newProp, oldProp);
+    }
+
+    private void clearProp(String newProp, String oldProp) {
+        System.clearProperty(newProp);
+        System.clearProperty(oldProp);
+    }
 }