You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/06/29 03:39:47 UTC
[pulsar] branch master updated: simplify code (#7381)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4a8ceb5 simplify code (#7381)
4a8ceb5 is described below
commit 4a8ceb52ceb1e711819e2a3d84b9d0c8667ce074
Author: Aloys <lo...@gmail.com>
AuthorDate: Mon Jun 29 11:39:35 2020 +0800
simplify code (#7381)
Simplify some codes in if statement and return statement
---
.../org/apache/pulsar/client/impl/ConsumerBase.java | 6 +++---
.../pulsar/client/impl/ConsumerBuilderImpl.java | 8 ++++----
.../pulsar/client/impl/MultiTopicsConsumerImpl.java | 20 ++++----------------
3 files changed, 11 insertions(+), 23 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 3d45931..cdc3f5f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -228,7 +228,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
@Override
public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
- if (conf.isRetryEnable() == false) {
+ if (!conf.isRetryEnable()) {
throw new PulsarClientException("reconsumeLater method not support!");
}
try {
@@ -300,7 +300,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
@Override
public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
- if (conf.isRetryEnable() == false) {
+ if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
}
try {
@@ -331,7 +331,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit) {
- if (conf.isRetryEnable() == false) {
+ if (!conf.isRetryEnable()) {
return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
}
if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 9240ab3..2134e1c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -118,10 +118,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return FutureUtil.failedFuture(
new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription"));
}
- if(conf.isRetryEnable() == true && conf.getTopicNames().size() > 0 ) {
- TopicName topicFisrt = TopicName.get(conf.getTopicNames().iterator().next());
- String retryLetterTopic = topicFisrt.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
- String deadLetterTopic = topicFisrt.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+ if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) {
+ TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
+ String retryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+ String deadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
if(conf.getDeadLetterPolicy() == null) {
conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index e966186..2a94717 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -881,12 +881,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
consumers.values().stream()
.filter(consumer1 -> {
String consumerTopicName = consumer1.getTopic();
- if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
- TopicName.get(topicName).getPartitionedTopicName())) {
- return true;
- } else {
- return false;
- }
+ return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+ TopicName.get(topicName).getPartitionedTopicName());
})
.collect(Collectors.toList()));
@@ -953,11 +949,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
List<ConsumerImpl<T>> consumersToUnsub = consumers.values().stream()
.filter(consumer -> {
String consumerTopicName = consumer.getTopic();
- if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName)) {
- return true;
- } else {
- return false;
- }
+ return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName);
}).collect(Collectors.toList());
List<CompletableFuture<Void>> futureList = consumersToUnsub.stream()
@@ -1005,11 +997,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
List<ConsumerImpl<T>> consumersToClose = consumers.values().stream()
.filter(consumer -> {
String consumerTopicName = consumer.getTopic();
- if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName)) {
- return true;
- } else {
- return false;
- }
+ return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName);
}).collect(Collectors.toList());
List<CompletableFuture<Void>> futureList = consumersToClose.stream()