You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/03 09:09:09 UTC

[rocketmq-clients] branch java updated (bde1815 -> fd7eb2c)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a change to branch java
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


 discard bde1815  Java: polish code
     new fd7eb2c  Java: fix compile issue on JDK17

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (bde1815)
            \
             N -- N -- N   refs/heads/java (fd7eb2c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[rocketmq-clients] 01/01: Java: fix compile issue on JDK17

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit fd7eb2cb713ab4d265c9ff907105edd186f7f586
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Sun Jul 3 17:02:57 2022 +0800

    Java: fix compile issue on JDK17
---
 .github/workflows/java_build.yml                   |   2 +-
 .../rocketmq/client/java/impl/ClientImpl.java      |   2 +-
 .../client/java/impl/consumer/ConsumerImpl.java    |  12 +--
 .../java/impl/consumer/ProcessQueueImpl.java       |  81 ++++++++++++----
 .../java/impl/consumer/PushConsumerSettings.java   |   3 +-
 .../java/impl/consumer/SimpleConsumerSettings.java |   3 +-
 .../java/impl/producer/ProducerSettings.java       |   3 +-
 .../java/impl/producer/TransactionImplTest.java    | 102 ---------------------
 java/style/spotbugs-suppressions.xml               |  41 ++++++++-
 9 files changed, 112 insertions(+), 137 deletions(-)

diff --git a/.github/workflows/java_build.yml b/.github/workflows/java_build.yml
index 8bf1d0e..eda1e09 100644
--- a/.github/workflows/java_build.yml
+++ b/.github/workflows/java_build.yml
@@ -2,7 +2,7 @@ name: Java Build
 on: [push, pull_request]
 jobs:
   java_build:
-    name: "check (${{ matrix.os }} JDK-${{ matrix.jdk }})"
+    name: "${{ matrix.os }} JDK-${{ matrix.jdk }}"
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index b16221f..21aa92e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -435,7 +435,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                     LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
                         old, topicRouteDataResult);
                 }
-                future0.set(null);
+                future0.setFuture(Futures.immediateVoidFuture());
                 onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
             }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 18cee43..539b876 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -146,12 +146,8 @@ abstract class ConsumerImpl extends ClientImpl {
             final Metadata metadata = sign();
             future = clientManager.ackMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            final SettableFuture<AckMessageResponse> future0 = SettableFuture.create();
-            future0.setException(t);
-            future = future0;
+            return Futures.immediateFailedFuture(t);
         }
-        final String topic = messageView.getTopic();
-        final MessageId messageId = messageView.getMessageId();
         Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
             @Override
             public void onSuccess(AckMessageResponse response) {
@@ -160,10 +156,6 @@ abstract class ConsumerImpl extends ClientImpl {
                 final Duration duration = stopwatch.elapsed();
                 MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ?
                     MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
-                if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to ack message, code={}, status message=[{}], topic={}, messageId={}, " +
-                        "clientId={}", code, status.getMessage(), topic, messageId, clientId);
-                }
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, messageHookPointsStatus);
             }
 
@@ -171,8 +163,6 @@ abstract class ConsumerImpl extends ClientImpl {
             public void onFailure(Throwable t) {
                 final Duration duration = stopwatch.elapsed();
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, MessageHookPointsStatus.ERROR);
-                LOGGER.error("Exception raised during message acknowledgement, topic={}, messageId={}, clientId={}",
-                    topic, messageId, clientId, t);
             }
         }, MoreExecutors.directExecutor());
         return future;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index dc648f6..35c3b50 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -376,12 +376,42 @@ class ProcessQueueImpl implements ProcessQueue {
         statsConsumptionResult(consumeResult);
         eraseMessage(messageView);
         if (ConsumeResult.SUCCESS.equals(consumeResult)) {
-            consumer.ackMessage(messageView);
+            ackMessage(messageView);
             return;
         }
         nackMessage(messageView);
     }
 
