You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/07/29 08:10:01 UTC

[rocketmq] branch develop updated (12ea1a735 -> c0e375123)

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


    from 12ea1a735 Support ChannelZ to observe gRPC server (#4664)
     new 4619fd767 fix: write TelemetryCommand observer in writeLock
     new c0e375123 doc: add logger when write telemetry failed

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../proxy/grpc/v2/channel/GrpcClientChannel.java   |  52 ++++++--
 .../mqclient/ProxyClientRemotingProcessor.java     |  12 +-
 .../mqclient/ProxyClientRemotingProcessorTest.java | 139 +++++++++++++++++++++
 3 files changed, 195 insertions(+), 8 deletions(-)
 create mode 100644 proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java


[rocketmq] 02/02: doc: add logger when write telemetry failed

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit c0e375123a2c9d7138c074353440b5395a3223af
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Fri Jul 29 14:22:25 2022 +0800

    doc: add logger when write telemetry failed
---
 .../org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
index 968635bae..810534bd2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
@@ -214,7 +214,8 @@ public class GrpcClientChannel extends ProxyChannel {
             }
             try {
                 observer.onNext(command);
-            } catch (StatusRuntimeException | IllegalStateException statusRuntimeException) {
+            } catch (StatusRuntimeException | IllegalStateException exception) {
+                log.warn("write telemetry failed. command:{}", command, exception);
                 this.clearClientObserver(observer);
             }
         }


[rocketmq] 01/02: fix: write TelemetryCommand observer in writeLock

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 4619fd767fe7b0430de7973b3365b4a6393d0f02
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Thu Jul 28 19:56:57 2022 +0800

    fix: write TelemetryCommand observer in writeLock
---
 .../proxy/grpc/v2/channel/GrpcClientChannel.java   |  51 ++++++--
 .../mqclient/ProxyClientRemotingProcessor.java     |  12 +-
 .../mqclient/ProxyClientRemotingProcessorTest.java | 139 +++++++++++++++++++++
 3 files changed, 194 insertions(+), 8 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
index d0ef56159..968635bae 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
@@ -20,17 +20,23 @@ import apache.rocketmq.v2.PrintThreadStackTraceCommand;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
+import com.google.protobuf.TextFormat;
+import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 import io.netty.channel.ChannelId;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
 import org.apache.rocketmq.proxy.service.relay.ProxyChannel;
