You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/03 09:09:10 UTC

[rocketmq-clients] 01/01: Java: fix compile issue on JDK17

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit fd7eb2cb713ab4d265c9ff907105edd186f7f586
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Sun Jul 3 17:02:57 2022 +0800

    Java: fix compile issue on JDK17
---
 .github/workflows/java_build.yml                   |   2 +-
 .../rocketmq/client/java/impl/ClientImpl.java      |   2 +-
 .../client/java/impl/consumer/ConsumerImpl.java    |  12 +--
 .../java/impl/consumer/ProcessQueueImpl.java       |  81 ++++++++++++----
 .../java/impl/consumer/PushConsumerSettings.java   |   3 +-
 .../java/impl/consumer/SimpleConsumerSettings.java |   3 +-
 .../java/impl/producer/ProducerSettings.java       |   3 +-
 .../java/impl/producer/TransactionImplTest.java    | 102 ---------------------
 java/style/spotbugs-suppressions.xml               |  41 ++++++++-
 9 files changed, 112 insertions(+), 137 deletions(-)

diff --git a/.github/workflows/java_build.yml b/.github/workflows/java_build.yml
index 8bf1d0e..eda1e09 100644
--- a/.github/workflows/java_build.yml
+++ b/.github/workflows/java_build.yml
@@ -2,7 +2,7 @@ name: Java Build
 on: [push, pull_request]
 jobs:
   java_build:
-    name: "check (${{ matrix.os }} JDK-${{ matrix.jdk }})"
+    name: "${{ matrix.os }} JDK-${{ matrix.jdk }}"
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index b16221f..21aa92e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -435,7 +435,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                     LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
                         old, topicRouteDataResult);
                 }
-                future0.set(null);
+                future0.setFuture(Futures.immediateVoidFuture());
                 onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
             }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 18cee43..539b876 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -146,12 +146,8 @@ abstract class ConsumerImpl extends ClientImpl {
             final Metadata metadata = sign();
             future = clientManager.ackMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            final SettableFuture<AckMessageResponse> future0 = SettableFuture.create();
-            future0.setException(t);
-            future = future0;
+            return Futures.immediateFailedFuture(t);
         }
-        final String topic = messageView.getTopic();
-        final MessageId messageId = messageView.getMessageId();
         Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
             @Override
             public void onSuccess(AckMessageResponse response) {
@@ -160,10 +156,6 @@ abstract class ConsumerImpl extends ClientImpl {
                 final Duration duration = stopwatch.elapsed();
                 MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ?
                     MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
-                if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to ack message, code={}, status message=[{}], topic={}, messageId={}, " +
-                        "clientId={}", code, status.getMessage(), topic, messageId, clientId);
-                }
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, messageHookPointsStatus);
             }
 
@@ -171,8 +163,6 @@ abstract class ConsumerImpl extends ClientImpl {
             public void onFailure(Throwable t) {
                 final Duration duration = stopwatch.elapsed();
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, MessageHookPointsStatus.ERROR);
-                LOGGER.error("Exception raised during message acknowledgement, topic={}, messageId={}, clientId={}",
-                    topic, messageId, clientId, t);
             }
         }, MoreExecutors.directExecutor());
         return future;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index dc648f6..35c3b50 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -376,12 +376,42 @@ class ProcessQueueImpl implements ProcessQueue {
         statsConsumptionResult(consumeResult);
         eraseMessage(messageView);
         if (ConsumeResult.SUCCESS.equals(consumeResult)) {
-            consumer.ackMessage(messageView);
+            ackMessage(messageView);
             return;
         }
         nackMessage(messageView);
     }
 