+    private void ackMessage(MessageViewImpl messageView) {
+        final String clientId = consumer.getClientId();
+        final String consumerGroup = consumer.getConsumerGroup();
+        final MessageId messageId = messageView.getMessageId();
+        final Endpoints endpoints = messageView.getEndpoints();
+        final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView);
+        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+            @Override
+            public void onSuccess(AckMessageResponse response) {
+                final Status status = response.getStatus();
+                final Code code = status.getCode();
+                if (Code.OK.equals(code)) {
+                    LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
+                        + "endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
+                    return;
+                }
+                LOGGER.error("Failed to ack message, clientId={}, consumerGroup={}, messageId={}, mq={}, "
+                        + "endpoints={}, code={}, status message={}", clientId, consumerGroup, messageId, mq,
+                    endpoints, code, status.getMessage());
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOGGER.error("Exception raised while acknowledging message, clientId={}, consumerGroup={}, "
+                        + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq,
+                    endpoints, t);
+            }
+        }, MoreExecutors.directExecutor());
+    }
+
     private void nackMessage(MessageViewImpl messageView) {
         final Duration duration = consumer.getRetryPolicy().getNextAttemptDelay(messageView.getDeliveryAttempt());
         consumer.changeInvisibleDuration(messageView, duration);
@@ -452,6 +482,9 @@ class ProcessQueueImpl implements ProcessQueue {
         final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future =
             consumer.forwardMessageToDeadLetterQueue(messageView);
         final String clientId = consumer.getClientId();
+        final String consumerGroup = consumer.getConsumerGroup();
+        final MessageId messageId = messageView.getMessageId();
+        final Endpoints endpoints = messageView.getEndpoints();
         Futures.addCallback(future, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
             @Override
             public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
@@ -460,26 +493,31 @@ class ProcessQueueImpl implements ProcessQueue {
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
                     LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," +
-                            " clientId={}, messageId={}, attempt={}, mq={}, code={}, status message={}",
-                        clientId, messageView.getMessageId(), attempt, mq, code, status.getMessage());
+                            " clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, code={}, "
+                            + "status message={}", clientId, consumerGroup, messageId, attempt, mq, endpoints, code,
+                        status.getMessage());
                     forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
                     return;
                 }
                 // Log retries.
                 if (1 < attempt) {
-                    LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, attempt={}, " +
-                        "messageId={}, mq={}", clientId, attempt, messageView.getMessageId(), mq);
+                    LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
+                            + "attempt={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt,
+                        messageId, mq, endpoints);
+                } else {
+                    LOGGER.debug("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
+                        + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
                 }
                 // Set result if message is forwarded successfully.
-                future0.set(null);
+                future0.setFuture(Futures.immediateVoidFuture());
             }
 
             @Override
             public void onFailure(Throwable t) {
                 // Log failure and retry later.
                 LOGGER.error("Exception raised while forward message to DLQ, would attempt to re-forward later, " +
-                        "clientId={}, attempt={}, messageId={}, mq={}", clientId, attempt,
-                    messageView.getMessageId(), mq, t);
+                        "clientId={}, consumerGroup={}, attempt={}, messageId={}, mq={}", clientId, consumerGroup,
+                    attempt, messageId, mq, t);
                 forwardToDeadLetterQueueLater(messageView, 1 + attempt, future0);
             }
         }, MoreExecutors.directExecutor());
@@ -518,8 +556,10 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private void ackFifoMessage(final MessageViewImpl messageView, final int attempt,
         final SettableFuture<Void> future0) {
-        final Endpoints endpoints = messageView.getEndpoints();
         final String clientId = consumer.getClientId();
+        final String consumerGroup = consumer.getConsumerGroup();
+        final MessageId messageId = messageView.getMessageId();
+        final Endpoints endpoints = messageView.getEndpoints();
         final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView);
         Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
             @Override
@@ -528,27 +568,32 @@ class ProcessQueueImpl implements ProcessQueue {
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}," +
-                            " messageId={}, mq={}, code={}, endpoints={}, status message=[{}]",
-                        clientId, attempt, messageView.getMessageId(), mq, code, endpoints, status.getMessage());
+                    LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, "
+                            + "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, endpoints={}, status "
+                            + "message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code,
+                        endpoints, status.getMessage());
                     ackFifoMessageLater(messageView, 1 + attempt, future0);
                     return;
                 }
                 // Log retries.
                 if (1 < attempt) {
-                    LOGGER.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, mq={}," +
-                        " endpoints={}", clientId, attempt, messageView.getMessageId(), mq, endpoints);
+                    LOGGER.info("Re-ack fifo message successfully, clientId={}, consumerGroup={}, attempt={}, "
+                            + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt,
+                        messageId, mq, endpoints);
+                } else {
+                    LOGGER.debug("Ack fifo message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
+                        + "endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
                 }
                 // Set result if FIFO message is acknowledged successfully.
-                future0.set(null);
+                future0.setFuture(Futures.immediateVoidFuture());
             }
 
             @Override
             public void onFailure(Throwable t) {
                 // Log failure and retry later.
-                LOGGER.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later," +
-                        " attempt={}, messageId={}, mq={}, endpoints={}",
-                    clientId, attempt, messageView.getMessageId(), mq, endpoints, t);
+                LOGGER.error("Exception raised while acknowledging fifo message, clientId={}, consumerGroup={}, "
+                        + "would attempt to re-ack later, attempt={}, messageId={}, mq={}, endpoints={}", clientId,
+                    consumerGroup, attempt, messageId, mq, endpoints, t);
                 ackFifoMessageLater(messageView, 1 + attempt, future0);
             }
         }, MoreExecutors.directExecutor());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
index 4218424..8879389 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
@@ -23,6 +23,7 @@ import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
+import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -122,7 +123,7 @@ public class PushConsumerSettings extends ClientSettings {
             default:
                 throw new IllegalArgumentException("Unrecognized backoff policy strategy.");
         }
-        this.arrivedFuture.set(null);
+        this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
index d357cdf..06a1d7f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
@@ -22,6 +22,7 @@ import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
+import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -92,7 +93,7 @@ public class SimpleConsumerSettings extends ClientSettings {
                 + "client type={}", clientId, pubSubCase, clientType);
             return;
         }
