You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/12 01:57:57 UTC

[pulsar] 01/02: Fixed handling errors for client requests (#8518)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb5e81b5573b796fa426423dd1ca527eddce1e46
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Nov 10 22:45:41 2020 -0800

    Fixed handling errors for client requests (#8518)
    
    `ClientCnx` maintains different maps for different types of requests. The problem is that `handleError()` only checks the main map, the one containing requests for creating producers and consumers, and ignores errors for other kinds of requests.
    For example, an error returned by the broker for a `GetLastMessageId` request will cause calls to `hasMessageAvailable()` to get stuck, since the error is ignored and the future is never completed:
    
    ```
    18:11:09.083 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ClientCnx@602] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xa378924b, L:/127.0.0.1:58267 - R:localhost/127.0.0.1:6650] Received error from server: XXX
    18:11:09.084 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ClientCnx@612] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xa378924b, L:/127.0.0.1:58267 - R:localhost/127.0.0.1:6650] Received unknown request id from server: 3
    ```
    
    There's no good reason to keep one map per kind of request. Instead, we should use a single map keyed by request id, so that `handleError()` can fail any pending request regardless of its type.
    
    (cherry picked from commit bb45b8c080fde51b759e06b58e036664361cb73b)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 78 +++++++++-------------
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 56 ++++++++++++++++
 2 files changed, 87 insertions(+), 47 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6c74b37..2f3b882 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -92,33 +92,23 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@SuppressWarnings("unchecked")
 public class ClientCnx extends PulsarHandler {
 
     protected final Authentication authentication;
     private State state;
 
-    private final ConcurrentLongHashMap<CompletableFuture<ProducerResponse>> pendingRequests =
-        new ConcurrentLongHashMap<>(16, 1);
-    private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> pendingLookupRequests =
+    private final ConcurrentLongHashMap<CompletableFuture<? extends Object>> pendingRequests =
         new ConcurrentLongHashMap<>(16, 1);
     // LookupRequests that waiting in client side.
     private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
-    private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> pendingGetLastMessageIdRequests =
-        new ConcurrentLongHashMap<>(16, 1);
-    private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests =
-        new ConcurrentLongHashMap<>(16, 1);
-
-    private final ConcurrentLongHashMap<CompletableFuture<CommandGetSchemaResponse>> pendingGetSchemaRequests = new ConcurrentLongHashMap<>(
-            16, 1);
-    private final ConcurrentLongHashMap<CompletableFuture<CommandGetOrCreateSchemaResponse>> pendingGetOrCreateSchemaRequests =
-            new ConcurrentLongHashMap<>(16, 1);
 
     private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
 
     private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
-    private final ConcurrentLinkedQueue<RequestTime<?>> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
     private final Semaphore pendingLookupRequestSemaphore;
     private final Semaphore maxLookupRequestSemaphore;
     private final EventLoopGroup eventLoopGroup;
@@ -152,17 +142,15 @@ public class ClientCnx extends PulsarHandler {
         None, SentConnectFrame, Ready, Failed, Connecting
     }
 
-    private static class RequestTime<T> {
+    private static class RequestTime {
         final long creationTimeMs;
         final long requestId;
         final RequestType requestType;
-        final ConcurrentLongHashMap<CompletableFuture<T>> pendingRequestsMap;
 
-        RequestTime(long creationTime, long requestId, RequestType requestType, ConcurrentLongHashMap<CompletableFuture<T>> pendingRequestsMap) {
+        RequestTime(long creationTime, long requestId, RequestType requestType) {
             this.creationTimeMs = creationTime;
             this.requestId = requestId;
             this.requestType = requestType;
-            this.pendingRequestsMap = pendingRequestsMap;
         }
     }
 
@@ -253,12 +241,7 @@ public class ClientCnx extends PulsarHandler {
 
         // Fail out all the pending ops
         pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
-        pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
         waitingLookupRequests.forEach(pair -> pair.getRight().getRight().completeExceptionally(e));
-        pendingGetLastMessageIdRequests.forEach((key, future) -> future.completeExceptionally(e));
-        pendingGetTopicsRequests.forEach((key, future) -> future.completeExceptionally(e));
-        pendingGetSchemaRequests.forEach((key, future) -> future.completeExceptionally(e));
-        pendingGetOrCreateSchemaRequests.forEach((key, future) -> future.completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to reconnect
         producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -266,12 +249,7 @@ public class ClientCnx extends PulsarHandler {
         transactionMetaStoreHandlers.forEach((id, handler) -> handler.connectionClosed(this));
 
         pendingRequests.clear();
-        pendingLookupRequests.clear();
         waitingLookupRequests.clear();
-        pendingGetLastMessageIdRequests.clear();
-        pendingGetTopicsRequests.clear();
-        pendingGetSchemaRequests.clear();
-        pendingGetOrCreateSchemaRequests.clear();
 
         producers.clear();
         consumers.clear();
@@ -432,7 +410,7 @@ public class ClientCnx extends PulsarHandler {
             log.debug("{} Received success response from server: {}", ctx.channel(), success.getRequestId());
         }
         long requestId = success.getRequestId();
-        CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(requestId);
+        CompletableFuture<?> requestFuture = pendingRequests.remove(requestId);
         if (requestFuture != null) {
             requestFuture.complete(null);
         } else {
@@ -448,7 +426,7 @@ public class ClientCnx extends PulsarHandler {
             log.debug("{} Received success GetLastMessageId response from server: {}", ctx.channel(), success.getRequestId());
         }
         long requestId = success.getRequestId();
-        CompletableFuture<MessageIdData> requestFuture = pendingGetLastMessageIdRequests.remove(requestId);
+        CompletableFuture<MessageIdData> requestFuture = (CompletableFuture<MessageIdData>) pendingRequests.remove(requestId);
         if (requestFuture != null) {
             requestFuture.complete(success.getLastMessageId());
         } else {
@@ -465,7 +443,7 @@ public class ClientCnx extends PulsarHandler {
                     success.getRequestId(), success.getProducerName());
         }
         long requestId = success.getRequestId();
-        CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(requestId);
+        CompletableFuture<ProducerResponse> requestFuture = (CompletableFuture<ProducerResponse>) pendingRequests.remove(requestId);
         if (requestFuture != null) {
             requestFuture.complete(new ProducerResponse(success.getProducerName(), success.getLastSequenceId(), success.getSchemaVersion().toByteArray()));
         } else {
@@ -558,7 +536,7 @@ public class ClientCnx extends PulsarHandler {
 
     // caller of this method needs to be protected under pendingLookupRequestSemaphore
     private void addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
-        pendingLookupRequests.put(requestId, future);
+        pendingRequests.put(requestId, future);
         eventLoopGroup.schedule(() -> {
             if (!future.isDone()) {
                 future.completeExceptionally(new TimeoutException(
@@ -568,7 +546,7 @@ public class ClientCnx extends PulsarHandler {
     }
 
     private CompletableFuture<LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
-        CompletableFuture<LookupDataResult> result = pendingLookupRequests.remove(requestId);
+        CompletableFuture<LookupDataResult> result = (CompletableFuture<LookupDataResult>) pendingRequests.remove(requestId);
         if (result != null) {
             Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>> firstOneWaiting = waitingLookupRequests.poll();
             if (firstOneWaiting != null) {
@@ -628,7 +606,11 @@ public class ClientCnx extends PulsarHandler {
             log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic",
                     ctx.channel());
         }
-        CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(requestId);
+        if (error.getError() == ServerError.AuthenticationError) {
+            connectionFuture.completeExceptionally(new PulsarClientException.AuthenticationException(error.getMessage()));
+            log.error("{} Failed to authenticate the client", ctx.channel());
+        }
+        CompletableFuture<?> requestFuture = pendingRequests.remove(requestId);
         if (requestFuture != null) {
             requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage()));
         } else {
@@ -691,7 +673,7 @@ public class ClientCnx extends PulsarHandler {
                 }
                 future.completeExceptionally(new PulsarClientException.TooManyRequestsException(String.format(
                     "Requests number out of config: There are {%s} lookup requests outstanding and {%s} requests pending.",
-                    pendingLookupRequests.size(),
+                    pendingLookupRequestSemaphore.availablePermits(),
                     waitingLookupRequests.size())));
             }
         }
@@ -699,7 +681,7 @@ public class ClientCnx extends PulsarHandler {
     }
 
     public CompletableFuture<List<String>> newGetTopicsOfNamespace(ByteBuf request, long requestId) {
-        return sendRequestAndHandleTimeout(request, requestId, pendingGetTopicsRequests, RequestType.GetTopics);
+        return sendRequestAndHandleTimeout(request, requestId, RequestType.GetTopics);
     }
 
     @Override
@@ -714,7 +696,7 @@ public class ClientCnx extends PulsarHandler {
                 ctx.channel(), success.getRequestId(), topics.size());
         }
 
-        CompletableFuture<List<String>> requestFuture = pendingGetTopicsRequests.remove(requestId);
+        CompletableFuture<List<String>> requestFuture = (CompletableFuture<List<String>>) pendingRequests.remove(requestId);
         if (requestFuture != null) {
             requestFuture.complete(topics);
         } else {
@@ -728,7 +710,8 @@ public class ClientCnx extends PulsarHandler {
 
         long requestId = commandGetSchemaResponse.getRequestId();
 
-        CompletableFuture<CommandGetSchemaResponse> future = pendingGetSchemaRequests.remove(requestId);
+        CompletableFuture<CommandGetSchemaResponse> future = (CompletableFuture<CommandGetSchemaResponse>) pendingRequests
+                .remove(requestId);
         if (future == null) {
             log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId);
             return;
@@ -740,7 +723,8 @@ public class ClientCnx extends PulsarHandler {
     protected void handleGetOrCreateSchemaResponse(CommandGetOrCreateSchemaResponse commandGetOrCreateSchemaResponse) {
         checkArgument(state == State.Ready);
         long requestId = commandGetOrCreateSchemaResponse.getRequestId();
-        CompletableFuture<CommandGetOrCreateSchemaResponse> future = pendingGetOrCreateSchemaRequests.remove(requestId);
+        CompletableFuture<CommandGetOrCreateSchemaResponse> future = (CompletableFuture<CommandGetOrCreateSchemaResponse>) pendingRequests
+                .remove(requestId);
         if (future == null) {
             log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId);
             return;
@@ -769,25 +753,25 @@ public class ClientCnx extends PulsarHandler {
     }
 
     CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long requestId) {
-        return sendRequestAndHandleTimeout(cmd, requestId, pendingRequests, RequestType.Command);
+        return sendRequestAndHandleTimeout(cmd, requestId, RequestType.Command);
     }
 
-    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, ConcurrentLongHashMap<CompletableFuture<T>> pendingRequestsMap, RequestType requestType) {
+    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
         CompletableFuture<T> future = new CompletableFuture<>();
-        pendingRequestsMap.put(requestId, future);
+        pendingRequests.put(requestId, future);
         ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
             if (!writeFuture.isSuccess()) {
                 log.warn("{} Failed to send {} to broker: {}", ctx.channel(), requestType.getDescription(), writeFuture.cause().getMessage());
-                pendingRequestsMap.remove(requestId);
+                pendingRequests.remove(requestId);
                 future.completeExceptionally(writeFuture.cause());
             }
         });
-        requestTimeoutQueue.add(new RequestTime<T>(System.currentTimeMillis(), requestId, requestType, pendingRequestsMap));
+        requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId, requestType));
         return future;
     }
 
     public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf request, long requestId) {
-        return sendRequestAndHandleTimeout(request, requestId, pendingGetLastMessageIdRequests, RequestType.GetLastMessageId);
+        return sendRequestAndHandleTimeout(request, requestId, RequestType.GetLastMessageId);
     }
 
     public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf request, long requestId) {
@@ -809,11 +793,11 @@ public class ClientCnx extends PulsarHandler {
     }
 
     public CompletableFuture<CommandGetSchemaResponse> sendGetRawSchema(ByteBuf request, long requestId) {
-        return sendRequestAndHandleTimeout(request, requestId, pendingGetSchemaRequests, RequestType.GetSchema);
+        return sendRequestAndHandleTimeout(request, requestId, RequestType.GetSchema);
     }
 
     public CompletableFuture<byte[]> sendGetOrCreateSchema(ByteBuf request, long requestId) {
-        CompletableFuture<CommandGetOrCreateSchemaResponse> future = sendRequestAndHandleTimeout(request, requestId, pendingGetOrCreateSchemaRequests, RequestType.GetOrCreateSchema);
+        CompletableFuture<CommandGetOrCreateSchemaResponse> future = sendRequestAndHandleTimeout(request, requestId, RequestType.GetOrCreateSchema);
         return future.thenCompose(response -> {
             if (response.hasErrorCode()) {
                 // Request has failed
@@ -1007,7 +991,7 @@ public class ClientCnx extends PulsarHandler {
                 break;
             }
             request = requestTimeoutQueue.poll();
-            CompletableFuture<?> requestFuture = (CompletableFuture) request.pendingRequestsMap.remove(request.requestId);
+            CompletableFuture<?> requestFuture = pendingRequests.remove(request.requestId);
             if (requestFuture != null && !requestFuture.isDone()) {
                 String timeoutMessage = String.format("%d %s timedout after ms %d", request.requestId, request.requestType.getDescription(), operationTimeoutMs);
                 if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index cc67d5e..0cb7f58 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -21,16 +21,22 @@ package org.apache.pulsar.client.impl;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadFactory;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.annotations.Test;
@@ -62,6 +68,8 @@ public class ClientCnxTest {
         } catch (Exception e) {
             assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException);
         }
+
+        eventLoop.shutdownGracefully();
     }
 
     @Test
@@ -93,6 +101,54 @@ public class ClientCnxTest {
         } catch (Exception e) {
             fail("should not throw any error");
         }
+
+        eventLoop.shutdownGracefully();
     }
 
+    @Test
+    public void testGetLastMessageIdWithError() throws Exception {
+        ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, threadFactory);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        when(ctx.channel()).thenReturn(channel);
+
+        Field ctxField = PulsarHandler.class.getDeclaredField("ctx");
+        ctxField.setAccessible(true);
+        ctxField.set(cnx, ctx);
+
+        final long requestId = 100;
+
+        // set connection as SentConnectFrame
+        Field cnxField = ClientCnx.class.getDeclaredField("state");
+        cnxField.setAccessible(true);
+        cnxField.set(cnx, ClientCnx.State.SentConnectFrame);
+
+        ChannelFuture listenerFuture = mock(ChannelFuture.class);
+        when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+        when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
+
+        ByteBuf getLastIdCmd = Commands.newGetLastMessageId(5, requestId);
+        CompletableFuture<?> future = cnx.sendGetLastMessageId(getLastIdCmd, requestId);
+
+        // receive error
+        PulsarApi.CommandError commandError = PulsarApi.CommandError.newBuilder()
+            .setRequestId(requestId)
+            .setError(PulsarApi.ServerError.MetadataError)
+            .setMessage("failed to read")
+            .build();
+        cnx.handleError(commandError);
+
+        try {
+            future.get();
+            fail("Should have failed");
+        } catch (ExecutionException e) {
+            assertEquals(e.getCause().getClass(), BrokerMetadataException.class);
+        }
+
+        eventLoop.shutdownGracefully();
+    }
 }