+    private void ackMessage(MessageViewImpl messageView) {
+        final String clientId = consumer.getClientId();
+        final String consumerGroup = consumer.getConsumerGroup();
+        final MessageId messageId = messageView.getMessageId();
+        final Endpoints endpoints = messageView.getEndpoints();
+        final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView);
+        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+            @Override
+            public void onSuccess(AckMessageResponse response) {
+                final Status status = response.getStatus();
+                final Code code = status.getCode();
+                if (Code.OK.equals(code)) {
+                    LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
+                        + "endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
+                    return;
+                }
+                LOGGER.error("Failed to ack message, clientId={}, consumerGroup={}, messageId={}, mq={}, "
+                        + "endpoints={}, code={}, status message={}", clientId, consumerGroup, messageId, mq,
+                    endpoints, code, status.getMessage());
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOGGER.error("Exception raised while acknowledging message, clientId={}, consumerGroup={}, "
+                        + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq,
+                    endpoints, t);
+            }
+        }, MoreExecutors.directExecutor());
+    }
+
     private void nackMessage(MessageViewImpl messageView) {
         final Duration duration = consumer.getRetryPolicy().getNextAttemptDelay(messageView.getDeliveryAttempt());
         consumer.changeInvisibleDuration(messageView, duration);
@@ -452,6 +482,9 @@ class ProcessQueueImpl implements ProcessQueue {
         final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future =
             consumer.forwardMessageToDeadLetterQueue(messageView);
         final String clientId = consumer.getClientId();
+        final String consumerGroup = consumer.getConsumerGroup();
+        final MessageId messageId = messageView.getMessageId();
+        final Endpoints endpoints = messageView.getEndpoints();
         Futures.addCallback(future, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
             @Override
             public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
@@ -460,26 +493,31 @@ class ProcessQueueImpl implements ProcessQueue {
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
                     LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," +
-                            " clientId={}, messageId={}, attempt={}, mq={}, code={}, status message={}",
-                        clientId, messageView.getMessageId(), attempt, mq, code, status.getMessage());
+                            " clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, code={}, "
+                            + "status message={}", clientId, consumerGroup, messageId, attempt, mq, endpoints, code,
+                        status.getMessage());
                     forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
                     return;
                 }
                 // Log retries.
                 if (1 < attempt) {
-                    LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, attempt={}, " +
-                        "messageId={}, mq={}", clientId, attempt, messageView.getMessageId(), mq);
+                    LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
+                            + "attempt={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt,
+                        messageId, mq, endpoints);
+                } else {
+                    LOGGER.debug("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
+                        + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
                 }
                 // Set result if message is forwarded successfully.
-                future0.set(null);
+                future0.setFuture(Futures.immediateVoidFuture());
             }
 
             @Override
             public void onFailure(Throwable t) {
                 // Log failure and retry later.
                 LOGGER.error("Exception raised while forward message to DLQ, would attempt to re-forward later, " +
-                        "clientId={}, attempt={}, messageId={}, mq={}", clientId, attempt,
-                    messageView.getMessageId(), mq, t);
+                        "clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}", clientId, consumerGroup,
+                    attempt, messageId, mq, t);
                 forwardToDeadLetterQueueLater(messageView, 1 + attempt, future0);
             }
         }, MoreExecutors.directExecutor());
@@ -518,8 +556,10 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private void ackFifoMessage(final MessageViewImpl messageView, final int attempt,
         final SettableFuture<Void> future0) {
-        final Endpoints endpoints = messageView.getEndpoints();
         final String clientId = consumer.getClientId();
+        final String consumerGroup = consumer.getConsumerGroup();
+        final MessageId messageId = messageView.getMessageId();
+        final Endpoints endpoints = messageView.getEndpoints();
         final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView);
         Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
             @Override
@@ -528,27 +568,32 @@ class ProcessQueueImpl implements ProcessQueue {
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}," +
-                            " messageId={}, mq={}, code={}, endpoints={}, status message=[{}]",
-                        clientId, attempt, messageView.getMessageId(), mq, code, endpoints, status.getMessage());
+                    LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, "
+                            + "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, endpoints={}, status "
+                            + "message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code,
+                        endpoints, status.getMessage());
                     ackFifoMessageLater(messageView, 1 + attempt, future0);
                     return;
                 }
                 // Log retries.
                 if (1 < attempt) {
-                    LOGGER.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, mq={}," +
-                        " endpoints={}", clientId, attempt, messageView.getMessageId(), mq, endpoints);
+                    LOGGER.info("Re-ack fifo message successfully, clientId={}, consumerGroup={}, attempt={}, "
+                            + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt,
+                        messageId, mq, endpoints);
+                } else {
+                    LOGGER.debug("Ack fifo message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
+                        + "endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
                 }
                 // Set result if FIFO message is acknowledged successfully.
-                future0.set(null);
+                future0.setFuture(Futures.immediateVoidFuture());
             }
 
             @Override
             public void onFailure(Throwable t) {
                 // Log failure and retry later.
-                LOGGER.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later," +
-                        " attempt={}, messageId={}, mq={}, endpoints={}",
-                    clientId, attempt, messageView.getMessageId(), mq, endpoints, t);
+                LOGGER.error("Exception raised while acknowledging fifo message, clientId={}, consumerGroup={}, "
+                        + "would attempt to re-ack later, attempt={}, messageId={}, mq={}, endpoints={}", clientId,
+                    consumerGroup, attempt, messageId, mq, endpoints, t);
                 ackFifoMessageLater(messageView, 1 + attempt, future0);
             }
         }, MoreExecutors.directExecutor());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
index 4218424..8879389 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
@@ -23,6 +23,7 @@ import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
+import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -122,7 +123,7 @@ public class PushConsumerSettings extends ClientSettings {
             default:
                 throw new IllegalArgumentException("Unrecognized backoff policy strategy.");
         }
-        this.arrivedFuture.set(null);
+        this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
index d357cdf..06a1d7f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
@@ -22,6 +22,7 @@ import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
+import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -92,7 +93,7 @@ public class SimpleConsumerSettings extends ClientSettings {
                 + "client type={}", clientId, pubSubCase, clientType);
             return;
         }
