You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2018/02/08 02:59:21 UTC

[incubator-pulsar] branch master updated: Add connection timeout for binary lookup request (#1184)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b7e2e7c  Add connection timeout for binary lookup request (#1184)
b7e2e7c is described below

commit b7e2e7cab6bbac3f8670a333547abdf77fa5d24c
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Feb 7 18:59:19 2018 -0800

    Add connection timeout for binary lookup request (#1184)
    
    * Add connection timeout for binary lookup request
    
    * operation timeout
    
    * fix test
---
 .../client/impl/BrokerClientIntegrationTest.java   |  2 -
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 21 +++++++
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 64 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 6affa58..342e232 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -583,7 +583,6 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
             final int concurrentLookupRequests = 20;
             ClientConfiguration clientConf = new ClientConfiguration();
             clientConf.setMaxNumberOfRejectedRequestPerConnection(0);
-            clientConf.setOperationTimeout(1, TimeUnit.MILLISECONDS);
             clientConf.setStatsInterval(0, TimeUnit.SECONDS);
             stopBroker();
             pulsar.getConfiguration().setMaxConcurrentTopicLoadRequest(1);
@@ -595,7 +594,6 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
             clientConf2.setStatsInterval(0, TimeUnit.SECONDS);
             clientConf2.setIoThreads(concurrentLookupRequests);
             clientConf2.setConnectionsPerBroker(20);
-            clientConf2.setOperationTimeout(1, TimeUnit.MILLISECONDS);
             pulsarClient2 = (PulsarClientImpl) PulsarClient.create(lookupUrl, clientConf2);
 
             ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 5ad4c66..3f2d176 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -60,6 +60,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.unix.Errors.NativeIoException;
 import io.netty.util.concurrent.Promise;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 
 public class ClientCnx extends PulsarHandler {
 
@@ -83,6 +84,7 @@ public class ClientCnx extends PulsarHandler {
     private volatile int numberOfRejectRequests = 0;
     private final int maxNumberOfRejectedRequestPerConnection;
     private final int rejectedRequestResetTimeSec = 60;
+    private final long operationTimeoutMs;
 
     private String proxyToTargetBrokerAddress = null;
 
@@ -96,6 +98,7 @@ public class ClientCnx extends PulsarHandler {
         this.authentication = conf.getAuthentication();
         this.eventLoopGroup = eventLoopGroup;
         this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
+        this.operationTimeoutMs = conf.getOperationTimeoutMs();
         this.state = State.None;
     }
 
@@ -268,6 +271,12 @@ public class ClientCnx extends PulsarHandler {
         CompletableFuture<LookupDataResult> requestFuture = getAndRemovePendingLookupRequest(requestId);
 
         if (requestFuture != null) {
+            if (requestFuture.isCompletedExceptionally()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} Request {} already timed-out", ctx.channel(), lookupResult.getRequestId());
+                }
+                return;
+            }
             // Complete future with exception if : Result.response=fail/null
             if (!lookupResult.hasResponse()
                     || CommandLookupTopicResponse.LookupType.Failed.equals(lookupResult.getResponse())) {
@@ -297,6 +306,12 @@ public class ClientCnx extends PulsarHandler {
         CompletableFuture<LookupDataResult> requestFuture = getAndRemovePendingLookupRequest(requestId);
 
         if (requestFuture != null) {
+            if (requestFuture.isCompletedExceptionally()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} Request {} already timed-out", ctx.channel(), lookupResult.getRequestId());
+                }
+                return;
+            }
             // Complete future with exception if : Result.response=fail/null
             if (!lookupResult.hasResponse()
                     || CommandPartitionedTopicMetadataResponse.LookupType.Failed.equals(lookupResult.getResponse())) {
@@ -332,6 +347,12 @@ public class ClientCnx extends PulsarHandler {
     private boolean addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
         if (pendingLookupRequestSemaphore.tryAcquire()) {
             pendingLookupRequests.put(requestId, future);
+            eventLoopGroup.schedule(() -> {
+                if (!future.isDone()) {
+                    future.completeExceptionally(new TimeoutException(
+                            requestId + " lookup request timedout after ms " + operationTimeoutMs));
+                }
+            }, operationTimeoutMs, TimeUnit.MILLISECONDS);
             return true;
         }
         return false;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
new file mode 100644
index 0000000..a73342b
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.api.PulsarHandler;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.testng.annotations.Test;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+public class ClientCnxTest {
+
+    @Test
+    public void testClientCnxTimeout() throws Exception {
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setOperationTimeout(10, TimeUnit.MILLISECONDS);
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        ChannelFuture listenerFuture = mock(ChannelFuture.class);
+        when(listenerFuture.addListener(anyObject())).thenReturn(listenerFuture);
+        when(ctx.writeAndFlush(anyObject())).thenReturn(listenerFuture);
+
+        Field ctxField = PulsarHandler.class.getDeclaredField("ctx");
+        ctxField.setAccessible(true);
+        ctxField.set(cnx, ctx);
+        try {
+            cnx.newLookup(null, 123).get();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException);
+        }
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
rdhabalia@apache.org.