You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/01 20:46:08 UTC

[incubator-pulsar] branch master updated: Remove toString() invocation in dispatcher path (#1164)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5257db0  Remove toString() invocation in dispatcher path (#1164)
5257db0 is described below

commit 5257db083ad2a8a712424d07cb503cea64db533f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 1 12:46:06 2018 -0800

    Remove toString() invocation in dispatcher path (#1164)
---
 .../java/org/apache/pulsar/broker/service/Consumer.java  |  4 ++--
 .../NonPersistentDispatcherMultipleConsumers.java        | 16 ++++++++++------
 .../NonPersistentDispatcherSingleActiveConsumer.java     | 15 ++++++---------
 .../service/nonpersistent/NonPersistentSubscription.java |  8 ++++----
 4 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 64bd424..decdc31 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -226,7 +226,7 @@ public class Consumer {
         }
     }
 
-    public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, String subscription, long consumerId) {
+    public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, Subscription subscription, long consumerId) {
         try {
             // save the reader index and restore after parsing
             metadataAndPayload.markReaderIndex();
@@ -253,7 +253,7 @@ public class Consumer {
         while (iter.hasNext()) {
             Entry entry = iter.next();
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription.toString(), consumerId);
+            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);
             if (batchSize == -1) {
                 // this would suggest that the message might have been corrupted
                 iter.remove();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 2b2902b..5fc7676 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
+
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -27,7 +29,7 @@ import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
-import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
@@ -38,18 +40,20 @@ import org.slf4j.LoggerFactory;
 public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
         implements NonPersistentDispatcher {
 
-    private final NonPersistentTopic topic;
     private CompletableFuture<Void> closeFuture = null;
     private final String name;
     private final Rate msgDrop;
     protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
             .newUpdater(NonPersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
+    @SuppressWarnings("unused")
     private volatile int totalAvailablePermits = 0;
 
-    public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, String dispatcherName) {
-        this.name = topic.getName() + " / " + dispatcherName;
-        this.topic = topic;
+    private final Subscription subscription;
+
+    public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
+        this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
+        this.subscription = subscription;
     }
 
     @Override
@@ -148,7 +152,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
             TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.sendMessages(entries).getTotalSentMessages());
         } else {
             entries.forEach(entry -> {
-                int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1);
+                int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
                 if (totalMsgs > 0) {
                     msgDrop.recordEvent();
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index f9ce95d..107c6d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -26,19 +26,18 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
 
     private final Rate msgDrop;
-    private final String name;
+    private final Subscription subscription;
 
     public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
-            NonPersistentTopic topic, String subName) {
+            NonPersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName());
-        this.name = topic.getName() + " / " + subName;
+        this.subscription = subscription;
         this.msgDrop = new Rate();
     }
 
@@ -49,7 +48,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
             currentConsumer.sendMessages(entries);
         } else {
             entries.forEach(entry -> {
-                int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1);
+                int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
                 if (totalMsgs > 0) {
                     msgDrop.recordEvent();
                 }
@@ -57,7 +56,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
             });
         }
     }
-    
+
     @Override
     public Rate getMesssageDropRate() {
         return msgDrop;
@@ -88,6 +87,4 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
         // No-op
     }
 
-    private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherSingleActiveConsumer.class);
-
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index e2c0745..be5374d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -39,7 +39,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +55,7 @@ public class NonPersistentSubscription implements Subscription {
     private static final int TRUE = 1;
     private static final AtomicIntegerFieldUpdater<NonPersistentSubscription> IS_FENCED_UPDATER = AtomicIntegerFieldUpdater
             .newUpdater(NonPersistentSubscription.class, "isFenced");
+    @SuppressWarnings("unused")
     private volatile int isFenced = FALSE;
 
     public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
@@ -86,12 +86,12 @@ public class NonPersistentSubscription implements Subscription {
             switch (consumer.subType()) {
             case Exclusive:
                 if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
-                    dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this.subName);
+                    dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this);
                 }
                 break;
             case Shared:
                 if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
-                    dispatcher = new NonPersistentDispatcherMultipleConsumers(topic, this.subName);
+                    dispatcher = new NonPersistentDispatcherMultipleConsumers(topic, this);
                 }
                 break;
             case Failover:
@@ -103,7 +103,7 @@ public class NonPersistentSubscription implements Subscription {
 
                 if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
                     dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Failover, partitionIndex,
-                            topic, this.subName);
+                            topic, this);
                 }
                 break;
             default:

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.