You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/07/29 08:10:02 UTC

[rocketmq] 01/02: fix: write TelemetryCommand observer in writeLock

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 4619fd767fe7b0430de7973b3365b4a6393d0f02
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Thu Jul 28 19:56:57 2022 +0800

    fix: write TelemetryCommand observer in writeLock
---
 .../proxy/grpc/v2/channel/GrpcClientChannel.java   |  51 ++++++--
 .../mqclient/ProxyClientRemotingProcessor.java     |  12 +-
 .../mqclient/ProxyClientRemotingProcessorTest.java | 139 +++++++++++++++++++++
 3 files changed, 194 insertions(+), 8 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
index d0ef56159..968635bae 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
@@ -20,17 +20,23 @@ import apache.rocketmq.v2.PrintThreadStackTraceCommand;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
+import com.google.protobuf.TextFormat;
+import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 import io.netty.channel.ChannelId;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
 import org.apache.rocketmq.proxy.service.relay.ProxyChannel;
@@ -40,12 +46,13 @@ import org.apache.rocketmq.proxy.service.transaction.TransactionData;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class GrpcClientChannel extends ProxyChannel {
-
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     protected static final String SEPARATOR = "@";
 
     private final GrpcChannelManager grpcChannelManager;
 
     private final AtomicReference<StreamObserver<TelemetryCommand>> telemetryCommandRef = new AtomicReference<>();
+    private final Object telemetryWriteLock = new Object();
     private final String group;
     private final String clientId;
 
@@ -101,6 +108,10 @@ public class GrpcClientChannel extends ProxyChannel {
         this.telemetryCommandRef.set(future);
     }
 
+    protected void clearClientObserver(StreamObserver<TelemetryCommand> future) {
+        this.telemetryCommandRef.compareAndSet(future, null);
+    }
+
     @Override
     public boolean isOpen() {
         return this.telemetryCommandRef.get() != null;
@@ -120,7 +131,7 @@ public class GrpcClientChannel extends ProxyChannel {
     protected CompletableFuture<Void> processOtherMessage(Object msg) {
         if (msg instanceof TelemetryCommand) {
             TelemetryCommand response = (TelemetryCommand) msg;
-            this.getTelemetryCommandStreamObserver().onNext(response);
+            this.writeTelemetryCommand(response);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -130,7 +141,7 @@ public class GrpcClientChannel extends ProxyChannel {
         MessageExt messageExt, TransactionData transactionData, CompletableFuture<ProxyRelayResult<Void>> responseFuture) {
         CompletableFuture<Void> writeFuture = new CompletableFuture<>();
         try {
-            this.getTelemetryCommandStreamObserver().onNext(TelemetryCommand.newBuilder()
+            this.writeTelemetryCommand(TelemetryCommand.newBuilder()
                 .setRecoverOrphanedTransactionCommand(RecoverOrphanedTransactionCommand.newBuilder()
                     .setTransactionId(transactionData.getTransactionId())
                     .setMessage(GrpcConverter.getInstance().buildMessage(messageExt))
@@ -152,7 +163,7 @@ public class GrpcClientChannel extends ProxyChannel {
         if (!header.isJstackEnable()) {
             return CompletableFuture.completedFuture(null);
         }
-        this.getTelemetryCommandStreamObserver().onNext(TelemetryCommand.newBuilder()
+        this.writeTelemetryCommand(TelemetryCommand.newBuilder()
             .setPrintThreadStackTraceCommand(PrintThreadStackTraceCommand.newBuilder()
                 .setNonce(this.grpcChannelManager.addResponseFuture(responseFuture))
                 .build())
@@ -164,7 +175,7 @@ public class GrpcClientChannel extends ProxyChannel {
     protected CompletableFuture<Void> processConsumeMessageDirectly(RemotingCommand command,
         ConsumeMessageDirectlyResultRequestHeader header,
         MessageExt messageExt, CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> responseFuture) {
-        this.getTelemetryCommandStreamObserver().onNext(TelemetryCommand.newBuilder()
+        this.writeTelemetryCommand(TelemetryCommand.newBuilder()
             .setVerifyMessageCommand(VerifyMessageCommand.newBuilder()
                 .setNonce(this.grpcChannelManager.addResponseFuture(responseFuture))
                 .setMessage(GrpcConverter.getInstance().buildMessage(messageExt))
@@ -189,7 +200,33 @@ public class GrpcClientChannel extends ProxyChannel {
         return localAddress;
     }
 
-    public StreamObserver<TelemetryCommand> getTelemetryCommandStreamObserver() {
-        return this.telemetryCommandRef.get();
+    public void writeTelemetryCommand(TelemetryCommand command) {
+        StreamObserver<TelemetryCommand> observer = this.telemetryCommandRef.get();
+        if (observer == null) {
+            log.warn("telemetry command observer is null when try to write data. command:{}, channel:{}", TextFormat.shortDebugString(command), this);
+            return;
+        }
+        synchronized (this.telemetryWriteLock) {
+            observer = this.telemetryCommandRef.get();
+            if (observer == null) {
+                log.warn("telemetry command observer is null when try to write data. command:{}, channel:{}", TextFormat.shortDebugString(command), this);
+                return;
+            }
+            try {
+                observer.onNext(command);
+            } catch (StatusRuntimeException | IllegalStateException statusRuntimeException) {
+                this.clearClientObserver(observer);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("group", group)
+            .add("clientId", clientId)
+            .add("remoteAddress", getRemoteAddress())
+            .add("localAddress", getLocalAddress())
+            .toString();
     }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java
index 5cb36d1a2..340d56211 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java
@@ -16,21 +16,26 @@
  */
 package org.apache.rocketmq.proxy.service.mqclient;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.nio.ByteBuffer;
 import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class ProxyClientRemotingProcessor extends ClientRemotingProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     private final ProducerManager producerManager;
 
     public ProxyClientRemotingProcessor(ProducerManager producerManager) {
@@ -59,7 +64,12 @@ public class ProxyClientRemotingProcessor extends ClientRemotingProcessor {
                     (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
                 request.writeCustomHeader(requestHeader);
                 request.addExtField(ProxyUtils.BROKER_ADDR, RemotingUtil.socketAddress2String(ctx.channel().remoteAddress()));
-                this.producerManager.getAvailableChannel(group).writeAndFlush(request);
+                Channel channel = this.producerManager.getAvailableChannel(group);
+                if (channel != null) {
+                    channel.writeAndFlush(request);
+                } else {
+                    log.warn("check transaction failed, channel is empty. groupId={}, requestHeader:{}", group, requestHeader);
+                }
             }
         }
         return null;
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
new file mode 100644
index 000000000..c365aa9d0
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.mqclient;
+
+import apache.rocketmq.v2.TelemetryCommand;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ServerCallStreamObserver;
+import io.netty.channel.Channel;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.broker.client.ProducerManager;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.apache.rocketmq.proxy.service.relay.RelayData;
+import org.apache.rocketmq.proxy.service.transaction.TransactionData;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProxyClientRemotingProcessorTest {
+    @Mock
+    private ProducerManager producerManager;
+    @Mock
+    private ProxyRelayService proxyRelayService;
+
+    @Test
+    public void testTransactionCheck() throws Exception {
+        CompletableFuture<ProxyRelayResult<Void>> proxyRelayResultFuture = new CompletableFuture<>();
+        when(proxyRelayService.processCheckTransactionState(any(), any(), any(), any()))
+            .thenReturn(new RelayData<>(
+                new TransactionData("brokerName", 0, 0, "id", System.currentTimeMillis(), 3000),
+                proxyRelayResultFuture));
+
+        GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, null,
+            ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"), "group", "clientId");
+        when(producerManager.getAvailableChannel(anyString()))
+            .thenReturn(grpcClientChannel);
+
+        ProxyClientRemotingProcessor processor = new ProxyClientRemotingProcessor(producerManager);
+        CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
+        RemotingCommand command = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
+        MessageExt message = new MessageExt();
+        message.setQueueId(0);
+        message.setFlag(12);
+        message.setQueueOffset(0L);
+        message.setCommitLogOffset(100L);
+        message.setSysFlag(0);
+        message.setBornTimestamp(System.currentTimeMillis());
+        message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+        message.setStoreTimestamp(System.currentTimeMillis());
+        message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+        message.setBody("body".getBytes());
+        message.setTopic("topic");
+        MessageAccessor.putProperty(message, MessageConst.PROPERTY_PRODUCER_GROUP, "group");
+        command.setBody(MessageDecoder.encode(message, false));
+
+        processor.processRequest(new MockChannelHandlerContext(null), command);
+
+        ServerCallStreamObserver<TelemetryCommand> observer = mock(ServerCallStreamObserver.class);
+        grpcClientChannel.setClientObserver(observer);
+
+        processor.processRequest(new MockChannelHandlerContext(null), command);
+        verify(observer, times(1)).onNext(any());
+
+        // throw exception to test clear observer
+        doThrow(new StatusRuntimeException(Status.CANCELLED)).when(observer).onNext(any());
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        AtomicInteger count = new AtomicInteger();
+        for (int i = 0; i < 100; i++) {
+            executorService.submit(() -> {
+                try {
+                    processor.processRequest(new MockChannelHandlerContext(null), command);
+                    count.incrementAndGet();
+                } catch (RemotingCommandException ignored) {
+                }
+            });
+        }
+        await().atMost(Duration.ofSeconds(1)).until(() -> count.get() == 100);
+        verify(observer, times(2)).onNext(any());
+    }
+
+    protected static class MockChannelHandlerContext extends SimpleChannelHandlerContext {
+
+        public MockChannelHandlerContext(Channel channel) {
+            super(channel);
+        }
+
+        @Override
+        public Channel channel() {
+            Channel channel = mock(Channel.class);
+            when(channel.remoteAddress()).thenReturn(RemotingUtil.string2SocketAddress("127.0.0.1:10911"));
+            return channel;
+        }
+    }
+}
\ No newline at end of file