@@ -40,12 +46,13 @@ import org.apache.rocketmq.proxy.service.transaction.TransactionData;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class GrpcClientChannel extends ProxyChannel {
-
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     protected static final String SEPARATOR = "@";
 
     private final GrpcChannelManager grpcChannelManager;
 
     private final AtomicReference<StreamObserver<TelemetryCommand>> telemetryCommandRef = new AtomicReference<>();
+    private final Object telemetryWriteLock = new Object();
     private final String group;
     private final String clientId;
 
@@ -101,6 +108,10 @@ public class GrpcClientChannel extends ProxyChannel {
         this.telemetryCommandRef.set(future);
     }
 
+    protected void clearClientObserver(StreamObserver<TelemetryCommand> future) {
+        this.telemetryCommandRef.compareAndSet(future, null);
+    }
+
     @Override
     public boolean isOpen() {
         return this.telemetryCommandRef.get() != null;
@@ -120,7 +131,7 @@ public class GrpcClientChannel extends ProxyChannel {
     protected CompletableFuture<Void> processOtherMessage(Object msg) {
         if (msg instanceof TelemetryCommand) {
             TelemetryCommand response = (TelemetryCommand) msg;
-            this.getTelemetryCommandStreamObserver().onNext(response);
+            this.writeTelemetryCommand(response);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -130,7 +141,7 @@ public class GrpcClientChannel extends ProxyChannel {
         MessageExt messageExt, TransactionData transactionData, CompletableFuture<ProxyRelayResult<Void>> responseFuture) {
         CompletableFuture<Void> writeFuture = new CompletableFuture<>();
         try {
-            this.getTelemetryCommandStreamObserver().onNext(TelemetryCommand.newBuilder()
+            this.writeTelemetryCommand(TelemetryCommand.newBuilder()
                 .setRecoverOrphanedTransactionCommand(RecoverOrphanedTransactionCommand.newBuilder()
                     .setTransactionId(transactionData.getTransactionId())
                     .setMessage(GrpcConverter.getInstance().buildMessage(messageExt))
@@ -152,7 +163,7 @@ public class GrpcClientChannel extends ProxyChannel {
         if (!header.isJstackEnable()) {
             return CompletableFuture.completedFuture(null);
         }
-        this.getTelemetryCommandStreamObserver().onNext(TelemetryCommand.newBuilder()
+        this.writeTelemetryCommand(TelemetryCommand.newBuilder()
             .setPrintThreadStackTraceCommand(PrintThreadStackTraceCommand.newBuilder()
                 .setNonce(this.grpcChannelManager.addResponseFuture(responseFuture))
                 .build())
@@ -164,7 +175,7 @@ public class GrpcClientChannel extends ProxyChannel {
     protected CompletableFuture<Void> processConsumeMessageDirectly(RemotingCommand command,
         ConsumeMessageDirectlyResultRequestHeader header,
         MessageExt messageExt, CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> responseFuture) {
-        this.getTelemetryCommandStreamObserver().onNext(TelemetryCommand.newBuilder()
+        this.writeTelemetryCommand(TelemetryCommand.newBuilder()
             .setVerifyMessageCommand(VerifyMessageCommand.newBuilder()
                 .setNonce(this.grpcChannelManager.addResponseFuture(responseFuture))
                 .setMessage(GrpcConverter.getInstance().buildMessage(messageExt))
@@ -189,7 +200,33 @@ public class GrpcClientChannel extends ProxyChannel {
         return localAddress;
     }
 
-    public StreamObserver<TelemetryCommand> getTelemetryCommandStreamObserver() {
-        return this.telemetryCommandRef.get();
+    public void writeTelemetryCommand(TelemetryCommand command) {
+        StreamObserver<TelemetryCommand> observer = this.telemetryCommandRef.get();
+        if (observer == null) {
+            log.warn("telemetry command observer is null when try to write data. command:{}, channel:{}", TextFormat.shortDebugString(command), this);
+            return;
+        }
+        synchronized (this.telemetryWriteLock) {
+            observer = this.telemetryCommandRef.get();
+            if (observer == null) {
+                log.warn("telemetry command observer is null when try to write data. command:{}, channel:{}", TextFormat.shortDebugString(command), this);
+                return;
+            }
+            try {
+                observer.onNext(command);
+            } catch (StatusRuntimeException | IllegalStateException statusRuntimeException) {
+                this.clearClientObserver(observer);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("group", group)
+            .add("clientId", clientId)
+            .add("remoteAddress", getRemoteAddress())
+            .add("localAddress", getLocalAddress())
+            .toString();
     }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java
index 5cb36d1a2..340d56211 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java
@@ -16,21 +16,26 @@
  */
 package org.apache.rocketmq.proxy.service.mqclient;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.nio.ByteBuffer;
 import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class ProxyClientRemotingProcessor extends ClientRemotingProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     private final ProducerManager producerManager;
 
     public ProxyClientRemotingProcessor(ProducerManager producerManager) {
@@ -59,7 +64,12 @@ public class ProxyClientRemotingProcessor extends ClientRemotingProcessor {
                     (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
                 request.writeCustomHeader(requestHeader);
                 request.addExtField(ProxyUtils.BROKER_ADDR, RemotingUtil.socketAddress2String(ctx.channel().remoteAddress()));
-                this.producerManager.getAvailableChannel(group).writeAndFlush(request);
+                Channel channel = this.producerManager.getAvailableChannel(group);
+                if (channel != null) {
+                    channel.writeAndFlush(request);
+                } else {
+                    log.warn("check transaction failed, channel is empty. groupId={}, requestHeader:{}", group, requestHeader);
+                }
             }
         }
         return null;
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
new file mode 100644
index 000000000..c365aa9d0
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.mqclient;
+
+import apache.rocketmq.v2.TelemetryCommand;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ServerCallStreamObserver;
+import io.netty.channel.Channel;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.broker.client.ProducerManager;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.apache.rocketmq.proxy.service.relay.RelayData;
+import org.apache.rocketmq.proxy.service.transaction.TransactionData;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProxyClientRemotingProcessorTest {
+    @Mock
+    private ProducerManager producerManager;
+    @Mock
+    private ProxyRelayService proxyRelayService;
+
+    @Test
+    public void testTransactionCheck() throws Exception {
+        CompletableFuture<ProxyRelayResult<Void>> proxyRelayResultFuture = new CompletableFuture<>();
+        when(proxyRelayService.processCheckTransactionState(any(), any(), any(), any()))
+            .thenReturn(new RelayData<>(
+                new TransactionData("brokerName", 0, 0, "id", System.currentTimeMillis(), 3000),
+                proxyRelayResultFuture));
+
+        GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, null,
+            ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"), "group", "clientId");
+        when(producerManager.getAvailableChannel(anyString()))
+            .thenReturn(grpcClientChannel);
+
+        ProxyClientRemotingProcessor processor = new ProxyClientRemotingProcessor(producerManager);
+        CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
+        RemotingCommand command = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
+        MessageExt message = new MessageExt();
+        message.setQueueId(0);
+        message.setFlag(12);
+        message.setQueueOffset(0L);
+        message.setCommitLogOffset(100L);
+        message.setSysFlag(0);
+        message.setBornTimestamp(System.currentTimeMillis());
+        message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+        message.setStoreTimestamp(System.currentTimeMillis());
+        message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+        message.setBody("body".getBytes());
+        message.setTopic("topic");
+        MessageAccessor.putProperty(message, MessageConst.PROPERTY_PRODUCER_GROUP, "group");
+        command.setBody(MessageDecoder.encode(message, false));
+
+        processor.processRequest(new MockChannelHandlerContext(null), command);
+
+        ServerCallStreamObserver<TelemetryCommand> observer = mock(ServerCallStreamObserver.class);
+        grpcClientChannel.setClientObserver(observer);
+
+        processor.processRequest(new MockChannelHandlerContext(null), command);
+        verify(observer, times(1)).onNext(any());
+
+        // throw exception to test clear observer
+        doThrow(new StatusRuntimeException(Status.CANCELLED)).when(observer).onNext(any());
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        AtomicInteger count = new AtomicInteger();
+        for (int i = 0; i < 100; i++) {
+            executorService.submit(() -> {
+                try {
+                    processor.processRequest(new MockChannelHandlerContext(null), command);
+                    count.incrementAndGet();
+                } catch (RemotingCommandException ignored) {
+                }
+            });
+        }
+        await().atMost(Duration.ofSeconds(1)).until(() -> count.get() == 100);
+        verify(observer, times(2)).onNext(any());
+    }
+
+    protected static class MockChannelHandlerContext extends SimpleChannelHandlerContext {
+
+        public MockChannelHandlerContext(Channel channel) {
+            super(channel);
+        }
+
+        @Override
+        public Channel channel() {
+            Channel channel = mock(Channel.class);
+            when(channel.remoteAddress()).thenReturn(RemotingUtil.string2SocketAddress("127.0.0.1:10911"));
+            return channel;
+        }
+    }
+}
\ No newline at end of file