You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/04 07:07:26 UTC
[rocketmq-clients] branch master updated: Java: fix opposite judgement for delay consumption (#25)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 9b8e5b5 Java: fix opposite judgement for delay consumption (#25)
9b8e5b5 is described below
commit 9b8e5b58e20ff468ef82626df5bc22792349c849
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jul 4 15:07:22 2022 +0800
Java: fix opposite judgement for delay consumption (#25)
---
.asf.yaml | 8 ++++----
.github/workflows/cpp_build.yml | 5 +++--
.github/workflows/csharp_build.yml | 4 ++--
.github/workflows/java_build.yml | 2 +-
.../rocketmq/client/java/impl/consumer/ConsumeService.java | 2 +-
.../client/java/impl/consumer/ProcessQueueImpl.java | 13 ++++++-------
.../client/java/impl/consumer/PushConsumerSettings.java | 4 ++--
.../client/java/impl/consumer/SimpleConsumerSettings.java | 4 ++--
8 files changed, 21 insertions(+), 21 deletions(-)
diff --git a/.asf.yaml b/.asf.yaml
index 5860c4f..392b656 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -30,10 +30,10 @@ github:
projects: true
enabled_merge_buttons:
# Enable squash button
- squash: true
+ squash: true
# Disable merge button
- merge: false
+ merge: false
# Enable rebase button
- rebase: true
+ rebase: true
protected_branches:
- master: {}
\ No newline at end of file
+ master: {}
diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index a09c782..47c6cd6 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -1,11 +1,12 @@
name: CPP Build
on: [push, pull_request]
jobs:
- cpp:
+ cpp_build:
+ name: "CPP (${{ matrix.os }})"
+ runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-18.04, ubuntu-20.04, ubuntu-22.04, macos-12, macos-11, windows-2019]
- runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- name: Compile All Targets
diff --git a/.github/workflows/csharp_build.yml b/.github/workflows/csharp_build.yml
index 557c901..241ef69 100644
--- a/.github/workflows/csharp_build.yml
+++ b/.github/workflows/csharp_build.yml
@@ -1,8 +1,8 @@
name: C# Build
on: [push, pull_request]
jobs:
- c_sharp_ubuntu_18_04:
- name: Ubuntu 18.04
+ c_sharp:
+ name: "C# (ubuntu-18.04)"
runs-on: ubuntu-18.04
steps:
- name: Checkout
diff --git a/.github/workflows/java_build.yml b/.github/workflows/java_build.yml
index eda1e09..4901d0a 100644
--- a/.github/workflows/java_build.yml
+++ b/.github/workflows/java_build.yml
@@ -2,7 +2,7 @@ name: Java Build
on: [push, pull_request]
jobs:
java_build:
- name: "${{ matrix.os }} JDK-${{ matrix.jdk }}"
+ name: "Java (${{ matrix.os }} JDK-${{ matrix.jdk }})"
runs-on: ${{ matrix.os }}
strategy:
matrix:
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index c4a2ebc..9115b00 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -69,7 +69,7 @@ public abstract class ConsumeService extends Dispatcher {
final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(consumptionExecutor);
final ConsumeTask task = new ConsumeTask(clientId, messageListener, messageView, messageInterceptor);
// Consume message with no delay.
- if (Duration.ZERO.compareTo(delay) <= 0) {
+ if (Duration.ZERO.compareTo(delay) >= 0) {
return executorService.submit(task);
}
final SettableFuture<ConsumeResult> future0 = SettableFuture.create();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 35c3b50..f36012f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -126,7 +126,7 @@ class ProcessQueueImpl implements ProcessQueue {
if (idleDuration.compareTo(maxIdleDuration) < 0) {
return false;
}
- LOGGER.warn("Process queue is idle, idle duration={}, max idle duration={}, mq={}, clientId={}", idleDuration,
+ LOGGER.warn("Process queue is idle, idleDuration={}, maxIdleDuration={}, mq={}, clientId={}", idleDuration,
maxIdleDuration, mq, consumer.getClientId());
return true;
}
@@ -155,8 +155,8 @@ class ProcessQueueImpl implements ProcessQueue {
corrupted.forEach(messageView -> {
final MessageId messageId = messageView.getMessageId();
if (consumer.getPushConsumerSettings().isFifo()) {
- LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}," +
- " messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+ LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}, " +
+ "messageId={}, clientId={}", mq, messageId, consumer.getClientId());
forwardToDeadLetterQueue(messageView);
return;
}
@@ -448,12 +448,13 @@ class ProcessQueueImpl implements ProcessQueue {
int attempt = messageView.getDeliveryAttempt();
final MessageId messageId = messageView.getMessageId();
final ConsumeService service = consumer.getConsumeService();
+ final String clientId = consumer.getClientId();
if (ConsumeResult.FAILURE.equals(consumeResult) && attempt < maxAttempts) {
final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
attempt = messageView.incrementAndGetDeliveryAttempt();
LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}," +
" attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
- maxAttempts, attempt, mq, messageId, nextAttemptDelay, consumer.getClientId());
+ maxAttempts, attempt, mq, messageId, nextAttemptDelay, clientId);
final ListenableFuture<ConsumeResult> future = service.consume(messageView, nextAttemptDelay);
return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result),
MoreExecutors.directExecutor());
@@ -461,9 +462,7 @@ class ProcessQueueImpl implements ProcessQueue {
boolean ok = ConsumeResult.SUCCESS.equals(consumeResult);
if (!ok) {
LOGGER.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, "
- + "attempt={}," +
- " mq={}, messageId={}, clientId={}",
- maxAttempts, attempt, mq, messageId, consumer.getClientId());
+ + "attempt={}, mq={}, messageId={}, clientId={}", maxAttempts, attempt, mq, messageId, clientId);
}
// Ack message or forward it to DLQ depends on consumption result.
ListenableFuture<Void> future = ok ? ackFifoMessage(messageView) : forwardToDeadLetterQueue(messageView);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
index 8879389..05bfb2f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
@@ -104,8 +104,8 @@ public class PushConsumerSettings extends ClientSettings {
public void applySettingsCommand(Settings settings) {
final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
- LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
- + "client type={}", clientId, pubSubCase, clientType);
+ LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, "
+ + "clientType={}", clientId, pubSubCase, clientType);
return;
}
final Subscription subscription = settings.getSubscription();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
index 06a1d7f..93816c8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
@@ -89,8 +89,8 @@ public class SimpleConsumerSettings extends ClientSettings {
public void applySettingsCommand(Settings settings) {
final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
- LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
- + "client type={}", clientId, pubSubCase, clientType);
+ LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, "
+ + "clientType={}", clientId, pubSubCase, clientType);
return;
}
this.arrivedFuture.setFuture(Futures.immediateVoidFuture());