You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fa...@apache.org on 2020/01/25 17:48:44 UTC
[zookeeper] branch master updated: ZOOKEEPER-3575: Moving sending
packets in Learner to a separate thread
This is an automated email from the ASF dual-hosted git repository.
fangmin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7c1251d ZOOKEEPER-3575: Moving sending packets in Learner to a separate thread
7c1251d is described below
commit 7c1251dbcf6a314466024f71ae5757bde34bb3fd
Author: Jie Huang <ji...@fb.com>
AuthorDate: Sat Jan 25 09:48:36 2020 -0800
ZOOKEEPER-3575: Moving sending packets in Learner to a separate thread
Author: Jie Huang <ji...@fb.com>
Reviewers: eolivelli@apache.org, fangmin@apache.org
Closes #1116 from jhuan31/ZOOKEEPER-3575
---
.../apache/zookeeper/server/quorum/Learner.java | 50 ++++++++++++-
.../zookeeper/server/quorum/LearnerSender.java | 82 ++++++++++++++++++++++
.../server/quorum/LearnerMetricsTest.java | 34 +++++++++
.../quorum/ReconfigDuringLeaderSyncTest.java | 32 ++++++++-
4 files changed, 194 insertions(+), 4 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 2fd2db4..e34ab3c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -95,6 +95,7 @@ public class Learner {
return sock;
}
+ LearnerSender sender = null;
protected InputArchive leaderIs;
protected OutputArchive leaderOs;
/** the protocol version of the leader */
@@ -113,9 +114,12 @@ public class Learner {
private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
+ public static final String LEARNER_ASYNC_SENDING = "learner.asyncSending";
+ private static boolean asyncSending = Boolean.getBoolean(LEARNER_ASYNC_SENDING);
static {
LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
LOG.info("TCP NoDelay set to: {}", nodelay);
+ LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
}
final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
@@ -124,6 +128,15 @@ public class Learner {
return pendingRevalidations.size();
}
+ // for testing
+ protected static void setAsyncSending(boolean newMode) {
+ asyncSending = newMode;
+ LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+
+ }
+ protected static boolean getAsyncSending() {
+ return asyncSending;
+ }
/**
* validate a session for a client
*
@@ -152,13 +165,27 @@ public class Learner {
}
/**
- * write a packet to the leader
+ * write a packet to the leader.
+ *
+ * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
+ * When packets are sent synchronously, writing is done within a synchronization block.
+ * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe.
+ * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only.
+ * So we have only one thread writing to leaderOs at a time in either case.
*
* @param pp
* the proposal packet to be sent to the leader
* @throws IOException
*/
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+ if (asyncSending) {
+ sender.queuePacket(pp);
+ } else {
+ writePacketNow(pp, flush);
+ }
+ }
+
+ void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
synchronized (leaderOs) {
if (pp != null) {
messageTracker.trackSent(pp.getType());
@@ -171,6 +198,14 @@ public class Learner {
}
/**
+ * Start thread that will forward any packet in the queue to the leader
+ */
+ protected void startSendingThread() {
+ sender = new LearnerSender(this);
+ sender.start();
+ }
+
+ /**
* read a packet from the leader
*
* @param pp
@@ -303,6 +338,9 @@ public class Learner {
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+ if (asyncSending) {
+ startSendingThread();
+ }
}
class LeaderConnector implements Runnable {
@@ -774,8 +812,9 @@ public class Learner {
dos.writeLong(entry.getKey());
dos.writeInt(entry.getValue());
}
- qp.setData(bos.toByteArray());
- writePacket(qp, true);
+
+ QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
+ writePacket(pingReply, true);
}
/**
@@ -785,6 +824,11 @@ public class Learner {
self.setZooKeeperServer(null);
self.closeAllConnections();
self.adminServer.setZooKeeperServer(null);
+
+ if (sender != null) {
+ sender.shutdown();
+ }
+
closeSocket();
// shutdown previous zookeeper
if (zk != null) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java
new file mode 100644
index 0000000..cbb7d69
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSender.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.zookeeper.server.ZooKeeperCriticalThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LearnerSender extends ZooKeeperCriticalThread {
+ private static final Logger LOG = LoggerFactory.getLogger(LearnerSender.class);
+
+ private final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
+ private final QuorumPacket proposalOfDeath = new QuorumPacket();
+
+ Learner learner;
+
+ public LearnerSender(Learner learner) {
+ super("LearnerSender:" + learner.zk.getServerId(), learner.zk.getZooKeeperServerListener());
+ this.learner = learner;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ QuorumPacket p = queuedPackets.poll();
+ if (p == null) {
+ learner.bufferedOutput.flush();
+ p = queuedPackets.take();
+ }
+
+ if (p == proposalOfDeath) {
+ // Packet of death!
+ break;
+ }
+
+ learner.messageTracker.trackSent(p.getType());
+ learner.leaderOs.writeRecord(p, "packet");
+ } catch (IOException e) {
+ handleException(this.getName(), e);
+ break;
+ } catch (InterruptedException e) {
+ handleException(this.getName(), e);
+ break;
+ }
+ }
+
+ LOG.info("LearnerSender exited");
+ }
+
+ public void queuePacket(QuorumPacket pp) throws IOException {
+ if (pp == null) {
+ learner.bufferedOutput.flush();
+ } else {
+ queuedPackets.add(pp);
+ }
+ }
+
+ public void shutdown() {
+ LOG.info("Shutting down LearnerSender");
+ queuedPackets.clear();
+ queuedPackets.add(proposalOfDeath);
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
index c6de02f..659ba31 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
@@ -20,6 +20,8 @@ package org.apache.zookeeper.server.quorum;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
@@ -31,14 +33,46 @@ import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.test.ClientBase;
import org.hamcrest.Matcher;
import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class LearnerMetricsTest extends QuorumPeerTestBase {
private static final int TIMEOUT_SECONDS = 30;
private static final int SERVER_COUNT = 4; // 1 observer, 3 participants
private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
private ZooKeeper zk_client;
+ private boolean asyncSending;
+ private static boolean bakAsyncSending;
+
+ public LearnerMetricsTest(boolean asyncSending) {
+ this.asyncSending = asyncSending;
+ }
+
+ @Parameterized.Parameters
+ public static Collection sendingModes() {
+ return Arrays.asList(new Object[][]{{true}, {false}});
+ }
+
+ @Before
+ public void setAsyncSendingFlag() {
+ Learner.setAsyncSending(asyncSending);
+ }
+
+ @BeforeClass
+ public static void saveAsyncSendingFlag() {
+ bakAsyncSending = Learner.getAsyncSending();
+ }
+
+ @AfterClass
+ public static void resetAsyncSendingFlag() {
+ Learner.setAsyncSending(bakAsyncSending);
+ }
@Test
public void testLearnerMetricsTest() throws Exception {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
index bff6cbf..ee51baf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
@@ -38,23 +40,51 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@RunWith(Parameterized.class)
public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
- protected static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ReconfigDuringLeaderSyncTest.class);
private static int SERVER_COUNT = 3;
private MainThread[] mt;
+ private static boolean bakAsyncSending;
+
+ private boolean asyncSending;
+
+ public ReconfigDuringLeaderSyncTest(boolean asyncSending) {
+ this.asyncSending = asyncSending;
+ }
+
+ @Parameterized.Parameters
+ public static Collection sendingModes() {
+ return Arrays.asList(new Object[][]{{true}, {false}});
+ }
@Before
public void setup() {
System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
+ Learner.setAsyncSending(asyncSending);
QuorumPeerConfig.setReconfigEnabled(true);
}
+ @BeforeClass
+ public static void saveAsyncSendingFlag() {
+ bakAsyncSending = Learner.getAsyncSending();
+ }
+
+ @AfterClass
+ public static void resetAsyncSendingFlag() {
+ Learner.setAsyncSending(bakAsyncSending);
+ }
+
/**
* <pre>
* Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2172.