You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/09/29 00:34:47 UTC

[pulsar] branch master updated: [Issue 8138][pulsar-client] Improve timeout handling in ClientCnx to cover all remaining request types (GetLastMessageId, GetTopics, GetSchema, GetOrCreateSchema) (#8149)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bf6a88e  [Issue 8138][pulsar-client] Improve timeout handling in ClientCnx to cover all remaining request types (GetLastMessageId, GetTopics, GetSchema, GetOrCreateSchema) (#8149)
bf6a88e is described below

commit bf6a88e864cb6f23d4cadc64cb3790182d68a814
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Sep 29 03:34:26 2020 +0300

    [Issue 8138][pulsar-client] Improve timeout handling in ClientCnx to cover all remaining request types (GetLastMessageId, GetTopics, GetSchema, GetOrCreateSchema) (#8149)
    
    * Add timeout handling for all other request types in ClientCnx
    Master Issue: #8138
    
    ### Motivation
    
    Since one of the root causes of #8138 is the missing timeout handling for GetLastMessageId requests, this PR adds it.
    The PR extends timeout handling in ClientCnx to every remaining request type that currently lacks it. These request types are:
        - GetLastMessageId
        - GetTopics
        - GetSchema
        - GetOrCreateSchema
    
    ### Modifications
    
    The existing requestTimeoutQueue mechanism for handling "ordinary" command requests has been extended to cover all request types.
    
    ### Verifying this change
    
    This change adds tests and can be verified as follows:
    
    - Added unit tests in a new test class ClientCnxRequestTimeoutQueueTest to test request timeout handling.
        - `cd pulsar-client; mvn test -Dtest=ClientCnxRequestTimeoutQueueTest`
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 118 +++++++++------------
 .../client/impl/TransactionMetaStoreHandler.java   |  11 +-
 .../impl/ClientCnxRequestTimeoutQueueTest.java     | 106 ++++++++++++++++++
 3 files changed, 165 insertions(+), 70 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6f8c511..936b27f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -119,7 +119,7 @@ public class ClientCnx extends PulsarHandler {
     private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
 
     private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
-    private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedQueue<RequestTime<?>> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
     private final Semaphore pendingLookupRequestSemaphore;
     private final Semaphore maxLookupRequestSemaphore;
     private final EventLoopGroup eventLoopGroup;
@@ -154,17 +154,37 @@ public class ClientCnx extends PulsarHandler {
         None, SentConnectFrame, Ready, Failed, Connecting
     }
 
-    static class RequestTime {
-        long creationTimeMs;
-        long requestId;
+    private static class RequestTime<T> {
+        final long creationTimeMs;
+        final long requestId;
+        final RequestType requestType;
+        final ConcurrentLongHashMap<CompletableFuture<T>> pendingRequestsMap;
 
-        public RequestTime(long creationTime, long requestId) {
-            super();
+        RequestTime(long creationTime, long requestId, RequestType requestType, ConcurrentLongHashMap<CompletableFuture<T>> pendingRequestsMap) {
             this.creationTimeMs = creationTime;
             this.requestId = requestId;
+            this.requestType = requestType;
+            this.pendingRequestsMap = pendingRequestsMap;
         }
     }
 
+    private enum RequestType { // kind of tracked request; labels send-failure and timeout log messages
+        Command, // ordinary commands sent via sendRequestWithId
+        GetLastMessageId, // sent via sendGetLastMessageId
+        GetTopics, // sent via newGetTopicsOfNamespace
+        GetSchema, // sent via sendGetRawSchema
+        GetOrCreateSchema; // sent via sendGetOrCreateSchema
+
+        String getDescription() { // human-readable label, e.g. "GetTopics request"
+            if (this == Command) { // generic commands use the plain "request" wording
+                return "request";
+            } else {
+                return name() + " request";
+            }
+        }
+    }
+
+
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
         this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
     }
@@ -240,6 +260,7 @@ public class ClientCnx extends PulsarHandler {
         pendingGetLastMessageIdRequests.forEach((key, future) -> future.completeExceptionally(e));
         pendingGetTopicsRequests.forEach((key, future) -> future.completeExceptionally(e));
         pendingGetSchemaRequests.forEach((key, future) -> future.completeExceptionally(e));
+        pendingGetOrCreateSchemaRequests.forEach((key, future) -> future.completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to reconnect
         producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -251,6 +272,8 @@ public class ClientCnx extends PulsarHandler {
         waitingLookupRequests.clear();
         pendingGetLastMessageIdRequests.clear();
         pendingGetTopicsRequests.clear();
+        pendingGetSchemaRequests.clear();
+        pendingGetOrCreateSchemaRequests.clear();
 
         producers.clear();
         consumers.clear();
@@ -682,19 +705,7 @@ public class ClientCnx extends PulsarHandler {
     }
 
     public CompletableFuture<List<String>> newGetTopicsOfNamespace(ByteBuf request, long requestId) {
-        CompletableFuture<List<String>> future = new CompletableFuture<>();
-
-        pendingGetTopicsRequests.put(requestId, future);
-        ctx.writeAndFlush(request).addListener(writeFuture -> {
-            if (!writeFuture.isSuccess()) {
-                log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId,
-                    writeFuture.cause().getMessage());
-                pendingGetTopicsRequests.remove(requestId);
-                future.completeExceptionally(writeFuture.cause());
-            }
-        });
-
-        return future;
+        return sendRequestAndHandleTimeout(request, requestId, pendingGetTopicsRequests, RequestType.GetTopics);
     }
 
     @Override
@@ -764,33 +775,25 @@ public class ClientCnx extends PulsarHandler {
     }
 
     CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long requestId) {
-        CompletableFuture<ProducerResponse> future = new CompletableFuture<>();
-        pendingRequests.put(requestId, future);
-        ctx.writeAndFlush(cmd).addListener(writeFuture -> {
+        return sendRequestAndHandleTimeout(cmd, requestId, pendingRequests, RequestType.Command);
+    }
+
+    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, ConcurrentLongHashMap<CompletableFuture<T>> pendingRequestsMap, RequestType requestType) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        pendingRequestsMap.put(requestId, future);
+        ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
             if (!writeFuture.isSuccess()) {
-                log.warn("{} Failed to send request to broker: {}", ctx.channel(), writeFuture.cause().getMessage());
-                pendingRequests.remove(requestId);
+                log.warn("{} Failed to send {} to broker: {}", ctx.channel(), requestType.getDescription(), writeFuture.cause().getMessage());
+                pendingRequestsMap.remove(requestId);
                 future.completeExceptionally(writeFuture.cause());
             }
         });
-        requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+        requestTimeoutQueue.add(new RequestTime<T>(System.currentTimeMillis(), requestId, requestType, pendingRequestsMap));
         return future;
     }
 
     public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf request, long requestId) {
-        CompletableFuture<MessageIdData> future = new CompletableFuture<>();
-
-        pendingGetLastMessageIdRequests.put(requestId, future);
-
-        ctx.writeAndFlush(request).addListener(writeFuture -> {
-            if (!writeFuture.isSuccess()) {
-                log.warn("{} Failed to send GetLastMessageId request to broker: {}", ctx.channel(), writeFuture.cause().getMessage());
-                pendingGetLastMessageIdRequests.remove(requestId);
-                future.completeExceptionally(writeFuture.cause());
-            }
-        });
-
-        return future;
+        return sendRequestAndHandleTimeout(request, requestId, pendingGetLastMessageIdRequests, RequestType.GetLastMessageId);
     }
 
     public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf request, long requestId) {
@@ -812,33 +815,11 @@ public class ClientCnx extends PulsarHandler {
     }
 
     public CompletableFuture<CommandGetSchemaResponse> sendGetRawSchema(ByteBuf request, long requestId) {
-        CompletableFuture<CommandGetSchemaResponse> future = new CompletableFuture<>();
-
-        pendingGetSchemaRequests.put(requestId, future);
-
-        ctx.writeAndFlush(request).addListener(writeFuture -> {
-            if (!writeFuture.isSuccess()) {
-                log.warn("{} Failed to send GetSchema request to broker: {}", ctx.channel(),
-                        writeFuture.cause().getMessage());
-                pendingGetSchemaRequests.remove(requestId);
-                future.completeExceptionally(writeFuture.cause());
-            }
-        });
-
-        return future;
+        return sendRequestAndHandleTimeout(request, requestId, pendingGetSchemaRequests, RequestType.GetSchema);
     }
 
     public CompletableFuture<byte[]> sendGetOrCreateSchema(ByteBuf request, long requestId) {
-        CompletableFuture<CommandGetOrCreateSchemaResponse> future = new CompletableFuture<>();
-        pendingGetOrCreateSchemaRequests.put(requestId, future);
-        ctx.writeAndFlush(request).addListener(writeFuture -> {
-            if (!writeFuture.isSuccess()) {
-                log.warn("{} Failed to send GetOrCreateSchema request to broker: {}", ctx.channel(),
-                         writeFuture.cause().getMessage());
-                pendingGetOrCreateSchemaRequests.remove(requestId);
-                future.completeExceptionally(writeFuture.cause());
-            }
-        });
+        CompletableFuture<CommandGetOrCreateSchemaResponse> future = sendRequestAndHandleTimeout(request, requestId, pendingGetOrCreateSchemaRequests, RequestType.GetOrCreateSchema);
         return future.thenCompose(response -> {
             if (response.hasErrorCode()) {
                 // Request has failed
@@ -1069,13 +1050,12 @@ public class ClientCnx extends PulsarHandler {
                 break;
             }
             request = requestTimeoutQueue.poll();
-            CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(request.requestId);
-            if (requestFuture != null && !requestFuture.isDone()
-                    && requestFuture.completeExceptionally(new TimeoutException(
-                            request.requestId + " lookup request timedout after ms " + operationTimeoutMs))) {
-                log.warn("{} request {} timed out after {} ms", ctx.channel(), request.requestId, operationTimeoutMs);
-            } else {
-                // request is already completed successfully.
+            CompletableFuture<?> requestFuture = (CompletableFuture) request.pendingRequestsMap.remove(request.requestId);
+            if (requestFuture != null && !requestFuture.isDone()) {
+                String timeoutMessage = String.format("%d %s timedout after ms %d", request.requestId, request.requestType.getDescription(), operationTimeoutMs);
+                if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) {
+                    log.warn("{} {}", ctx.channel(), timeoutMessage);
+                }
             }
         }
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index a95c937..ed1c1b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -26,7 +26,6 @@ import io.netty.util.TimerTask;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.ClientCnx.RequestTime;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -55,6 +54,16 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;
 
+    private static class RequestTime { // timeoutQueue entry: when a request was created and its id
+        final long creationTimeMs; // presumably System.currentTimeMillis() at send time — verify at call site
+        final long requestId;
+
+        public RequestTime(long creationTime, long requestId) { // local copy: ClientCnx.RequestTime is now private and generic
+            this.creationTimeMs = creationTime;
+            this.requestId = requestId;
+        }
+    }
+
     private final boolean blockIfReachMaxPendingOps;
     private final Semaphore semaphore;
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
new file mode 100644
index 0000000..d0bd9b5
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.testng.annotations.*;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Request timeout tests for ClientCnx. Each test sends one request of a given type over
+ * a mocked channel that never replies and expects the future to fail with a
+ * PulsarClientException.TimeoutException set by ClientCnx's requestTimeoutQueue check.
+ * Covered: ordinary commands (sendRequestWithId), GetLastMessageId, GetTopics
+ * (newGetTopicsOfNamespace), GetSchema (sendGetRawSchema) and GetOrCreateSchema.
+ */
+public class ClientCnxRequestTimeoutQueueTest {
+    ClientCnx cnx; // connection under test, shared by every test method
+    EventLoopGroup eventLoop;
+    ByteBuf requestMessage; // mock payload passed to every send method
+
+    @BeforeTest // NOTE(review): TestNG runs this once per <test> tag, not per method; cnx is reused
+    void setupClientCnx() throws Exception {
+        eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setKeepAliveIntervalSeconds(0); // presumably disables keep-alive pings — verify
+        conf.setOperationTimeoutMs(1); // 1 ms so pending requests expire well within the 1 s wait below
+        cnx = new ClientCnx(conf, eventLoop);
+
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        when(ctx.writeAndFlush(any())).thenAnswer(args -> mock(ChannelFuture.class)); // mock never fires the write listener, so requests stay pending
+        when(ctx.channel()).thenReturn(channel);
+        when(channel.remoteAddress()).thenReturn(new InetSocketAddress(1234));
+        cnx.channelActive(ctx); // NOTE(review): presumably stores ctx so sends go through the mock — confirm in ClientCnx
+
+        requestMessage = mock(ByteBuf.class);
+    }
+
+    @AfterTest
+    void cleanupClientCnx() {
+        eventLoop.shutdownNow(); // NOTE(review): deprecated in Netty 4.1; shutdownGracefully() is the usual replacement
+    }
+
+    @Test
+    void testCommandRequestTimeout() {
+        assertFutureTimesOut(cnx.sendRequestWithId(requestMessage, 1L));
+    }
+
+    @Test
+    void testGetLastMessageIdRequestTimeout() {
+        assertFutureTimesOut(cnx.sendGetLastMessageId(requestMessage, 1L));
+    }
+
+    @Test
+    void testGetTopicsRequestTimeout() {
+        assertFutureTimesOut(cnx.newGetTopicsOfNamespace(requestMessage, 1L));
+    }
+
+    @Test
+    void testGetSchemaRequestTimeout() {
+        assertFutureTimesOut(cnx.sendGetRawSchema(requestMessage, 1L));
+    }
+
+    @Test
+    void testGetOrCreateSchemaRequestTimeout() {
+        assertFutureTimesOut(cnx.sendGetOrCreateSchema(requestMessage, 1L));
+    }
+
+    private void assertFutureTimesOut(CompletableFuture<?> future) {
+        try {
+            future.get(1, TimeUnit.SECONDS); // a client-side timeout surfaces as an ExecutionException cause
+            fail("Future should have timed out.");
+        } catch (Exception e) { // NOTE(review): if get() itself times out, getCause() is null and the assert fails with no message
+            assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException);
+        }
+    }
+}