You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ar...@apache.org on 2021/03/29 16:17:56 UTC
[zookeeper] branch branch-3.6 updated: ZOOKEEPER-4260: Backport
ZOOKEEPER-3575 to branch-3.6
This is an automated email from the ASF dual-hosted git repository.
arshad pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.6 by this push:
new 992abc8 ZOOKEEPER-4260: Backport ZOOKEEPER-3575 to branch-3.6
992abc8 is described below
commit 992abc8ce224183ef3d9e01909644f4cb489a9fc
Author: functioner <oi...@gmail.com>
AuthorDate: Mon Mar 29 21:48:56 2021 +0530
ZOOKEEPER-4260: Backport ZOOKEEPER-3575 to branch-3.6
(cherry picked from commit 7c1251dbcf6a314466024f71ae5757bde34bb3fd)
Author: functioner <oi...@gmail.com>
Author: Ayush Mantri <aa...@gmail.com>
Author: Jie Huang <ji...@fb.com>
Reviewers: Mohammad Arshad <ar...@apache.org>
Closes #1653 from functioner/ZOOKEEPER-4260 and squashes the following commits:
0a19eb18d [functioner] remove useless import
5d827decd [Ayush Mantri] cherry-pick ZOOKEEPER-4257 and fix conflicts
469d971d6 [functioner] Merge branch 'branch-3.6' of https://github.com/apache/zookeeper into ZOOKEEPER-4260
e6cb76734 [Jie Huang] ZOOKEEPER-3575: Moving sending packets in Learner to a separate thread
---
.../src/main/resources/markdown/zookeeperAdmin.md | 6 ++
.../apache/zookeeper/server/quorum/Learner.java | 52 +++++++++++++-
.../zookeeper/server/quorum/LearnerSender.java | 82 ++++++++++++++++++++++
.../apache/zookeeper/server/util/ConfigUtils.java | 25 +++++++
.../server/quorum/LearnerMetricsTest.java | 34 +++++++++
.../quorum/ReconfigDuringLeaderSyncTest.java | 32 ++++++++-
.../zookeeper/server/util/ConfigUtilsTest.java | 42 +++++++++++
7 files changed, 269 insertions(+), 4 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index d9e6ea5..2b78564 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1080,6 +1080,12 @@ property, when available, is noted below.
effect due to TLS handshake timeout when there are too many in-flight TLS
handshakes. Set it to something like 250 is good enough to avoid herd effect.
+* *learner.asyncSending*
+ (Java system property: **zookeeper.learner.asyncSending**)
+ (Java system property: **learner.asyncSending**) (Added for backward compatibility)
+ **New in 3.7.0:**
+ Sending and receiving packets in the Learner used to be done synchronously within a critical section, so an untimely network issue could cause followers to hang (see [ZOOKEEPER-3575](https://issues.apache.org/jira/browse/ZOOKEEPER-3575) and [ZOOKEEPER-4074](https://issues.apache.org/jira/browse/ZOOKEEPER-4074)). The new design moves packet sending in the Learner to a separate thread, so packets are sent asynchronously. Set this parameter (learner.asyncSending) to true to enable the new design.
+ The default is false.
<a name="sc_clusterOptions"></a>
#### Cluster Options
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index fa7cc59..d281acf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -54,6 +54,7 @@ import org.apache.zookeeper.server.TxnLogEntry;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.server.util.MessageTracker;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
@@ -95,6 +96,7 @@ public class Learner {
return sock;
}
+ LearnerSender sender = null;
protected InputArchive leaderIs;
protected OutputArchive leaderOs;
/** the protocol version of the leader */
@@ -113,9 +115,13 @@ public class Learner {
private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
+ public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
+ private static boolean asyncSending =
+ Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
static {
LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
LOG.info("TCP NoDelay set to: {}", nodelay);
+ LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
}
final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
@@ -124,6 +130,15 @@ public class Learner {
return pendingRevalidations.size();
}
+    /**
+     * Overrides the asynchronous-sending mode (for testing).
+     * The flag is read when the leader connection is set up (to decide whether to
+     * start the sender thread) and on every {@code writePacket} call.
+     *
+     * @param newMode true to queue packets for the sender thread, false to write them inline
+     */
+    protected static void setAsyncSending(boolean newMode) {
+        asyncSending = newMode;
+        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+    }
+
+    /**
+     * @return the current asynchronous-sending mode (for testing)
+     */
+    protected static boolean getAsyncSending() {
+        return asyncSending;
+    }
/**
* validate a session for a client
*
@@ -152,13 +167,27 @@ public class Learner {
}
/**
- * write a packet to the leader
+ * write a packet to the leader.
+ *
+ * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
+ * When packets are sent synchronously, writing is done within a synchronization block.
+ * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe.
+ * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only.
+ * So we have only one thread writing to leaderOs at a time in either case.
*
* @param pp
* the proposal packet to be sent to the leader
* @throws IOException
*/
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+ // The send path is chosen from the static flag on every call.
+ // In async mode the flush argument is not forwarded: LearnerSender.run()
+ // flushes whenever its queue drains, and queuePacket() handles a null pp as a flush.
+ // NOTE(review): sender is only assigned in startSendingThread(), which runs when
+ // asyncSending was true at connect time; if the flag is flipped to true afterwards
+ // (e.g. via setAsyncSending in tests) sender could still be null here -- confirm.
+ if (asyncSending) {
+ sender.queuePacket(pp);
+ } else {
+ writePacketNow(pp, flush);
+ }
+ }
+
+ void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
synchronized (leaderOs) {
if (pp != null) {
messageTracker.trackSent(pp.getType());
@@ -171,6 +200,14 @@ public class Learner {
}
/**
+ * Start thread that will forward any packet in the queue to the leader
+ */
+    protected void startSendingThread() {
+        // Publish the sender before starting it so writePacket can queue to it.
+        LearnerSender newSender = new LearnerSender(this);
+        sender = newSender;
+        newSender.start();
+    }
+
+ /**
* read a packet from the leader
*
* @param pp
@@ -305,6 +342,9 @@ public class Learner {
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+ if (asyncSending) {
+ startSendingThread();
+ }
}
class LeaderConnector implements Runnable {
@@ -788,8 +828,9 @@ public class Learner {
dos.writeLong(entry.getKey());
dos.writeInt(entry.getValue());
}
- qp.setData(bos.toByteArray());
- writePacket(qp, true);
+
+ QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
+ writePacket(pingReply, true);
}
/**
@@ -799,6 +840,11 @@ public class Learner {
self.setZooKeeperServer(null);
self.closeAllConnections();
self.adminServer.setZooKeeperServer(null);
+
+ if (sender != null) {
+ sender.shutdown();
+ }
+
closeSocket();
// shutdown previous zookeeper
if (zk != null) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java
new file mode 100644
index 0000000..cbb7d69
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.zookeeper.server.ZooKeeperCriticalThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dedicated thread that writes a learner's outgoing packets to the leader.
+ * Other threads only enqueue packets (and flush requests); this thread performs
+ * the actual stream writes and flushes, so callers never block on the leader socket.
+ */
+public class LearnerSender extends ZooKeeperCriticalThread {
+    private static final Logger LOG = LoggerFactory.getLogger(LearnerSender.class);
+
+    // Packets waiting to be written to the leader; drained only by run().
+    private final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
+
+    // Sentinel (compared by identity) that makes run() exit; never written to the leader.
+    private final QuorumPacket proposalOfDeath = new QuorumPacket();
+
+    // Sentinel (compared by identity) asking run() to flush the stream; never written to the leader.
+    private final QuorumPacket flushRequest = new QuorumPacket();
+
+    final Learner learner;
+
+    /**
+     * Creates (but does not start) the sender thread for the given learner.
+     * The thread is named "LearnerSender:" followed by the learner's server id.
+     *
+     * @param learner the learner whose leader output stream this thread writes to
+     */
+    public LearnerSender(Learner learner) {
+        super("LearnerSender:" + learner.zk.getServerId(), learner.zk.getZooKeeperServerListener());
+        this.learner = learner;
+    }
+
+    /**
+     * Writes queued packets to the leader until the shutdown sentinel is taken or an
+     * I/O error / interrupt occurs. The buffered stream is flushed whenever the queue
+     * drains and whenever a flush request reaches the head of the queue.
+     */
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                QuorumPacket p = queuedPackets.poll();
+                if (p == null) {
+                    // Nothing pending: push buffered bytes out before blocking for more.
+                    learner.bufferedOutput.flush();
+                    p = queuedPackets.take();
+                }
+
+                if (p == proposalOfDeath) {
+                    // Packet of death!
+                    break;
+                }
+
+                if (p == flushRequest) {
+                    // Explicit flush, ordered after every packet queued before it.
+                    learner.bufferedOutput.flush();
+                    continue;
+                }
+
+                learner.messageTracker.trackSent(p.getType());
+                learner.leaderOs.writeRecord(p, "packet");
+            } catch (IOException e) {
+                handleException(this.getName(), e);
+                break;
+            } catch (InterruptedException e) {
+                handleException(this.getName(), e);
+                // Restore the interrupt status instead of swallowing it.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+        LOG.info("LearnerSender exited");
+    }
+
+    /**
+     * Hands a packet to the sender thread without blocking the caller.
+     * A {@code null} packet is a flush request: it is queued behind the packets already
+     * enqueued and the flush is performed by the sender thread. This keeps the caller
+     * off the leader socket and keeps this thread the only writer of the stream.
+     *
+     * @param pp the packet to send, or {@code null} to request a flush
+     * @throws IOException declared for source compatibility; flush failures now surface
+     *         on the sender thread instead of the caller
+     */
+    public void queuePacket(QuorumPacket pp) throws IOException {
+        queuedPackets.add(pp == null ? flushRequest : pp);
+    }
+
+    /**
+     * Asks the sender thread to stop: packets still queued are discarded and the
+     * shutdown sentinel is enqueued. Does not wait for the thread to exit.
+     */
+    public void shutdown() {
+        LOG.info("Shutting down LearnerSender");
+        queuedPackets.clear();
+        queuedPackets.add(proposalOfDeath);
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java
index 508dc11..d6f7572 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ConfigUtils.java
@@ -95,4 +95,29 @@ public class ConfigUtils {
}
}
+    /**
+     * Reads a system property that was renamed by prefixing it with {@code zookeeper.}
+     * (for example {@code prop.x.y.z} became {@code zookeeper.prop.x.y.z}) so that it can be
+     * set from zoo.cfg, while still honouring the old, unprefixed name for backward
+     * compatibility. The new name wins when both are set.
+     *
+     * <p>Only a leading {@code zookeeper.} is stripped to derive the old name; occurrences
+     * elsewhere in the key are preserved. A key without that prefix (or consisting of the
+     * prefix alone) has no old name, so only the key itself is consulted.
+     *
+     * @param newPropertyKey new property key which starts with zookeeper.
+     * @return the trimmed value of the new property if set, otherwise the trimmed value of
+     *         the old property if set, otherwise {@code null}
+     */
+    public static String getPropertyBackwardCompatibleWay(String newPropertyKey) {
+        String newKeyValue = System.getProperty(newPropertyKey);
+        if (newKeyValue != null) {
+            return newKeyValue.trim();
+        }
+
+        String prefix = "zookeeper.";
+        if (!newPropertyKey.startsWith(prefix) || newPropertyKey.length() == prefix.length()) {
+            // No legacy name to fall back to (and System.getProperty rejects an empty key).
+            return null;
+        }
+        String oldPropertyKey = newPropertyKey.substring(prefix.length());
+        String oldKeyValue = System.getProperty(oldPropertyKey);
+        return oldKeyValue == null ? null : oldKeyValue.trim();
+    }
+
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
index c6de02f..659ba31 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
@@ -20,6 +20,8 @@ package org.apache.zookeeper.server.quorum;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
@@ -31,14 +33,46 @@ import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.test.ClientBase;
import org.hamcrest.Matcher;
import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class LearnerMetricsTest extends QuorumPeerTestBase {
private static final int TIMEOUT_SECONDS = 30;
private static final int SERVER_COUNT = 4; // 1 observer, 3 participants
private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
private ZooKeeper zk_client;
+    // Sending mode for the current parameterized run.
+    private final boolean asyncSending;
+    // Value of Learner's async-sending flag before this class ran, restored afterwards.
+    private static boolean bakAsyncSending;
+
+    public LearnerMetricsTest(boolean asyncSending) {
+        this.asyncSending = asyncSending;
+    }
+
+    /** Runs every test once with asynchronous learner sending and once without. */
+    @Parameterized.Parameters(name = "asyncSending={0}")
+    public static Collection<Object[]> sendingModes() {
+        return Arrays.asList(new Object[][]{{true}, {false}});
+    }
+
+    @Before
+    public void setAsyncSendingFlag() {
+        Learner.setAsyncSending(asyncSending);
+    }
+
+    @BeforeClass
+    public static void saveAsyncSendingFlag() {
+        bakAsyncSending = Learner.getAsyncSending();
+    }
+
+    @AfterClass
+    public static void resetAsyncSendingFlag() {
+        Learner.setAsyncSending(bakAsyncSending);
+    }
@Test
public void testLearnerMetricsTest() throws Exception {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
index bff6cbf..ee51baf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
@@ -38,23 +40,51 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@RunWith(Parameterized.class)
public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
- protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
private static int SERVER_COUNT = 3;
private MainThread[] mt;
+    // Value of Learner's async-sending flag before this class ran, restored afterwards.
+    private static boolean bakAsyncSending;
+
+    // Sending mode for the current parameterized run.
+    private final boolean asyncSending;
+
+    public ReconfigDuringLeaderSyncTest(boolean asyncSending) {
+        this.asyncSending = asyncSending;
+    }
+
+    /** Runs every test once with asynchronous learner sending and once without. */
+    @Parameterized.Parameters(name = "asyncSending={0}")
+    public static Collection<Object[]> sendingModes() {
+        return Arrays.asList(new Object[][]{{true}, {false}});
+    }
@Before
public void setup() {
System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
+ Learner.setAsyncSending(asyncSending);
QuorumPeerConfig.setReconfigEnabled(true);
}
+ @BeforeClass
+ public static void saveAsyncSendingFlag() {
+ bakAsyncSending = Learner.getAsyncSending();
+ }
+
+ @AfterClass
+ public static void resetAsyncSendingFlag() {
+ Learner.setAsyncSending(bakAsyncSending);
+ }
+
/**
* <pre>
* Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java
index ba68b22..d259a5d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/ConfigUtilsTest.java
@@ -19,6 +19,7 @@
package org.apache.zookeeper.server.util;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.junit.Test;
@@ -69,4 +70,45 @@ public class ConfigUtilsTest {
assertEquals(nsa.length, 1);
}
+    @Test
+    public void testGetPropertyBackwardCompatibleWay() throws ConfigException {
+        String newProp = "zookeeper.prop.x.y.z";
+        String oldProp = "prop.x.y.z";
+        String oldPropValue = "oldPropertyValue";
+        String newPropValue = "newPropertyValue";
+
+        // Start from a clean slate and always restore it, even if an assertion fails,
+        // so leftover system properties cannot leak into other tests.
+        clearProp(newProp, oldProp);
+        try {
+            // Null as both properties are not set
+            assertNull(ConfigUtils.getPropertyBackwardCompatibleWay(newProp));
+
+            // Return old property value when only old property is set
+            System.setProperty(oldProp, oldPropValue);
+            assertEquals(oldPropValue, ConfigUtils.getPropertyBackwardCompatibleWay(newProp));
+
+            // Return new property value when both properties are set
+            System.setProperty(newProp, newPropValue);
+            assertEquals(newPropValue, ConfigUtils.getPropertyBackwardCompatibleWay(newProp));
+
+            clearProp(newProp, oldProp);
+
+            // Return trimmed value
+            System.setProperty(oldProp, oldPropValue + " ");
+            assertEquals(oldPropValue, ConfigUtils.getPropertyBackwardCompatibleWay(newProp));
+
+            System.setProperty(newProp, " " + newPropValue);
+            assertEquals(newPropValue, ConfigUtils.getPropertyBackwardCompatibleWay(newProp));
+        } finally {
+            clearProp(newProp, oldProp);
+        }
+    }
+
+    /** Removes both the new-style and the old-style system property used by a test. */
+    private void clearProp(String newProp, String oldProp) {
+        System.clearProperty(newProp);
+        System.clearProperty(oldProp);
+    }
}