You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/07/29 08:10:02 UTC
[rocketmq] 01/02: fix: write TelemetryCommand observer in writeLock
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 4619fd767fe7b0430de7973b3365b4a6393d0f02
Author: kaiyi.lk <ka...@alibaba-inc.com>
AuthorDate: Thu Jul 28 19:56:57 2022 +0800
fix: write TelemetryCommand observer in writeLock
---
.../proxy/grpc/v2/channel/GrpcClientChannel.java | 51 ++++++--
.../mqclient/ProxyClientRemotingProcessor.java | 12 +-
.../mqclient/ProxyClientRemotingProcessorTest.java | 139 +++++++++++++++++++++
3 files changed, 194 insertions(+), 8 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
index d0ef56159..968635bae 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
@@ -20,17 +20,23 @@ import apache.rocketmq.v2.PrintThreadStackTraceCommand;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ComparisonChain;
+import com.google.protobuf.TextFormat;
+import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.netty.channel.ChannelId;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
import org.apache.rocketmq.proxy.service.relay.ProxyChannel;
@@ -40,12 +46,13 @@ import org.apache.rocketmq.proxy.service.transaction.TransactionData;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class GrpcClientChannel extends ProxyChannel {
-
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected static final String SEPARATOR = "@";
private final GrpcChannelManager grpcChannelManager;
private final AtomicReference<StreamObserver<TelemetryCommand>> telemetryCommandRef = new AtomicReference<>();
+ private final Object telemetryWriteLock = new Object();
private final String group;
private final String clientId;
@@ -101,6 +108,10 @@ public class GrpcClientChannel extends ProxyChannel {
this.telemetryCommandRef.set(future);
}
+ protected void clearClientObserver(StreamObserver<TelemetryCommand> future) {
+ this.telemetryCommandRef.compareAndSet(future, null);
+ }
+
@Override
public boolean isOpen() {
return this.telemetryCommandRef.get() != null;
@@ -120,7 +131,7 @@ public class GrpcClientChannel extends ProxyChannel {
protected CompletableFuture<Void> processOtherMessage(Object msg) {
if (msg instanceof TelemetryCommand) {
TelemetryCommand response = (TelemetryCommand) msg;
- this.getTelemetryCommandStreamObserver().onNext(response);
+ this.writeTelemetryCommand(response);
}
return CompletableFuture.completedFuture(null);
}
@@ -130,7 +141,7 @@ public class GrpcClientChannel extends ProxyChannel {
MessageExt messageExt, TransactionData transactionData, CompletableFuture<ProxyRelayResult<Void>> responseFuture) {
CompletableFuture<Void> writeFuture = new CompletableFuture<>();
try {
- this.getTelemetryCommandStreamObserver().onNext(TelemetryCommand.newBuilder()
+ this.writeTelemetryCommand(TelemetryCommand.newBuilder()
.setRecoverOrphanedTransactionCommand(RecoverOrphanedTransactionCommand.newBuilder()
.setTransactionId(transactionData.getTransactionId())
.setMessage(GrpcConverter.getInstance().buildMessage(messageExt))
@@ -152,7 +163,7 @@ public class GrpcClientChannel extends ProxyChannel {
if (!header.isJstackEnable()) {
return CompletableFuture.completedFuture(null);
}
- this.getTelemetryCommandStreamObserver().onNext(TelemetryCommand.newBuilder()
+ this.writeTelemetryCommand(TelemetryCommand.newBuilder()
.setPrintThreadStackTraceCommand(PrintThreadStackTraceCommand.newBuilder()
.setNonce(this.grpcChannelManager.addResponseFuture(responseFuture))
.build())
@@ -164,7 +175,7 @@ public class GrpcClientChannel extends ProxyChannel {
protected CompletableFuture<Void> processConsumeMessageDirectly(RemotingCommand command,
ConsumeMessageDirectlyResultRequestHeader header,
MessageExt messageExt, CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> responseFuture) {
- this.getTelemetryCommandStreamObserver().onNext(TelemetryCommand.newBuilder()
+ this.writeTelemetryCommand(TelemetryCommand.newBuilder()
.setVerifyMessageCommand(VerifyMessageCommand.newBuilder()
.setNonce(this.grpcChannelManager.addResponseFuture(responseFuture))
.setMessage(GrpcConverter.getInstance().buildMessage(messageExt))
@@ -189,7 +200,33 @@ public class GrpcClientChannel extends ProxyChannel {
return localAddress;
}
- public StreamObserver<TelemetryCommand> getTelemetryCommandStreamObserver() {
- return this.telemetryCommandRef.get();
+ public void writeTelemetryCommand(TelemetryCommand command) {
+ StreamObserver<TelemetryCommand> observer = this.telemetryCommandRef.get();
+ if (observer == null) {
+ log.warn("telemetry command observer is null when try to write data. command:{}, channel:{}", TextFormat.shortDebugString(command), this);
+ return;
+ }
+ synchronized (this.telemetryWriteLock) {
+ observer = this.telemetryCommandRef.get();
+ if (observer == null) {
+ log.warn("telemetry command observer is null when try to write data. command:{}, channel:{}", TextFormat.shortDebugString(command), this);
+ return;
+ }
+ try {
+ observer.onNext(command);
+ } catch (StatusRuntimeException | IllegalStateException statusRuntimeException) {
+ this.clearClientObserver(observer);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("group", group)
+ .add("clientId", clientId)
+ .add("remoteAddress", getRemoteAddress())
+ .add("localAddress", getLocalAddress())
+ .toString();
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java
index 5cb36d1a2..340d56211 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessor.java
@@ -16,21 +16,26 @@
*/
package org.apache.rocketmq.proxy.service.mqclient;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ProxyClientRemotingProcessor extends ClientRemotingProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private final ProducerManager producerManager;
public ProxyClientRemotingProcessor(ProducerManager producerManager) {
@@ -59,7 +64,12 @@ public class ProxyClientRemotingProcessor extends ClientRemotingProcessor {
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
request.writeCustomHeader(requestHeader);
request.addExtField(ProxyUtils.BROKER_ADDR, RemotingUtil.socketAddress2String(ctx.channel().remoteAddress()));
- this.producerManager.getAvailableChannel(group).writeAndFlush(request);
+ Channel channel = this.producerManager.getAvailableChannel(group);
+ if (channel != null) {
+ channel.writeAndFlush(request);
+ } else {
+ log.warn("check transaction failed, channel is empty. groupId={}, requestHeader:{}", group, requestHeader);
+ }
}
}
return null;
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
new file mode 100644
index 000000000..c365aa9d0
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.mqclient;
+
+import apache.rocketmq.v2.TelemetryCommand;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ServerCallStreamObserver;
+import io.netty.channel.Channel;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.broker.client.ProducerManager;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
+import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
+import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
+import org.apache.rocketmq.proxy.service.relay.RelayData;
+import org.apache.rocketmq.proxy.service.transaction.TransactionData;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProxyClientRemotingProcessorTest {
+ @Mock
+ private ProducerManager producerManager;
+ @Mock
+ private ProxyRelayService proxyRelayService;
+
+ @Test
+ public void testTransactionCheck() throws Exception {
+ CompletableFuture<ProxyRelayResult<Void>> proxyRelayResultFuture = new CompletableFuture<>();
+ when(proxyRelayService.processCheckTransactionState(any(), any(), any(), any()))
+ .thenReturn(new RelayData<>(
+ new TransactionData("brokerName", 0, 0, "id", System.currentTimeMillis(), 3000),
+ proxyRelayResultFuture));
+
+ GrpcClientChannel grpcClientChannel = new GrpcClientChannel(proxyRelayService, null,
+ ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"), "group", "clientId");
+ when(producerManager.getAvailableChannel(anyString()))
+ .thenReturn(grpcClientChannel);
+
+ ProxyClientRemotingProcessor processor = new ProxyClientRemotingProcessor(producerManager);
+ CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
+ RemotingCommand command = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
+ MessageExt message = new MessageExt();
+ message.setQueueId(0);
+ message.setFlag(12);
+ message.setQueueOffset(0L);
+ message.setCommitLogOffset(100L);
+ message.setSysFlag(0);
+ message.setBornTimestamp(System.currentTimeMillis());
+ message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+ message.setStoreTimestamp(System.currentTimeMillis());
+ message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+ message.setBody("body".getBytes());
+ message.setTopic("topic");
+ MessageAccessor.putProperty(message, MessageConst.PROPERTY_PRODUCER_GROUP, "group");
+ command.setBody(MessageDecoder.encode(message, false));
+
+ processor.processRequest(new MockChannelHandlerContext(null), command);
+
+ ServerCallStreamObserver<TelemetryCommand> observer = mock(ServerCallStreamObserver.class);
+ grpcClientChannel.setClientObserver(observer);
+
+ processor.processRequest(new MockChannelHandlerContext(null), command);
+ verify(observer, times(1)).onNext(any());
+
+ // throw exception to test clear observer
+ doThrow(new StatusRuntimeException(Status.CANCELLED)).when(observer).onNext(any());
+
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < 100; i++) {
+ executorService.submit(() -> {
+ try {
+ processor.processRequest(new MockChannelHandlerContext(null), command);
+ count.incrementAndGet();
+ } catch (RemotingCommandException ignored) {
+ }
+ });
+ }
+ await().atMost(Duration.ofSeconds(1)).until(() -> count.get() == 100);
+ verify(observer, times(2)).onNext(any());
+ }
+
+ protected static class MockChannelHandlerContext extends SimpleChannelHandlerContext {
+
+ public MockChannelHandlerContext(Channel channel) {
+ super(channel);
+ }
+
+ @Override
+ public Channel channel() {
+ Channel channel = mock(Channel.class);
+ when(channel.remoteAddress()).thenReturn(RemotingUtil.string2SocketAddress("127.0.0.1:10911"));
+ return channel;
+ }
+ }
+}
\ No newline at end of file