You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/07/27 03:16:07 UTC

zookeeper git commit: ZOOKEEPER-2251: Add Client side packet response timeout to avoid infinite wait.

Repository: zookeeper
Updated Branches:
  refs/heads/master 75c652f45 -> 9f8279841


ZOOKEEPER-2251: Add Client side packet response timeout to avoid infinite wait.

Add Client side packet response timeout to avoid infinite wait.

Author: Mohammad Arshad <ar...@apache.org>

Reviewers: Michael Han <ha...@apache.org>, Enrico Olivelli <eo...@gmail.com>

Closes #119 from arshadmohammad/ZOOKEEPER-2251


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/9f827984
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/9f827984
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/9f827984

Branch: refs/heads/master
Commit: 9f82798415351a20136ceb1640b1781723e51cc1
Parents: 75c652f
Author: Mohammad Arshad <ar...@apache.org>
Authored: Thu Jul 26 20:16:04 2018 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Thu Jul 26 20:16:04 2018 -0700

----------------------------------------------------------------------
 .../main/org/apache/zookeeper/ClientCnxn.java   |  78 +++++++--
 .../org/apache/zookeeper/KeeperException.java   |  13 ++
 .../main/org/apache/zookeeper/ZooKeeper.java    |  11 +-
 .../apache/zookeeper/client/ZKClientConfig.java |  37 +++++
 .../zookeeper/ClientRequestTimeoutTest.java     | 165 +++++++++++++++++++
 .../content/xdocs/zookeeperProgrammers.xml      |  20 +++
 6 files changed, 311 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/ClientCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxn.java b/src/java/main/org/apache/zookeeper/ClientCnxn.java
index ba601bc..b28c980 100644
--- a/src/java/main/org/apache/zookeeper/ClientCnxn.java
+++ b/src/java/main/org/apache/zookeeper/ClientCnxn.java
@@ -201,6 +201,11 @@ public class ClientCnxn {
     public ZooKeeperSaslClient zooKeeperSaslClient;
 
     private final ZKClientConfig clientConfig;
+    /**
+     * If a request's response is not received within the configured requestTimeout (ms),
+     * the response packet is assumed to be lost; a value <= 0 disables this check.
+     */
+    private long requestTimeout;
 
     public long getSessionId() {
         return sessionId;
@@ -395,6 +400,7 @@ public class ClientCnxn {
         sendThread = new SendThread(clientCnxnSocket);
         eventThread = new EventThread();
         this.clientConfig=zooKeeper.getClientConfig();
+        initRequestTimeout();
     }
 
     public void start() {
@@ -671,7 +677,8 @@ public class ClientCnxn {
        }
     }
 
-    private void finishPacket(Packet p) {
+    // @VisibleForTesting
+    protected void finishPacket(Packet p) {
         int err = p.replyHeader.getErr();
         if (p.watchRegistration != null) {
             p.watchRegistration.register(err);
@@ -1246,15 +1253,7 @@ public class ClientCnxn {
                         }
                         // At this point, there might still be new packets appended to outgoingQueue.
                         // they will be handled in next connection or cleared up if closed.
-                        cleanup();
-                        if (state.isAlive()) {
-                            eventThread.queueEvent(new WatchedEvent(
-                                    Event.EventType.None,
-                                    Event.KeeperState.Disconnected,
-                                    null));
-                        }
-                        clientCnxnSocket.updateNow();
-                        clientCnxnSocket.updateLastSendAndHeard();
+                        cleanAndNotifyState();
                     }
                 }
             }
@@ -1275,6 +1274,16 @@ public class ClientCnxn {
                            + Long.toHexString(getSessionId()));
         }
 
+        /** Shared teardown path: used by the send loop and by submitRequest after a REQUESTTIMEOUT. */
+        private void cleanAndNotifyState() {
+            cleanup();
+            if (state.isAlive()) {
+                eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
+            }
+            clientCnxnSocket.updateNow();
+            clientCnxnSocket.updateLastSendAndHeard();
+        }
+
         private void pingRwServer() throws RWServerFoundException {
             String result = null;
             InetSocketAddress addr = hostProvider.next(0);
@@ -1506,13 +1515,40 @@ public class ClientCnxn {
         Packet packet = queuePacket(h, r, request, response, null, null, null,
                 null, watchRegistration, watchDeregistration);
         synchronized (packet) {
-            while (!packet.finished) {
-                packet.wait();
+            if (requestTimeout > 0) {
+                // Wait for request completion with timeout
+                waitForPacketFinish(r, packet);
+            } else {
+                // Wait for request completion infinitely
+                while (!packet.finished) {
+                    packet.wait();
+                }
             }
         }
+        if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
+            sendThread.cleanAndNotifyState();
+        }
         return r;
     }
 
+    /**
+     * Waits for the packet to finish; if requestTimeout ms elapse first, sets REQUESTTIMEOUT on r.
+     */
+    private void waitForPacketFinish(ReplyHeader r, Packet packet)
+            throws InterruptedException {
+        long waitStartTime = Time.currentElapsedTime();
+        while (!packet.finished) {
+            // Wait only for the time left, so early/spurious wakeups cannot stretch the total wait.
+            long remaining = requestTimeout - (Time.currentElapsedTime() - waitStartTime);
+            if (remaining <= 0) {
+                LOG.error("Timeout error occurred for the packet '{}'.", packet);
+                r.setErr(Code.REQUESTTIMEOUT.intValue());
+                break;
+            }
+            packet.wait(remaining);
+        }
+    }
+
     public void saslCompleted() {
         sendThread.getClientCnxnSocket().saslCompleted();
     }
@@ -1603,4 +1639,22 @@ public class ClientCnxn {
             this.ctx = ctx;
         }
     }
+
+    /** Loads requestTimeout from the client config; a value <= 0 keeps the infinite wait. */
+    private void initRequestTimeout() {
+        try {
+            requestTimeout = clientConfig.getLong(
+                    ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
+                    ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT);
+            LOG.info("{} value is {}. feature enabled={}",
+                    ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
+                    requestTimeout, requestTimeout > 0);
+        } catch (NumberFormatException e) {
+            // Log the raw offending value, then rethrow so the misconfiguration is not ignored.
+            LOG.error("Configured value {} for property {} can not be parsed to long.",
+                    clientConfig.getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT),
+                    ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
+            throw e;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/KeeperException.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/KeeperException.java b/src/java/main/org/apache/zookeeper/KeeperException.java
index 143fac5..f797bb0 100644
--- a/src/java/main/org/apache/zookeeper/KeeperException.java
+++ b/src/java/main/org/apache/zookeeper/KeeperException.java
@@ -144,6 +144,8 @@ public abstract class KeeperException extends Exception {
                 return new NoWatcherException();
             case RECONFIGDISABLED:
                 return new ReconfigDisabledException();
+            case REQUESTTIMEOUT:
+                return new RequestTimeoutException();
             case OK:
             default:
                 throw new IllegalArgumentException("Invalid exception code");
@@ -392,6 +394,8 @@ public abstract class KeeperException extends Exception {
         EPHEMERALONLOCALSESSION (EphemeralOnLocalSession),
         /** Attempts to remove a non-existing watcher */
         NOWATCHER (-121),
+        /** Request not completed within max allowed time.*/
+        REQUESTTIMEOUT (-122),
         /** Attempts to perform a reconfiguration operation when reconfiguration feature is disabled. */
         RECONFIGDISABLED(-123);
 
@@ -843,4 +847,13 @@ public abstract class KeeperException extends Exception {
             super(Code.RECONFIGDISABLED, path);
         }
     }
+
+    /**
+     * Request not completed within max allowed time; see {@link Code#REQUESTTIMEOUT}.
+     */
+    public static class RequestTimeoutException extends KeeperException {
+        public RequestTimeoutException() {
+            super(Code.REQUESTTIMEOUT);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/ZooKeeper.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/ZooKeeper.java b/src/java/main/org/apache/zookeeper/ZooKeeper.java
index ebcc500..6cac98e 100644
--- a/src/java/main/org/apache/zookeeper/ZooKeeper.java
+++ b/src/java/main/org/apache/zookeeper/ZooKeeper.java
@@ -876,12 +876,21 @@ public class ZooKeeper implements AutoCloseable {
                 connectString);
         hostProvider = aHostProvider;
 
-        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
+        cnxn = createConnection(connectStringParser.getChrootPath(),
                 hostProvider, sessionTimeout, this, watchManager,
                 getClientCnxnSocket(), canBeReadOnly);
         cnxn.start();
     }
 
+    // @VisibleForTesting - overridable factory; builds the ClientCnxn purely from its arguments.
+    protected ClientCnxn createConnection(String chrootPath,
+            HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
+            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
+            boolean canBeReadOnly) throws IOException {
+        return new ClientCnxn(chrootPath, hostProvider, sessionTimeout,
+                zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+    }
+
     /**
      * To create a ZooKeeper client object, the application needs to pass a
      * connection string containing a comma separated list of host:port pairs,

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java b/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
index 3c10627..097f2f0 100644
--- a/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
+++ b/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
@@ -56,9 +56,15 @@ public class ZKClientConfig extends ZKConfig {
     @SuppressWarnings("deprecation")
     public static final String SECURE_CLIENT = ZooKeeper.SECURE_CLIENT;
     public static final int CLIENT_MAX_PACKET_LENGTH_DEFAULT = 4096 * 1024; /* 4 MB */
+    public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout";
+    /**
+     * Default value (ms) of {@link #ZOOKEEPER_REQUEST_TIMEOUT}: 0, so the feature is disabled by default.
+     */
+    public static final long ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT = 0;
 
     public ZKClientConfig() {
         super();
+        initFromJavaSystemProperties();
     }
 
     public ZKClientConfig(File configFile) throws ConfigException {
@@ -69,6 +75,15 @@ public class ZKClientConfig extends ZKConfig {
         super(configPath);
     }
 
+    /**
+     * Copies the client settings that can be supplied as Java system properties
+     * (currently only {@link #ZOOKEEPER_REQUEST_TIMEOUT}, copied even when unset/null) into this config.
+     */
+    private void initFromJavaSystemProperties() {
+        String configuredTimeout = System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT);
+        setProperty(ZOOKEEPER_REQUEST_TIMEOUT, configuredTimeout);
+    }
+
     @Override
     protected void handleBackwardCompatibility() {
         /**
@@ -100,4 +115,26 @@ public class ZKClientConfig extends ZKConfig {
     public boolean isSaslClientEnabled() {
         return Boolean.valueOf(getProperty(ENABLE_CLIENT_SASL_KEY, ENABLE_CLIENT_SASL_DEFAULT));
     }
+
+    /**
+     * Returns the <code>key</code> property parsed as a <code>long</code>.
+     * Leading and trailing whitespace is trimmed before parsing. When the
+     * property is not set at all, <code>defaultValue</code> is returned
+     * instead.
+     *
+     * @param key
+     *            property key.
+     * @param defaultValue
+     *            value returned when the property is absent.
+     * @throws NumberFormatException
+     *             when the configured value is not a valid long
+     * @return the parsed property value, or <code>defaultValue</code>
+     */
+    public long getLong(String key, long defaultValue) {
+        String raw = getProperty(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return Long.parseLong(raw.trim());
+    }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java b/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java
new file mode 100644
index 0000000..4f5548d
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
+    private static final int SERVER_COUNT = 3;
+    private volatile boolean dropPacket = false; // set by the test thread, read in finishPacket on another thread
+    private volatile int dropPacketType = ZooDefs.OpCode.create;
+
+    @Test(timeout = 120000)
+    public void testClientRequestTimeout() throws Exception {
+        int requestTimeOut = 15000;
+        // ZKClientConfig picks this system property up; it is cleared in finally so it cannot leak.
+        System.setProperty("zookeeper.request.timeout", Integer.toString(requestTimeOut));
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
+                    + clientPorts[i];
+            sb.append(server + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        CustomZooKeeper zk = null;
+        try {
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
+                mt[i].start();
+            }
+
+            // ensure server started
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                Assert.assertTrue("waiting for server " + i + " being up",
+                        ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT));
+            }
+
+            CountdownWatcher watch1 = new CountdownWatcher();
+            zk = new CustomZooKeeper(getCxnString(clientPorts), ClientBase.CONNECTION_TIMEOUT, watch1);
+            watch1.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+            String data = "originalData";
+            // lets see one successful operation
+            zk.create("/clientHang1", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+
+            // now make environment for client hang
+            dropPacket = true;
+            dropPacketType = ZooDefs.OpCode.create;
+
+            // Test synchronous API
+            try {
+                zk.create("/clientHang2", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                fail("KeeperException is expected.");
+            } catch (KeeperException exception) {
+                assertEquals(KeeperException.Code.REQUESTTIMEOUT.intValue(), exception.code().intValue());
+            }
+            // reset the error behavior
+            dropPacket = false;
+            watch1.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+            String path = "/clientHang3";
+            String create = zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            assertEquals(path, create);
+        } finally {
+            // do cleanup even if an assertion above failed
+            System.clearProperty("zookeeper.request.timeout");
+            if (zk != null) {
+                zk.close();
+            }
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                if (mt[i] != null) {
+                    mt[i].shutdown();
+                }
+            }
+        }
+    }
+
+    /**
+     * @return connection string in the form of
+     *         127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3
+     */
+    private String getCxnString(int[] clientPorts) {
+        StringBuilder hostPortBuffer = new StringBuilder();
+        for (int i = 0; i < clientPorts.length; i++) {
+            hostPortBuffer.append("127.0.0.1:");
+            hostPortBuffer.append(clientPorts[i]);
+            if (i != (clientPorts.length - 1)) {
+                hostPortBuffer.append(',');
+            }
+        }
+        return hostPortBuffer.toString();
+    }
+
+    class CustomClientCnxn extends ClientCnxn {
+
+        public CustomClientCnxn(String chrootPath, HostProvider hostProvider,
+                int sessionTimeout, ZooKeeper zooKeeper,
+                ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
+                boolean canBeReadOnly) throws IOException {
+            super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
+                    clientCnxnSocket, canBeReadOnly);
+        }
+
+        @Override
+        public void finishPacket(Packet p) {
+            if (dropPacket && p.requestHeader.getType() == dropPacketType) {
+                // do nothing, just return, it is the same as packet is dropped
+                // by the network
+                return;
+            }
+            super.finishPacket(p);
+        }
+    }
+
+    class CustomZooKeeper extends ZooKeeper {
+        public CustomZooKeeper(String connectString, int sessionTimeout,
+                Watcher watcher) throws IOException {
+            super(connectString, sessionTimeout, watcher);
+        }
+
+        @Override
+        protected ClientCnxn createConnection(String chrootPath,
+                HostProvider hostProvider, int sessionTimeout,
+                ZooKeeper zooKeeper, ClientWatchManager watcher,
+                ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
+                        throws IOException {
+            return new CustomClientCnxn(chrootPath, hostProvider,
+                    sessionTimeout, zooKeeper, watcher, clientCnxnSocket,
+                    canBeReadOnly);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
----------------------------------------------------------------------
diff --git a/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml b/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
index a2a978f..cca60b3 100644
--- a/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
+++ b/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
@@ -1553,6 +1553,26 @@ public abstract class ServerAuthenticationProvider implements AuthenticationProv
                     <para>Specifies path to kinit binary. Default is "/usr/bin/kinit".</para>
                 </listitem>
             </varlistentry>
+            <varlistentry>
+                <term>zookeeper.request.timeout</term>
+                <listitem>
+                <para>
+                  <emphasis role="bold">New in 3.6.0,3.5.5:</emphasis>
+                  If the ZooKeeper server is not responding, or if there is a delay in the
+                  network, the ZooKeeper client Java synchronous API waits indefinitely for the
+                  response. To avoid this situation, configure
+                  zookeeper.request.timeout (in milliseconds). By default this feature is
+                  disabled and the default value is 0. To enable this feature, configure a
+                  positive integer value; for example, to set the timeout to 30 seconds,
+                  configure zookeeper.request.timeout=30000.
+                </para>
+                <para>
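+                    If a response is not received within the configured zookeeper.request.timeout, the waiting synchronous
+                    call fails with org.apache.zookeeper.KeeperException.RequestTimeoutException, the client disconnects,
+                    and the other outgoing and pending requests are cancelled with org.apache.zookeeper.KeeperException.ConnectionLossException.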
+                </para>
+                </listitem>
+            </varlistentry>
         </variablelist>
     </section>
     </section>