You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/04 07:07:26 UTC

[rocketmq-clients] branch master updated: Java: fix opposite judgement for delay consumption (#25)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b8e5b5  Java: fix opposite judgement for delay consumption (#25)
9b8e5b5 is described below

commit 9b8e5b58e20ff468ef82626df5bc22792349c849
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jul 4 15:07:22 2022 +0800

    Java: fix opposite judgement for delay consumption (#25)
---
 .asf.yaml                                                   |  8 ++++----
 .github/workflows/cpp_build.yml                             |  5 +++--
 .github/workflows/csharp_build.yml                          |  4 ++--
 .github/workflows/java_build.yml                            |  2 +-
 .../rocketmq/client/java/impl/consumer/ConsumeService.java  |  2 +-
 .../client/java/impl/consumer/ProcessQueueImpl.java         | 13 ++++++-------
 .../client/java/impl/consumer/PushConsumerSettings.java     |  4 ++--
 .../client/java/impl/consumer/SimpleConsumerSettings.java   |  4 ++--
 8 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index 5860c4f..392b656 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -30,10 +30,10 @@ github:
     projects: true
   enabled_merge_buttons:
     # Enable squash button
-    squash:  true
+    squash: true
     # Disable merge button
-    merge:   false
+    merge: false
     # Enable rebase button
-    rebase:  true
+    rebase: true
   protected_branches:
-    master: {}
\ No newline at end of file
+    master: {}
diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index a09c782..47c6cd6 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -1,11 +1,12 @@
 name: CPP Build
 on: [push, pull_request]
 jobs:
-  cpp:
+  cpp_build:
+    name: "CPP (${{ matrix.os }})"
+    runs-on: ${{ matrix.os }}
     strategy:
       matrix:
         os: [ubuntu-18.04, ubuntu-20.04, ubuntu-22.04, macos-12,  macos-11, windows-2019]
-    runs-on: ${{ matrix.os }}
     steps:
       - uses: actions/checkout@v2
       - name: Compile All Targets
diff --git a/.github/workflows/csharp_build.yml b/.github/workflows/csharp_build.yml
index 557c901..241ef69 100644
--- a/.github/workflows/csharp_build.yml
+++ b/.github/workflows/csharp_build.yml
@@ -1,8 +1,8 @@
 name: C# Build
 on: [push, pull_request]
 jobs:
-  c_sharp_ubuntu_18_04:
-    name: Ubuntu 18.04
+  c_sharp:
+    name: "C# (ubuntu-18.04)"
     runs-on: ubuntu-18.04
     steps:
       - name: Checkout 
diff --git a/.github/workflows/java_build.yml b/.github/workflows/java_build.yml
index eda1e09..4901d0a 100644
--- a/.github/workflows/java_build.yml
+++ b/.github/workflows/java_build.yml
@@ -2,7 +2,7 @@ name: Java Build
 on: [push, pull_request]
 jobs:
   java_build:
-    name: "${{ matrix.os }} JDK-${{ matrix.jdk }}"
+    name: "Java (${{ matrix.os }} JDK-${{ matrix.jdk }})"
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index c4a2ebc..9115b00 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -69,7 +69,7 @@ public abstract class ConsumeService extends Dispatcher {
         final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(consumptionExecutor);
         final ConsumeTask task = new ConsumeTask(clientId, messageListener, messageView, messageInterceptor);
         // Consume message with no delay.
-        if (Duration.ZERO.compareTo(delay) <= 0) {
+        if (Duration.ZERO.compareTo(delay) >= 0) {
             return executorService.submit(task);
         }
         final SettableFuture<ConsumeResult> future0 = SettableFuture.create();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 35c3b50..f36012f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -126,7 +126,7 @@ class ProcessQueueImpl implements ProcessQueue {
         if (idleDuration.compareTo(maxIdleDuration) < 0) {
             return false;
         }
-        LOGGER.warn("Process queue is idle, idle duration={}, max idle duration={}, mq={}, clientId={}", idleDuration,
+        LOGGER.warn("Process queue is idle, idleDuration={}, maxIdleDuration={}, mq={}, clientId={}", idleDuration,
             maxIdleDuration, mq, consumer.getClientId());
         return true;
     }
@@ -155,8 +155,8 @@ class ProcessQueueImpl implements ProcessQueue {
             corrupted.forEach(messageView -> {
                 final MessageId messageId = messageView.getMessageId();
                 if (consumer.getPushConsumerSettings().isFifo()) {
-                    LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}," +
-                        " messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+                    LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}, " +
+                        "messageId={}, clientId={}", mq, messageId, consumer.getClientId());
                     forwardToDeadLetterQueue(messageView);
                     return;
                 }
@@ -448,12 +448,13 @@ class ProcessQueueImpl implements ProcessQueue {
         int attempt = messageView.getDeliveryAttempt();
         final MessageId messageId = messageView.getMessageId();
         final ConsumeService service = consumer.getConsumeService();
+        final String clientId = consumer.getClientId();
         if (ConsumeResult.FAILURE.equals(consumeResult) && attempt < maxAttempts) {
             final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
             attempt = messageView.incrementAndGetDeliveryAttempt();
             LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}," +
                     " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
-                maxAttempts, attempt, mq, messageId, nextAttemptDelay, consumer.getClientId());
+                maxAttempts, attempt, mq, messageId, nextAttemptDelay, clientId);
             final ListenableFuture<ConsumeResult> future = service.consume(messageView, nextAttemptDelay);
             return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result),
                 MoreExecutors.directExecutor());
@@ -461,9 +462,7 @@ class ProcessQueueImpl implements ProcessQueue {
         boolean ok = ConsumeResult.SUCCESS.equals(consumeResult);
         if (!ok) {
             LOGGER.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, "
-                    + "attempt={}," +
-                    " mq={}, messageId={}, clientId={}",
-                maxAttempts, attempt, mq, messageId, consumer.getClientId());
+                + "attempt={}, mq={}, messageId={}, clientId={}", maxAttempts, attempt, mq, messageId, clientId);
         }
         // Ack message or forward it to DLQ depends on consumption result.
         ListenableFuture<Void> future = ok ? ackFifoMessage(messageView) : forwardToDeadLetterQueue(messageView);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
index 8879389..05bfb2f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
@@ -104,8 +104,8 @@ public class PushConsumerSettings extends ClientSettings {
     public void applySettingsCommand(Settings settings) {
         final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
         if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
-            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
-                + "client type={}", clientId, pubSubCase, clientType);
+            LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, "
+                + "clientType={}", clientId, pubSubCase, clientType);
             return;
         }
         final Subscription subscription = settings.getSubscription();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
index 06a1d7f..93816c8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
@@ -89,8 +89,8 @@ public class SimpleConsumerSettings extends ClientSettings {
     public void applySettingsCommand(Settings settings) {
         final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
         if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
-            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
-                + "client type={}", clientId, pubSubCase, clientType);
+            LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, "
+                + "clientType={}", clientId, pubSubCase, clientType);
             return;
         }
         this.arrivedFuture.setFuture(Futures.immediateVoidFuture());