You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/01 20:46:10 UTC

[GitHub] merlimat closed pull request #1164: Remove toString() invocation in dispatcher path

merlimat closed pull request #1164: Remove toString() invocation in dispatcher path
URL: https://github.com/apache/incubator-pulsar/pull/1164
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show the
diff once it has been merged, so the diff is supplied below:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 64bd4243c..decdc3190 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -226,7 +226,7 @@ private void incrementUnackedMessages(int ackedMessages) {
         }
     }
 
-    public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, String subscription, long consumerId) {
+    public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, Subscription subscription, long consumerId) {
         try {
             // save the reader index and restore after parsing
             metadataAndPayload.markReaderIndex();
@@ -253,7 +253,7 @@ void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sent
         while (iter.hasNext()) {
             Entry entry = iter.next();
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription.toString(), consumerId);
+            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);
             if (batchSize == -1) {
                 // this would suggest that the message might have been corrupted
                 iter.remove();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 2b2902b69..5fc7676cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
+
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -27,7 +29,7 @@
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
-import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
@@ -38,18 +40,20 @@
 public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
         implements NonPersistentDispatcher {
 
-    private final NonPersistentTopic topic;
     private CompletableFuture<Void> closeFuture = null;
     private final String name;
     private final Rate msgDrop;
     protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
             .newUpdater(NonPersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
+    @SuppressWarnings("unused")
     private volatile int totalAvailablePermits = 0;
 
-    public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, String dispatcherName) {
-        this.name = topic.getName() + " / " + dispatcherName;
-        this.topic = topic;
+    private final Subscription subscription;
+
+    public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
+        this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
+        this.subscription = subscription;
     }
 
     @Override
@@ -148,7 +152,7 @@ public void sendMessages(List<Entry> entries) {
             TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.sendMessages(entries).getTotalSentMessages());
         } else {
             entries.forEach(entry -> {
-                int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1);
+                int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
                 if (totalMsgs > 0) {
                     msgDrop.recordEvent();
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index f9ce95d92..107c6d6cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -26,19 +26,18 @@
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
 
     private final Rate msgDrop;
-    private final String name;
+    private final Subscription subscription;
 
     public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
-            NonPersistentTopic topic, String subName) {
+            NonPersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName());
-        this.name = topic.getName() + " / " + subName;
+        this.subscription = subscription;
         this.msgDrop = new Rate();
     }
 
@@ -49,7 +48,7 @@ public void sendMessages(List<Entry> entries) {
             currentConsumer.sendMessages(entries);
         } else {
             entries.forEach(entry -> {
-                int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), name, -1);
+                int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
                 if (totalMsgs > 0) {
                     msgDrop.recordEvent();
                 }
@@ -57,7 +56,7 @@ public void sendMessages(List<Entry> entries) {
             });
         }
     }
-    
+
     @Override
     public Rate getMesssageDropRate() {
         return msgDrop;
@@ -88,6 +87,4 @@ protected void cancelPendingRead() {
         // No-op
     }
 
-    private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherSingleActiveConsumer.class);
-
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index e2c074565..be5374d9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -39,7 +39,6 @@
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +55,7 @@
     private static final int TRUE = 1;
     private static final AtomicIntegerFieldUpdater<NonPersistentSubscription> IS_FENCED_UPDATER = AtomicIntegerFieldUpdater
             .newUpdater(NonPersistentSubscription.class, "isFenced");
+    @SuppressWarnings("unused")
     private volatile int isFenced = FALSE;
 
     public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
@@ -86,12 +86,12 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
             switch (consumer.subType()) {
             case Exclusive:
                 if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
-                    dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this.subName);
+                    dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Exclusive, 0, topic, this);
                 }
                 break;
             case Shared:
                 if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
-                    dispatcher = new NonPersistentDispatcherMultipleConsumers(topic, this.subName);
+                    dispatcher = new NonPersistentDispatcherMultipleConsumers(topic, this);
                 }
                 break;
             case Failover:
@@ -103,7 +103,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
 
                 if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
                     dispatcher = new NonPersistentDispatcherSingleActiveConsumer(SubType.Failover, partitionIndex,
-                            topic, this.subName);
+                            topic, this);
                 }
                 break;
             default:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services