You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2018/02/08 02:59:21 UTC
[incubator-pulsar] branch master updated: Add connection timeout for binary lookup request (#1184)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b7e2e7c Add connection timeout for binary lookup request (#1184)
b7e2e7c is described below
commit b7e2e7cab6bbac3f8670a333547abdf77fa5d24c
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Feb 7 18:59:19 2018 -0800
Add connection timeout for binary lookup request (#1184)
* Add connection timeout for binary lookup request
* operation timeout
* fix test
---
.../client/impl/BrokerClientIntegrationTest.java | 2 -
.../org/apache/pulsar/client/impl/ClientCnx.java | 21 +++++++
.../apache/pulsar/client/impl/ClientCnxTest.java | 64 ++++++++++++++++++++++
3 files changed, 85 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 6affa58..342e232 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -583,7 +583,6 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
final int concurrentLookupRequests = 20;
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setMaxNumberOfRejectedRequestPerConnection(0);
- clientConf.setOperationTimeout(1, TimeUnit.MILLISECONDS);
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
stopBroker();
pulsar.getConfiguration().setMaxConcurrentTopicLoadRequest(1);
@@ -595,7 +594,6 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
clientConf2.setStatsInterval(0, TimeUnit.SECONDS);
clientConf2.setIoThreads(concurrentLookupRequests);
clientConf2.setConnectionsPerBroker(20);
- clientConf2.setOperationTimeout(1, TimeUnit.MILLISECONDS);
pulsarClient2 = (PulsarClientImpl) PulsarClient.create(lookupUrl, clientConf2);
ProducerImpl producer = (ProducerImpl) pulsarClient.createProducer(topicName);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 5ad4c66..3f2d176 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -60,6 +60,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.util.concurrent.Promise;
+import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
public class ClientCnx extends PulsarHandler {
@@ -83,6 +84,7 @@ public class ClientCnx extends PulsarHandler {
private volatile int numberOfRejectRequests = 0;
private final int maxNumberOfRejectedRequestPerConnection;
private final int rejectedRequestResetTimeSec = 60;
+ private final long operationTimeoutMs;
private String proxyToTargetBrokerAddress = null;
@@ -96,6 +98,7 @@ public class ClientCnx extends PulsarHandler {
this.authentication = conf.getAuthentication();
this.eventLoopGroup = eventLoopGroup;
this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
+ this.operationTimeoutMs = conf.getOperationTimeoutMs();
this.state = State.None;
}
@@ -268,6 +271,12 @@ public class ClientCnx extends PulsarHandler {
CompletableFuture<LookupDataResult> requestFuture = getAndRemovePendingLookupRequest(requestId);
if (requestFuture != null) {
+ if (requestFuture.isCompletedExceptionally()) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} Request {} already timed-out", ctx.channel(), lookupResult.getRequestId());
+ }
+ return;
+ }
// Complete future with exception if : Result.response=fail/null
if (!lookupResult.hasResponse()
|| CommandLookupTopicResponse.LookupType.Failed.equals(lookupResult.getResponse())) {
@@ -297,6 +306,12 @@ public class ClientCnx extends PulsarHandler {
CompletableFuture<LookupDataResult> requestFuture = getAndRemovePendingLookupRequest(requestId);
if (requestFuture != null) {
+ if (requestFuture.isCompletedExceptionally()) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} Request {} already timed-out", ctx.channel(), lookupResult.getRequestId());
+ }
+ return;
+ }
// Complete future with exception if : Result.response=fail/null
if (!lookupResult.hasResponse()
|| CommandPartitionedTopicMetadataResponse.LookupType.Failed.equals(lookupResult.getResponse())) {
@@ -332,6 +347,12 @@ public class ClientCnx extends PulsarHandler {
private boolean addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
if (pendingLookupRequestSemaphore.tryAcquire()) {
pendingLookupRequests.put(requestId, future);
+ eventLoopGroup.schedule(() -> {
+ if (!future.isDone()) {
+ future.completeExceptionally(new TimeoutException(
+ requestId + " lookup request timedout after ms " + operationTimeoutMs));
+ }
+ }, operationTimeoutMs, TimeUnit.MILLISECONDS);
return true;
}
return false;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
new file mode 100644
index 0000000..a73342b
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.api.PulsarHandler;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.testng.annotations.Test;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+public class ClientCnxTest {
+
+ @Test
+ public void testClientCnxTimeout() throws Exception {
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setOperationTimeout(10, TimeUnit.MILLISECONDS);
+ ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ ChannelFuture listenerFuture = mock(ChannelFuture.class);
+ when(listenerFuture.addListener(anyObject())).thenReturn(listenerFuture);
+ when(ctx.writeAndFlush(anyObject())).thenReturn(listenerFuture);
+
+ Field ctxField = PulsarHandler.class.getDeclaredField("ctx");
+ ctxField.setAccessible(true);
+ ctxField.set(cnx, ctx);
+ try {
+ cnx.newLookup(null, 123).get();
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException);
+ }
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
rdhabalia@apache.org.