You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/07/27 03:16:07 UTC
zookeeper git commit: ZOOKEEPER-2251: Add Client side packet response
timeout to avoid infinite wait.
Repository: zookeeper
Updated Branches:
refs/heads/master 75c652f45 -> 9f8279841
ZOOKEEPER-2251: Add Client side packet response timeout to avoid infinite wait.
Add Client side packet response timeout to avoid infinite wait.
Author: Mohammad Arshad <ar...@apache.org>
Reviewers: Michael Han <ha...@apache.org>, nrico Olivelli <eo...@gmail.com>
Closes #119 from arshadmohammad/ZOOKEEPER-2251
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/9f827984
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/9f827984
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/9f827984
Branch: refs/heads/master
Commit: 9f82798415351a20136ceb1640b1781723e51cc1
Parents: 75c652f
Author: Mohammad Arshad <ar...@apache.org>
Authored: Thu Jul 26 20:16:04 2018 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Thu Jul 26 20:16:04 2018 -0700
----------------------------------------------------------------------
.../main/org/apache/zookeeper/ClientCnxn.java | 78 +++++++--
.../org/apache/zookeeper/KeeperException.java | 13 ++
.../main/org/apache/zookeeper/ZooKeeper.java | 11 +-
.../apache/zookeeper/client/ZKClientConfig.java | 37 +++++
.../zookeeper/ClientRequestTimeoutTest.java | 165 +++++++++++++++++++
.../content/xdocs/zookeeperProgrammers.xml | 20 +++
6 files changed, 311 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/ClientCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxn.java b/src/java/main/org/apache/zookeeper/ClientCnxn.java
index ba601bc..b28c980 100644
--- a/src/java/main/org/apache/zookeeper/ClientCnxn.java
+++ b/src/java/main/org/apache/zookeeper/ClientCnxn.java
@@ -201,6 +201,11 @@ public class ClientCnxn {
public ZooKeeperSaslClient zooKeeperSaslClient;
private final ZKClientConfig clientConfig;
+ /**
+ * If any request's response in not received in configured requestTimeout
+ * then it is assumed that the response packet is lost.
+ */
+ private long requestTimeout;
public long getSessionId() {
return sessionId;
@@ -395,6 +400,7 @@ public class ClientCnxn {
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig=zooKeeper.getClientConfig();
+ initRequestTimeout();
}
public void start() {
@@ -671,7 +677,8 @@ public class ClientCnxn {
}
}
- private void finishPacket(Packet p) {
+ // @VisibleForTesting
+ protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
@@ -1246,15 +1253,7 @@ public class ClientCnxn {
}
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
- cleanup();
- if (state.isAlive()) {
- eventThread.queueEvent(new WatchedEvent(
- Event.EventType.None,
- Event.KeeperState.Disconnected,
- null));
- }
- clientCnxnSocket.updateNow();
- clientCnxnSocket.updateLastSendAndHeard();
+ cleanAndNotifyState();
}
}
}
@@ -1275,6 +1274,16 @@ public class ClientCnxn {
+ Long.toHexString(getSessionId()));
}
+ private void cleanAndNotifyState() {
+ cleanup();
+ if (state.isAlive()) {
+ eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
+ Event.KeeperState.Disconnected, null));
+ }
+ clientCnxnSocket.updateNow();
+ clientCnxnSocket.updateLastSendAndHeard();
+ }
+
private void pingRwServer() throws RWServerFoundException {
String result = null;
InetSocketAddress addr = hostProvider.next(0);
@@ -1506,13 +1515,40 @@ public class ClientCnxn {
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration, watchDeregistration);
synchronized (packet) {
- while (!packet.finished) {
- packet.wait();
+ if (requestTimeout > 0) {
+ // Wait for request completion with timeout
+ waitForPacketFinish(r, packet);
+ } else {
+ // Wait for request completion infinitely
+ while (!packet.finished) {
+ packet.wait();
+ }
}
}
+ if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
+ sendThread.cleanAndNotifyState();
+ }
return r;
}
+ /**
+ * Wait for request completion with timeout.
+ */
+ private void waitForPacketFinish(ReplyHeader r, Packet packet)
+ throws InterruptedException {
+ long waitStartTime = Time.currentElapsedTime();
+ while (!packet.finished) {
+ packet.wait(requestTimeout);
+ if (!packet.finished && ((Time.currentElapsedTime()
+ - waitStartTime) >= requestTimeout)) {
+ LOG.error("Timeout error occurred for the packet '{}'.",
+ packet);
+ r.setErr(Code.REQUESTTIMEOUT.intValue());
+ break;
+ }
+ }
+ }
+
public void saslCompleted() {
sendThread.getClientCnxnSocket().saslCompleted();
}
@@ -1603,4 +1639,22 @@ public class ClientCnxn {
this.ctx = ctx;
}
}
+
+ private void initRequestTimeout() {
+ try {
+ requestTimeout = clientConfig.getLong(
+ ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
+ ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT);
+ LOG.info("{} value is {}. feature enabled=",
+ ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
+ requestTimeout, requestTimeout > 0);
+ } catch (NumberFormatException e) {
+ LOG.error(
+ "Configured value {} for property {} can not be parsed to long.",
+ clientConfig.getProperty(
+ ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT),
+ ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
+ throw e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/KeeperException.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/KeeperException.java b/src/java/main/org/apache/zookeeper/KeeperException.java
index 143fac5..f797bb0 100644
--- a/src/java/main/org/apache/zookeeper/KeeperException.java
+++ b/src/java/main/org/apache/zookeeper/KeeperException.java
@@ -144,6 +144,8 @@ public abstract class KeeperException extends Exception {
return new NoWatcherException();
case RECONFIGDISABLED:
return new ReconfigDisabledException();
+ case REQUESTTIMEOUT:
+ return new RequestTimeoutException();
case OK:
default:
throw new IllegalArgumentException("Invalid exception code");
@@ -392,6 +394,8 @@ public abstract class KeeperException extends Exception {
EPHEMERALONLOCALSESSION (EphemeralOnLocalSession),
/** Attempts to remove a non-existing watcher */
NOWATCHER (-121),
+ /** Request not completed within max allowed time.*/
+ REQUESTTIMEOUT (-122),
/** Attempts to perform a reconfiguration operation when reconfiguration feature is disabled. */
RECONFIGDISABLED(-123);
@@ -843,4 +847,13 @@ public abstract class KeeperException extends Exception {
super(Code.RECONFIGDISABLED, path);
}
}
+
+ /**
+ * @see Code#REQUESTTIMEOUT
+ */
+ public static class RequestTimeoutException extends KeeperException {
+ public RequestTimeoutException() {
+ super(Code.REQUESTTIMEOUT);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/ZooKeeper.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/ZooKeeper.java b/src/java/main/org/apache/zookeeper/ZooKeeper.java
index ebcc500..6cac98e 100644
--- a/src/java/main/org/apache/zookeeper/ZooKeeper.java
+++ b/src/java/main/org/apache/zookeeper/ZooKeeper.java
@@ -876,12 +876,21 @@ public class ZooKeeper implements AutoCloseable {
connectString);
hostProvider = aHostProvider;
- cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
+ cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
+ // @VisibleForTesting
+ protected ClientCnxn createConnection(String chrootPath,
+ HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
+ ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
+ boolean canBeReadOnly) throws IOException {
+ return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
+ watchManager, clientCnxnSocket, canBeReadOnly);
+ }
+
/**
* To create a ZooKeeper client object, the application needs to pass a
* connection string containing a comma separated list of host:port pairs,
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java b/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
index 3c10627..097f2f0 100644
--- a/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
+++ b/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
@@ -56,9 +56,15 @@ public class ZKClientConfig extends ZKConfig {
@SuppressWarnings("deprecation")
public static final String SECURE_CLIENT = ZooKeeper.SECURE_CLIENT;
public static final int CLIENT_MAX_PACKET_LENGTH_DEFAULT = 4096 * 1024; /* 4 MB */
+ public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
+ /**
+ * Feature is disabled by default.
+ */
+ public static final long ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT = 0;
public ZKClientConfig() {
super();
+ initFromJavaSystemProperties();
}
public ZKClientConfig(File configFile) throws ConfigException {
@@ -69,6 +75,15 @@ public class ZKClientConfig extends ZKConfig {
super(configPath);
}
+ /**
+ * Initialize all the ZooKeeper client properties which are configurable as
+ * java system property
+ */
+ private void initFromJavaSystemProperties() {
+ setProperty(ZOOKEEPER_REQUEST_TIMEOUT,
+ System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
+ }
+
@Override
protected void handleBackwardCompatibility() {
/**
@@ -100,4 +115,26 @@ public class ZKClientConfig extends ZKConfig {
public boolean isSaslClientEnabled() {
return Boolean.valueOf(getProperty(ENABLE_CLIENT_SASL_KEY, ENABLE_CLIENT_SASL_DEFAULT));
}
+
+ /**
+ * Get the value of the <code>key</code> property as an <code>long</code>.
+ * If property is not set, the provided <code>defaultValue</code> is
+ * returned
+ *
+ * @param key
+ * property key.
+ * @param defaultValue
+ * default value.
+ * @throws NumberFormatException
+ * when the value is invalid
+ * @return return property value as an <code>long</code>, or
+ * <code>defaultValue</code>
+ */
+ public long getLong(String key, long defaultValue) {
+ String value = getProperty(key);
+ if (value != null) {
+ return Long.parseLong(value.trim());
+ }
+ return defaultValue;
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java b/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java
new file mode 100644
index 0000000..4f5548d
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
+ private static final int SERVER_COUNT = 3;
+ private boolean dropPacket = false;
+ private int dropPacketType = ZooDefs.OpCode.create;
+
+ @Test(timeout = 120000)
+ public void testClientRequestTimeout() throws Exception {
+ int requestTimeOut = 15000;
+ System.setProperty("zookeeper.request.timeout",
+ Integer.toString(requestTimeOut));
+ final int clientPorts[] = new int[SERVER_COUNT];
+ StringBuilder sb = new StringBuilder();
+ String server;
+
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
+ + clientPorts[i];
+ sb.append(server + "\n");
+ }
+ String currentQuorumCfgSection = sb.toString();
+ MainThread mt[] = new MainThread[SERVER_COUNT];
+
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
+ false);
+ mt[i].start();
+ }
+
+ // ensure server started
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ Assert.assertTrue("waiting for server " + i + " being up",
+ ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+ CONNECTION_TIMEOUT));
+ }
+
+ CountdownWatcher watch1 = new CountdownWatcher();
+ CustomZooKeeper zk = new CustomZooKeeper(getCxnString(clientPorts),
+ ClientBase.CONNECTION_TIMEOUT, watch1);
+ watch1.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+ String data = "originalData";
+ // lets see one successful operation
+ zk.create("/clientHang1", data.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL);
+
+ // now make environment for client hang
+ dropPacket = true;
+ dropPacketType = ZooDefs.OpCode.create;
+
+ // Test synchronous API
+ try {
+ zk.create("/clientHang2", data.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ fail("KeeperException is expected.");
+ } catch (KeeperException exception) {
+ assertEquals(KeeperException.Code.REQUESTTIMEOUT.intValue(),
+ exception.code().intValue());
+ }
+ // reset the error behavior
+ dropPacket = false;
+ watch1.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+ String path = "/clientHang3";
+ String create = zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ assertEquals(path, create);
+
+ // do cleanup
+ zk.close();
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ mt[i].shutdown();
+ }
+ }
+
+ /**
+ * @return connection string in the form of
+ * 127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3
+ */
+ private String getCxnString(int[] clientPorts) {
+ StringBuffer hostPortBuffer = new StringBuffer();
+ for (int i = 0; i < clientPorts.length; i++) {
+ hostPortBuffer.append("127.0.0.1:");
+ hostPortBuffer.append(clientPorts[i]);
+ if (i != (clientPorts.length - 1)) {
+ hostPortBuffer.append(',');
+ }
+ }
+ return hostPortBuffer.toString();
+ }
+
+ class CustomClientCnxn extends ClientCnxn {
+
+ public CustomClientCnxn(String chrootPath, HostProvider hostProvider,
+ int sessionTimeout, ZooKeeper zooKeeper,
+ ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
+ boolean canBeReadOnly) throws IOException {
+ super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
+ clientCnxnSocket, canBeReadOnly);
+ }
+
+ @Override
+ public void finishPacket(Packet p) {
+ if (dropPacket && p.requestHeader.getType() == dropPacketType) {
+ // do nothing, just return, it is the same as packet is dropped
+ // by the network
+ return;
+ }
+ super.finishPacket(p);
+ }
+ }
+
+ class CustomZooKeeper extends ZooKeeper {
+ public CustomZooKeeper(String connectString, int sessionTimeout,
+ Watcher watcher) throws IOException {
+ super(connectString, sessionTimeout, watcher);
+ }
+
+ @Override
+ protected ClientCnxn createConnection(String chrootPath,
+ HostProvider hostProvider, int sessionTimeout,
+ ZooKeeper zooKeeper, ClientWatchManager watcher,
+ ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
+ throws IOException {
+ return new CustomClientCnxn(chrootPath, hostProvider,
+ sessionTimeout, zooKeeper, watcher, clientCnxnSocket,
+ canBeReadOnly);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
----------------------------------------------------------------------
diff --git a/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml b/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
index a2a978f..cca60b3 100644
--- a/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
+++ b/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
@@ -1553,6 +1553,26 @@ public abstract class ServerAuthenticationProvider implements AuthenticationProv
<para>Specifies path to kinit binary. Default is "/usr/bin/kinit".</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term>zookeeper.request.timeout</term>
+ <listitem>
+ <para>
+ <emphasis role="bold">New in 3.6.0,3.5.5:</emphasis>
+ If ZooKeeper server is not responding or if there is a delay in the
+ network, ZooKeeper client java sync API waits infinitely for the
+ response. To avoid this situation configure
+ zookeeper.request.timeout. By default this feature is disabled and
+ default value is 0. To enable this feature configure a positive
+ integer value. for example to set value to 30 second configure
+ zookeeper.request.timeout=30000.
+ </para>
+ <para>
+ If response is not received within configured zookeeper.request.timeout
+ then outgoing and pending requests are cancelled with
+ org.apache.zookeeper.KeeperException.ConnectionLossException.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</section>
</section>