-        this.arrivedFuture.set(null);
+        this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 60a8100..9f6f261 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.impl.producer;
 import apache.rocketmq.v2.Publishing;
 import apache.rocketmq.v2.Settings;
 import com.google.common.base.MoreObjects;
+import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.Set;
@@ -94,7 +95,7 @@ public class ProducerSettings extends ClientSettings {
         final Publishing publishing = settings.getPublishing();
         this.compressBodyThresholdBytes = publishing.getCompressBodyThreshold();
         this.maxBodySizeBytes = publishing.getMaxBodySize();
-        this.arrivedFuture.set(null);
+        this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
 
     @Override
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index 3519385..e7670ed 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -17,26 +17,10 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doNothing;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.apis.message.Message;
-import org.apache.rocketmq.client.apis.message.MessageId;
-import org.apache.rocketmq.client.apis.producer.TransactionResolution;
-import org.apache.rocketmq.client.java.message.MessageCommon;
-import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
-import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -45,42 +29,6 @@ public class TransactionImplTest extends TestBase {
     @Mock
     ProducerImpl producer;
 
-    @Test
-    public void testTryAddMessage() throws IOException, NoSuchFieldException, IllegalAccessException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        final Message message0 = fakeMessage(FAKE_TOPIC_0);
-
-        final Class<? extends ProducerImpl> clazz = producer.getClass();
-        final Field field = clazz.getDeclaredField("producerSettings");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(producer, fakeProducerSettings());
-
-        transaction.tryAddMessage(message0);
-        // Expect no exception thrown.
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testTryAddMultipleMessages() throws IOException, NoSuchFieldException, IllegalAccessException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        final Message message0 = fakeMessage(FAKE_TOPIC_0);
-        final Message message1 = fakeMessage(FAKE_TOPIC_0);
-
-        final Class<? extends ProducerImpl> clazz = producer.getClass();
-        final Field field = clazz.getDeclaredField("producerSettings");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(producer, fakeProducerSettings());
-
-        transaction.tryAddMessage(message0);
-        transaction.tryAddMessage(message1);
-        // Expect no exception thrown.
-    }
-
     @Test(expected = IllegalStateException.class)
     public void testCommitWithNoReceipts() throws ClientException {
         final TransactionImpl transaction = new TransactionImpl(producer);
@@ -92,54 +40,4 @@ public class TransactionImplTest extends TestBase {
         final TransactionImpl transaction = new TransactionImpl(producer);
         transaction.rollback();
     }
-
-    @Test
-    public void testCommit() throws IOException, ClientException, ExecutionException, InterruptedException,
-        NoSuchFieldException, IllegalAccessException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        final Message message0 = fakeMessage(FAKE_TOPIC_0);
-
-        final Class<? extends ProducerImpl> clazz = producer.getClass();
-        final Field field = clazz.getDeclaredField("producerSettings");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(producer, fakeProducerSettings());
-
-        final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
-        final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
-        transaction.tryAddReceipt(publishingMessage, receipt);
-        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
-            ArgumentCaptor.forClass(TransactionResolution.class);
-        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
-            any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
-        transaction.commit();
-        assertEquals(TransactionResolution.COMMIT, resolutionArgumentCaptor.getValue());
-    }
-
-    @Test
-    public void testRollback() throws IOException, ClientException, ExecutionException, InterruptedException,
-        NoSuchFieldException, IllegalAccessException {
-        final TransactionImpl transaction = new TransactionImpl(producer);
-        final Message message0 = fakeMessage(FAKE_TOPIC_0);
-
-        final Class<? extends ProducerImpl> clazz = producer.getClass();
-        final Field field = clazz.getDeclaredField("producerSettings");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(producer, fakeProducerSettings());
-
-        final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
-        final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
-        transaction.tryAddReceipt(publishingMessage, receipt);
-        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
-            ArgumentCaptor.forClass(TransactionResolution.class);
-        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
-            any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
-        transaction.rollback();
-        assertEquals(TransactionResolution.ROLLBACK, resolutionArgumentCaptor.getValue());
-    }
 }
\ No newline at end of file
diff --git a/java/style/spotbugs-suppressions.xml b/java/style/spotbugs-suppressions.xml
index ca8620f..eb0bb8d 100644
--- a/java/style/spotbugs-suppressions.xml
+++ b/java/style/spotbugs-suppressions.xml
@@ -6,7 +6,46 @@
     </Match>
 
     <Match>
-        <Package name="~org\.apache\.rocketmq\.client.*"/>
+        <Package name="~org\.apache\.rocketmq\.client\.java.*"/>
         <Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/>
     </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.ClientImpl$2" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl$3" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.ProcessQueueImpl$4" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" />
+        <Method name="applySettingsCommand" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.PushConsumerSettings" />
+        <Method name="applySettingsCommand" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerSettings" />
+        <Method name="applySettingsCommand" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.rocketmq.client.java.impl.producer.ProducerSettings" />
+        <Method name="applySettingsCommand" />
+        <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
+    </Match>
 </FindBugsFilter>