You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/01 20:46:08 UTC
[incubator-pulsar] branch master updated: Remove toString()
invocation in dispatcher path (#1164)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5257db0 Remove toString() invocation in dispatcher path (#1164)
5257db0 is described below
commit 5257db083ad2a8a712424d07cb503cea64db533f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 1 12:46:06 2018 -0800
Remove toString() invocation in dispatcher path (#1164)
---
.../java/org/apache/pulsar/broker/service/Consumer.java | 4 ++--
.../NonPersistentDispatcherMultipleConsumers.java | 16 ++++++++++------
.../NonPersistentDispatcherSingleActiveConsumer.java | 15 ++++++---------
.../service/nonpersistent/NonPersistentSubscription.java | 8 ++++----
4 files changed, 22 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 64bd424..decdc31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -226,7 +226,7 @@ public class Consumer {
}
}
- public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, String subscription, long consumerId) {
+ public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, Subscription subscription, long consumerId) {
try {
// save the reader index and restore after parsing
metadataAndPayload.markReaderIndex();
@@ -253,7 +253,7 @@ public class Consumer {
while (iter.hasNext()) {
Entry entry = iter.next();
ByteBuf metadataAndPayload = entry.getDataBuffer();
- int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription.toString(), consumerId);
+ int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);
if (batchSize == -1) {
// this would suggest that the message might have been corrupted
iter.remove();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 2b2902b..5fc7676 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
+import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
+
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -27,7 +29,7 @@ import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
-import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
+import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
@@ -38,18 +40,20 @@ import org.slf4j.LoggerFactory;
public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
implements NonPersistentDispatcher {
- private final NonPersistentTopic topic;
private CompletableFuture<Void> closeFuture = null;
private final String name;
private final Rate msgDrop;
protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(NonPersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
+ @SuppressWarnings("unused")
private volatile int totalAvailablePermits = 0;
- public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, String dispatcherName) {
- this.name = topic.getName() + " / " + dispatcherName;
- this.topic = topic;
+ private final Subscription subscription;
+
+ public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
+ this.name = topic.getName() + " / " + subscription.getName();
this.msgDrop = new Rate();
+ this.subscription = subscription;
}
@Override
@@ -148,7 +152,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.sendMessages(entries).getTotalSentMessages());
} else {
entries.forEach(entry -> {
- int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1);
+ int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
if (totalMsgs > 0) {
msgDrop.recordEvent();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index f9ce95d..107c6d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -26,19 +26,18 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
private final Rate msgDrop;
- private final String name;
+ private final Subscription subscription;
public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
- NonPersistentTopic topic, String subName) {
+ NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName());
- this.name = topic.getName() + " / " + subName;
+ this.subscription = subscription;
this.msgDrop = new Rate();
}
@@ -49,7 +48,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
currentConsumer.sendMessages(entries);
} else {
entries.forEach(entry -> {
- int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1);
+ int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
if (totalMsgs > 0) {
msgDrop.recordEvent();
}
@@ -57,7 +56,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
});
}
}
-
+
@Override
public Rate getMesssageDropRate() {
return msgDrop;
@@ -88,6 +87,4 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
// No-op
}
- private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherSingleActiveConsumer.class);
-
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index e2c0745..be5374d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -39,7 +39,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +55,7 @@ public class NonPersistentSubscription implements Subscription {
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<NonPersistentSubscription> IS_FENCED_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(NonPersistentSubscription.class, "isFenced");
+ @SuppressWarnings("unused")
private volatile int isFenced = FALSE;
public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
@@ -86,12 +86,12 @@ public class NonPersistentSubscription implements Subscription {
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
- dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this.subName);
+ dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
- dispatcher = new NonPersistentDispatcherMultipleConsumers(topic, this.subName);
+ dispatcher = new NonPersistentDispatcherMultipleConsumers(topic, this);
}
break;
case Failover:
@@ -103,7 +103,7 @@ public class NonPersistentSubscription implements Subscription {
if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Failover, partitionIndex,
- topic, this.subName);
+ topic, this);
}
break;
default:
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.