-        this.arrivedFuture.set(null);
+        this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 60a8100..9f6f261 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.impl.producer;
 import apache.rocketmq.v2.Publishing;
 import apache.rocketmq.v2.Settings;
 import com.google.common.base.MoreObjects;
+import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.Set;
@@ -94,7 +95,7 @@ public class ProducerSettings extends ClientSettings {
         final Publishing publishing = settings.getPublishing();
         this.compressBodyThresholdBytes = publishing.getCompressBodyThreshold();
         this.maxBodySizeBytes = publishing.getMaxBodySize();
-        this.arrivedFuture.set(null);
+        this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
 
     @Override
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index 3519385..e7670ed 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -17,26 +17,10 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doNothing;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.apis.message.Message;
-import org.apache.rocketmq.client.apis.message.MessageId;
-import org.apache.rocketmq.client.apis.producer.TransactionResolution;
-import org.apache.rocketmq.client.java.message.MessageCommon;
-import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
-import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -45,42 +29,6 @@ public class TransactionImplTest extends TestBase {
     @Mock
     ProducerImpl producer;
 
-    @Test
-    public void testTryAddMessage() throws IOException, NoSuchFieldException, IllegalAccessException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        final Message message0 = fakeMessage(FAKE_TOPIC_0);
-
-        final Class<? extends ProducerImpl> clazz = producer.getClass();
-        final Field field = clazz.getDeclaredField("producerSettings");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(producer, fakeProducerSettings());
-
-        transaction.tryAddMessage(message0);
-        // Expect no exception thrown.
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testTryAddMultipleMessages() throws IOException, NoSuchFieldException, IllegalAccessException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        final Message message0 = fakeMessage(FAKE_TOPIC_0);
-        final Message message1 = fakeMessage(FAKE_TOPIC_0);
-
-        final Class<? extends ProducerImpl> clazz = producer.getClass();
-        final Field field = clazz.getDeclaredField("producerSettings");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(producer, fakeProducerSettings());
-
-        transaction.tryAddMessage(message0);
-        transaction.tryAddMessage(message1);
-        // Expect no exception thrown.
-    }
-
     @Test(expected = IllegalStateException.class)
     public void testCommitWithNoReceipts() throws ClientException {
         final TransactionImpl transaction = new TransactionImpl(producer);
@@ -92,54 +40,4 @@ public class TransactionImplTest extends TestBase {
         final TransactionImpl transaction = new TransactionImpl(producer);
         transaction.rollback();
     }
-
-    @Test
-    public void testCommit() throws IOException, ClientException, ExecutionException, InterruptedException,
-        NoSuchFieldException, IllegalAccessException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        final Message message0 = fakeMessage(FAKE_TOPIC_0);
-
-        final Class<? extends ProducerImpl> clazz = producer.getClass();
-        final Field field = clazz.getDeclaredField("producerSettings");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(producer, fakeProducerSettings());
-
-        final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
-        final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
-        transaction.tryAddReceipt(publishingMessage, receipt);
-        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
-            ArgumentCaptor.forClass(TransactionResolution.class);
-        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
-            any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
-        transaction.commit();
-        assertEquals(TransactionResolution.COMMIT, resolutionArgumentCaptor.getValue());
-    }
-
-    @Test
-    public void testRollback() throws IOException, ClientException, ExecutionException, InterruptedException,
-        NoSuchFieldException, IllegalAccessException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        final Message message0 = fakeMessage(FAKE_TOPIC_0);
-
-        final Class<? extends ProducerImpl> clazz = producer.getClass();
-        final Field field = clazz.getDeclaredField("producerSettings");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(producer, fakeProducerSettings());
-
-        final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
-        final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
-        transaction.tryAddReceipt(publishingMessage, receipt);
-        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
-            ArgumentCaptor.forClass(TransactionResolution.class);
-        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
-            any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
-        transaction.rollback();
-        assertEquals(TransactionResolution.ROLLBACK, resolutionArgumentCaptor.getValue());
-    }
 }
\ No newline at end of file
diff --git a/java/style/spotbugs-suppressions.xml b/java/style/spotbugs-suppressions.xml
index ca8620f..eb0bb8d 100644
--- a/java/style/spotbugs-suppressions.xml
+++ b/java/style/spotbugs-suppressions.xml
@@ -6,7 +6,46 @@
     </Match>
 
     <Match>
-        <Package name="~org\.apache\.rocketmq\.client.*"/>
+        <Package name="~org\.apache\.rocketmq\.client\.java.*"/>
         <Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/>
     </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.ClientImpl$2" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl$3" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl$4" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" />
+        <Method name="applySettingsCommand" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" />
+        <Method name="applySettingsCommand" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerSettings" />
+        <Method name="applySettingsCommand" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.producer.ProducerSettings" />
+        <Method name="applySettingsCommand" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
 </FindBugsFilter>