Posted to commits@zookeeper.apache.org by ha...@apache.org on 2019/09/05 21:13:02 UTC
[zookeeper] branch master updated: ZOOKEEPER-3492: Add weights to server side connection throttling
This is an automated email from the ASF dual-hosted git repository.
hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new eecd9e7 ZOOKEEPER-3492: Add weights to server side connection throttling
eecd9e7 is described below
commit eecd9e7ce046083bd40cd6134bdb2b405d01fe67
Author: Jie Huang <ji...@fb.com>
AuthorDate: Thu Sep 5 14:12:50 2019 -0700
ZOOKEEPER-3492: Add weights to server side connection throttling
Author: Jie Huang <ji...@fb.com>
Reviewers: Michael Han <ha...@apache.org>, Enrico Olivelli <eo...@gmail.com>
Closes #1037 from jhuan31/ZOOKEEPER-3492
---
.../src/main/resources/markdown/zookeeperAdmin.md | 33 ++++-
.../org/apache/zookeeper/server/BlueThrottle.java | 119 ++++++++++++++++--
.../apache/zookeeper/server/SessionTracker.java | 1 +
.../zookeeper/server/SessionTrackerImpl.java | 4 +
.../apache/zookeeper/server/ZooKeeperServer.java | 34 ++++--
.../server/quorum/LeaderSessionTracker.java | 1 -
.../server/quorum/UpgradeableSessionTracker.java | 5 +
.../apache/zookeeper/server/BlueThrottleTest.java | 134 +++++++++++++++++++++
.../zookeeper/server/PrepRequestProcessorTest.java | 4 +
9 files changed, 311 insertions(+), 24 deletions(-)
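For orientation, the two rules this patch introduces (how many tokens a connect request costs, and how the configured bucket size is scaled when weights are on) can be sketched roughly as follows. This is a minimal illustration, not the committed code: the class `WeightedThrottleSketch` and its method names are hypothetical, and the weights are hard-coded to the defaults given in the patch (global 3, renew 2, local 1) rather than read from system properties.

```java
// Hypothetical sketch: class and method names are illustrative, not ZooKeeper API.
final class WeightedThrottleSketch {
    // Default weights stated in the patch (global 3, renew 2, local 1).
    static final int GLOBAL_WEIGHT = 3;
    static final int RENEW_WEIGHT = 2;
    static final int LOCAL_WEIGHT = 1;

    /** Tokens a single connect request must take from the bucket. */
    static int tokensNeeded(boolean weightEnabled, long sessionId, boolean localSessionEnabled) {
        if (!weightEnabled) {
            return 1; // unweighted throttling: every request costs one token
        }
        if (sessionId != 0) {
            return RENEW_WEIGHT; // the client presents an existing session id: a reconnect
        }
        return localSessionEnabled ? LOCAL_WEIGHT : GLOBAL_WEIGHT;
    }

    /** Bucket size after scaling; configured values are treated as global-session counts. */
    static int effectiveTokens(boolean weightEnabled, int configuredTokens) {
        return weightEnabled ? GLOBAL_WEIGHT * configuredTokens : configuredTokens;
    }

    public static void main(String[] args) {
        System.out.println("new global session: " + tokensNeeded(true, 0L, false));
        System.out.println("new local session:  " + tokensNeeded(true, 0L, true));
        System.out.println("reconnect:          " + tokensNeeded(true, 42L, true));
        System.out.println("bucket for 100 configured tokens: " + effectiveTokens(true, 100));
    }
}
```

Scaling the bucket by the global weight keeps an existing token setting meaning "this many global sessions" once weights are switched on, which matches the assumption stated in the static initializer of BlueThrottle.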
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index c4cc747..5113eaa 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -124,8 +124,8 @@ is no full support.
#### Required Software
-ZooKeeper runs in Java, release 1.8 or greater
-(JDK 8 LTS, JDK 11 LTS, JDK 12 - Java 9 and 10 are not supported).
+ZooKeeper runs in Java, release 1.8 or greater
+(JDK 8 LTS, JDK 11 LTS, JDK 12 - Java 9 and 10 are not supported).
It runs as an _ensemble_ of ZooKeeper servers. Three
ZooKeeper servers is the minimum recommended size for an
ensemble, and we also recommend that they run on separate
@@ -822,6 +822,27 @@ property, when available, is noted below.
dropping. This parameter defines the threshold to decrease the dropping
probability. The default is 0.
+* *zookeeper.connection_throttle_weight_enabled* :
+ (Java system property only)
+ **New in 3.6.0:**
+ Whether to consider connection weights when throttling. Only useful when connection throttle is enabled, that is, connectionMaxTokens is larger than 0. The default is false.
+
+* *zookeeper.connection_throttle_global_session_weight* :
+ (Java system property only)
+ **New in 3.6.0:**
+ The weight of a global session. It is the number of tokens required for a global session request to get through the connection throttler. It has to be a positive integer no smaller than the weight of a local session. The default is 3.
+
+* *zookeeper.connection_throttle_local_session_weight* :
+ (Java system property only)
+ **New in 3.6.0:**
+ The weight of a local session. It is the number of tokens required for a local session request to get through the connection throttler. It has to be a positive integer no larger than the weight of a global session or a renew session. The default is 1.
+
+* *zookeeper.connection_throttle_renew_session_weight* :
+ (Java system property only)
+ **New in 3.6.0:**
+ The weight of renewing a session. It is also the number of tokens required for a reconnect request to get through the throttler. It has to be a positive integer no smaller than the weight of a local session. The default is 2.
+
+
* *clientPortListenBacklog* :
**New in 3.4.14, 3.5.5, 3.6.0:**
The socket backlog length for the ZooKeeper server socket. This controls
@@ -889,7 +910,7 @@ property, when available, is noted below.
* *advancedFlowControlEnabled* :
(Java system property: **zookeeper.netty.advancedFlowControl.enabled**)
- Using accurate flow control in netty based on the status of ZooKeeper
+ Using accurate flow control in netty based on the status of ZooKeeper
pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in
Netty.
@@ -958,9 +979,9 @@ of servers -- that is, when deploying clusters of servers.
* *connectToLearnerMasterLimit* :
(Java system property: zookeeper.**connectToLearnerMasterLimit**)
Amount of time, in ticks (see [tickTime](#id_tickTime)), to allow followers to
- connect to the leader after leader election. Defaults to the value of initLimit.
+ connect to the leader after leader election. Defaults to the value of initLimit.
Use when initLimit is high so connecting to learner master doesn't result in higher timeout.
-
+
* *leaderServes* :
(Java system property: zookeeper.**leaderServes**)
Leader accepts client connections. Default value is "yes".
@@ -1568,7 +1589,7 @@ options are used to configure the [AdminServer](#sc_adminserver).
### Metrics Providers
-**New in 3.6.0:** The following options are used to configure metrics.
+**New in 3.6.0:** The following options are used to configure metrics.
By default ZooKeeper server exposes useful metrics using the [AdminServer](#sc_adminserver).
and [Four Letter Words](#sc_4lw) interface.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java
index 3895c2e..9f03e44 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java
@@ -20,6 +20,8 @@ package org.apache.zookeeper.server;
import java.util.Random;
import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implements a token-bucket based rate limiting mechanism with optional
@@ -69,6 +71,7 @@ import org.apache.zookeeper.common.Time;
**/
public class BlueThrottle {
+ private static final Logger LOG = LoggerFactory.getLogger(BlueThrottle.class);
private int maxTokens;
private int fillTime;
@@ -86,35 +89,115 @@ public class BlueThrottle {
Random rng;
public static final String CONNECTION_THROTTLE_TOKENS = "zookeeper.connection_throttle_tokens";
- public static final int DEFAULT_CONNECTION_THROTTLE_TOKENS;
+ private static final int DEFAULT_CONNECTION_THROTTLE_TOKENS;
public static final String CONNECTION_THROTTLE_FILL_TIME = "zookeeper.connection_throttle_fill_time";
- public static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
+ private static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
public static final String CONNECTION_THROTTLE_FILL_COUNT = "zookeeper.connection_throttle_fill_count";
- public static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
+ private static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
public static final String CONNECTION_THROTTLE_FREEZE_TIME = "zookeeper.connection_throttle_freeze_time";
- public static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
+ private static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
public static final String CONNECTION_THROTTLE_DROP_INCREASE = "zookeeper.connection_throttle_drop_increase";
- public static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
+ private static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
public static final String CONNECTION_THROTTLE_DROP_DECREASE = "zookeeper.connection_throttle_drop_decrease";
- public static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
+ private static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
public static final String CONNECTION_THROTTLE_DECREASE_RATIO = "zookeeper.connection_throttle_decrease_ratio";
- public static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+ private static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+
+ public static final String WEIGHED_CONNECTION_THROTTLE = "zookeeper.connection_throttle_weight_enabled";
+ private static boolean connectionWeightEnabled;
+
+ public static final String GLOBAL_SESSION_WEIGHT = "zookeeper.connection_throttle_global_session_weight";
+ private static final int DEFAULT_GLOBAL_SESSION_WEIGHT;
+
+ public static final String LOCAL_SESSION_WEIGHT = "zookeeper.connection_throttle_local_session_weight";
+ private static final int DEFAULT_LOCAL_SESSION_WEIGHT;
+
+ public static final String RENEW_SESSION_WEIGHT = "zookeeper.connection_throttle_renew_session_weight";
+ private static final int DEFAULT_RENEW_SESSION_WEIGHT;
+
+ // for unit tests only
+ protected static void setConnectionWeightEnabled(boolean enabled) {
+ connectionWeightEnabled = enabled;
+ logWeighedThrottlingSetting();
+ }
+
+ private static void logWeighedThrottlingSetting() {
+ if (connectionWeightEnabled) {
+ LOG.info("Weighed connection throttling is enabled. "
+ + "But it will only be effective if connection throttling is enabled");
+ LOG.info(
+ "The weights for different session types are: global {} renew {} local {}",
+ DEFAULT_GLOBAL_SESSION_WEIGHT,
+ DEFAULT_RENEW_SESSION_WEIGHT,
+ DEFAULT_LOCAL_SESSION_WEIGHT
+ );
+ } else {
+ LOG.info("Weighed connection throttling is disabled");
+ }
+ }
static {
- DEFAULT_CONNECTION_THROTTLE_TOKENS = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0);
- DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1);
- DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1);
+ int tokens = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0);
+ int fillCount = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1);
+
+ connectionWeightEnabled = Boolean.getBoolean(WEIGHED_CONNECTION_THROTTLE);
+
+ // if not specified, the weights for a global session, a local session, and a renew session
+ // are 3, 1, 2 respectively. The weight for a global session is 3 because in our connection benchmarking,
+ // the throughput of global sessions is about one third of that of local sessions. Renewing a session
+ // is more expensive than establishing a local session and cheaper than creating a global session so
+ // its default weight is set to 2.
+ int globalWeight = Integer.getInteger(GLOBAL_SESSION_WEIGHT, 3);
+ int localWeight = Integer.getInteger(LOCAL_SESSION_WEIGHT, 1);
+ int renewWeight = Integer.getInteger(RENEW_SESSION_WEIGHT, 2);
+
+ if (globalWeight <= 0) {
+ LOG.warn("Invalid global session weight {}. It should be larger than 0", globalWeight);
+ DEFAULT_GLOBAL_SESSION_WEIGHT = 3;
+ } else if (globalWeight < localWeight) {
+ LOG.warn("The global session weight {} is less than the local session weight {}. Use the local session weight.",
+ globalWeight, localWeight);
+ DEFAULT_GLOBAL_SESSION_WEIGHT = localWeight;
+ } else {
+ DEFAULT_GLOBAL_SESSION_WEIGHT = globalWeight;
+ }
+ if (localWeight <= 0) {
+ LOG.warn("Invalid local session weight {}. It should be larger than 0", localWeight);
+ DEFAULT_LOCAL_SESSION_WEIGHT = 1;
+ } else {
+ DEFAULT_LOCAL_SESSION_WEIGHT = localWeight;
+ }
+
+ if (renewWeight <= 0) {
+ LOG.warn("Invalid renew session weight {}. It should be larger than 0", renewWeight);
+ DEFAULT_RENEW_SESSION_WEIGHT = 2;
+ } else if (renewWeight < localWeight) {
+ LOG.warn("The renew session weight {} is less than the local session weight {}. Use the local session weight.",
+ renewWeight, localWeight);
+ DEFAULT_RENEW_SESSION_WEIGHT = localWeight;
+ } else {
+ DEFAULT_RENEW_SESSION_WEIGHT = renewWeight;
+ }
+
+ // This is based on the assumption that tokens set in config are for global sessions
+ DEFAULT_CONNECTION_THROTTLE_TOKENS = connectionWeightEnabled
+ ? DEFAULT_GLOBAL_SESSION_WEIGHT * tokens : tokens;
+ DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1);
+ DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = connectionWeightEnabled
+ ? DEFAULT_GLOBAL_SESSION_WEIGHT * fillCount : fillCount;
DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME = Integer.getInteger(CONNECTION_THROTTLE_FREEZE_TIME, -1);
DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_INCREASE, 0.02);
DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_DECREASE, 0.002);
DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO = getDoubleProp(CONNECTION_THROTTLE_DECREASE_RATIO, 0);
+
+ logWeighedThrottlingSetting();
}
/* Variation of Integer.getInteger for real number properties */
@@ -212,6 +295,22 @@ public class BlueThrottle {
return maxTokens - tokens;
}
+ public int getRequiredTokensForGlobal() {
+ return BlueThrottle.DEFAULT_GLOBAL_SESSION_WEIGHT;
+ }
+
+ public int getRequiredTokensForLocal() {
+ return BlueThrottle.DEFAULT_LOCAL_SESSION_WEIGHT;
+ }
+
+ public int getRequiredTokensForRenew() {
+ return BlueThrottle.DEFAULT_RENEW_SESSION_WEIGHT;
+ }
+
+ public boolean isConnectionWeightEnabled() {
+ return BlueThrottle.connectionWeightEnabled;
+ }
+
public synchronized boolean checkLimit(int need) {
// A maxTokens setting of zero disables throttling
if (maxTokens == 0) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
index 8a3bb1e..9cf4774 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
@@ -137,4 +137,5 @@ public interface SessionTracker {
*/
long getLocalSessionCount();
+ boolean isLocalSessionsEnabled();
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
index 07b3fae..755512e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
@@ -342,4 +342,8 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements Sessi
return 0;
}
+ @Override
+ public boolean isLocalSessionsEnabled() {
+ return false;
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 01748e6..95aaed3 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -148,6 +148,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
protected String initialConfig;
private final RequestPathMetricsCollector requestPathMetricsCollector;
+ private boolean localSessionEnabled = false;
protected enum State {
INITIAL,
RUNNING,
@@ -598,7 +599,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
registerMetrics();
setState(State.RUNNING);
+
requestPathMetricsCollector.start();
+
+ localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
notifyAll();
}
@@ -1212,12 +1216,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return connThrottle.getDropChance();
}
- public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException, ClientCnxnLimitException {
-
- if (!connThrottle.checkLimit(1)) {
- throw new ClientCnxnLimitException();
- }
- ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+ @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
+ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
+ throws IOException, ClientCnxnLimitException {
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
@@ -1226,7 +1227,27 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.debug("Session establishment request from client " + cnxn.getRemoteSocketAddress()
+ " client's lastZxid is 0x" + Long.toHexString(connReq.getLastZxidSeen()));
}
+ long sessionId = connReq.getSessionId();
+ int tokensNeeded = 1;
+ if (connThrottle.isConnectionWeightEnabled()) {
+ if (sessionId == 0) {
+ if (localSessionEnabled) {
+ tokensNeeded = connThrottle.getRequiredTokensForLocal();
+ } else {
+ tokensNeeded = connThrottle.getRequiredTokensForGlobal();
+ }
+ } else {
+ tokensNeeded = connThrottle.getRequiredTokensForRenew();
+ }
+ }
+
+ if (!connThrottle.checkLimit(tokensNeeded)) {
+ throw new ClientCnxnLimitException();
+ }
+ ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+
ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
+
boolean readOnly = false;
try {
readOnly = bia.readBool("readOnly");
@@ -1269,7 +1290,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// We don't want to receive any packets until we are sure that the
// session is setup
cnxn.disableRecv();
- long sessionId = connReq.getSessionId();
if (sessionId == 0) {
long id = createSession(cnxn, passwd, sessionTimeout);
if (LOG.isDebugEnabled()) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
index f4eb92c..5ab732f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
@@ -38,7 +38,6 @@ public class LeaderSessionTracker extends UpgradeableSessionTracker {
private static final Logger LOG = LoggerFactory.getLogger(LeaderSessionTracker.class);
- private final boolean localSessionsEnabled;
private final SessionTrackerImpl globalSessionTracker;
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
index 9edb4f2..bc25e5d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
@@ -55,6 +55,11 @@ public abstract class UpgradeableSessionTracker implements SessionTracker {
return localSessionTracker != null && localSessionTracker.isTrackingSession(sessionId);
}
+ @Override
+ public boolean isLocalSessionsEnabled() {
+ return localSessionsEnabled;
+ }
+
public boolean isUpgradingSession(long sessionId) {
return upgradingSessions != null && upgradingSessions.containsKey(sessionId);
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java
index c3d10bb..8b64c2b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java
@@ -22,7 +22,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +35,7 @@ import org.slf4j.LoggerFactory;
public class BlueThrottleTest extends ZKTestCase {
private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class);
+ private static final int RAPID_TIMEOUT = 10000;
class MockRandom extends Random {
@@ -162,4 +168,132 @@ public class BlueThrottleTest extends ZKTestCase {
assertTrue("Later requests should have a chance", accepted > 0);
}
+ private QuorumUtil quorumUtil = new QuorumUtil(1);
+ private ClientBase.CountdownWatcher[] watchers;
+ private ZooKeeper[] zks;
+
+ private int connect(int n) throws Exception {
+ String connStr = quorumUtil.getConnectionStringForServer(1);
+ int connected = 0;
+
+ zks = new ZooKeeper[n];
+ watchers = new ClientBase.CountdownWatcher[n];
+ for (int i = 0; i < n; i++){
+ watchers[i] = new ClientBase.CountdownWatcher();
+ zks[i] = new ZooKeeper(connStr, 3000, watchers[i]);
+ try {
+ watchers[i].waitForConnected(RAPID_TIMEOUT);
+ connected++;
+ } catch (TimeoutException e) {
+ LOG.info("Connection denied by the throttler due to insufficient tokens");
+ break;
+ }
+ }
+
+ return connected;
+ }
+
+ private void shutdownQuorum() throws Exception{
+ for (ZooKeeper zk : zks) {
+ if (zk != null) {
+ zk.close();
+ }
+ }
+
+ quorumUtil.shutdownAll();
+ }
+
+ @Test
+ public void testNoThrottling() throws Exception {
+ quorumUtil.startAll();
+
+ //disable throttling
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(0);
+
+ int connected = connect(10);
+
+ Assert.assertEquals(10, connected);
+ shutdownQuorum();
+ }
+
+ @Test
+ public void testThrottling() throws Exception {
+ quorumUtil.enableLocalSession(true);
+ quorumUtil.startAll();
+
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2);
+ //no refill, makes testing easier
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+
+
+ int connected = connect(3);
+ Assert.assertEquals(2, connected);
+ shutdownQuorum();
+
+ quorumUtil.enableLocalSession(false);
+ quorumUtil.startAll();
+
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2);
+ //no refill, makes testing easier
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+
+
+ connected = connect(3);
+ Assert.assertEquals(2, connected);
+ shutdownQuorum();
+ }
+
+ @Test
+ public void testWeighedThrottling() throws Exception {
+ // this test depends on the session weights set to the default values
+ // 3 for global session, 2 for renew sessions, 1 for local sessions
+ BlueThrottle.setConnectionWeightEnabled(true);
+
+ quorumUtil.enableLocalSession(true);
+ quorumUtil.startAll();
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+
+ //try to create 11 local sessions, 10 created, because we have only 10 tokens
+ int connected = connect(11);
+ Assert.assertEquals(10, connected);
+ shutdownQuorum();
+
+ quorumUtil.enableLocalSession(false);
+ quorumUtil.startAll();
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+ //try to create 11 global sessions, 3 created, because we have 10 tokens and each connection needs 3
+ connected = connect(11);
+ Assert.assertEquals(3, connected);
+ shutdownQuorum();
+
+ quorumUtil.startAll();
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+ connected = connect(2);
+ Assert.assertEquals(2, connected);
+
+ quorumUtil.shutdown(1);
+ watchers[0].waitForDisconnected(RAPID_TIMEOUT);
+ watchers[1].waitForDisconnected(RAPID_TIMEOUT);
+
+ quorumUtil.restart(1);
+ //client will try to reconnect
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(3);
+ quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+ int reconnected = 0;
+ for (int i = 0; i < 2; i++){
+ try {
+ watchers[i].waitForConnected(RAPID_TIMEOUT);
+ reconnected++;
+ } catch (TimeoutException e) {
+ LOG.info("One reconnect fails due to insufficient tokens");
+ }
+ }
+ //each reconnect takes two tokens, we have 3, so only one reconnects
+ LOG.info("reconnected {}", reconnected);
+ Assert.assertEquals(1, reconnected);
+ shutdownQuorum();
+ }
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
index 8aacaac..9724423 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
@@ -278,6 +278,10 @@ public class PrepRequestProcessorTest extends ClientBase {
return 0;
}
+ @Override
+ public boolean isLocalSessionsEnabled() {
+ return false;
+ }
}
}
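The counts asserted in testWeighedThrottling follow from simple bucket arithmetic once refill is turned off. The sketch below reproduces that arithmetic under stated assumptions: `NoRefillBucketSketch` is a hypothetical stand-in, not BlueThrottle, and it ignores refill, the freeze time and the BLUE drop probability entirely.

```java
// Hypothetical stand-in for a token bucket with refill disabled (fill count 0).
final class NoRefillBucketSketch {
    private int tokens;

    NoRefillBucketSketch(int maxTokens) {
        this.tokens = maxTokens;
    }

    /** Takes `need` tokens if available; otherwise rejects and leaves the bucket unchanged. */
    boolean tryAcquire(int need) {
        if (tokens < need) {
            return false;
        }
        tokens -= need;
        return true;
    }

    /** Number of requests admitted out of `attempts`, each costing `weight` tokens. */
    static int admitted(int maxTokens, int weight, int attempts) {
        NoRefillBucketSketch bucket = new NoRefillBucketSketch(maxTokens);
        int count = 0;
        for (int i = 0; i < attempts; i++) {
            if (bucket.tryAcquire(weight)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("local,  10 tokens, 11 attempts: " + admitted(10, 1, 11));
        System.out.println("global, 10 tokens, 11 attempts: " + admitted(10, 3, 11));
        System.out.println("renew,   3 tokens,  2 attempts: " + admitted(3, 2, 2));
    }
}
```

With integer division in mind, 10 tokens admit 10 local sessions (weight 1) but only 3 global sessions (weight 3), and 3 tokens admit a single reconnect (weight 2), which are the values the test expects.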