Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/01 08:59:03 UTC

[GitHub] [pulsar] ivankelly commented on a change in pull request #7406: Improved in max-pending-bytes mechanism for broker

ivankelly commented on a change in pull request #7406:
URL: https://github.com/apache/pulsar/pull/7406#discussion_r448195643



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -167,10 +171,26 @@
     private volatile boolean autoReadDisabledRateLimiting = false;
     private FeatureFlags features;
     // Flag to manage throttling-publish-buffer by atomically enable/disable read-channel.
-    private volatile boolean autoReadDisabledPublishBufferLimiting = false;
-    private static final AtomicLongFieldUpdater<ServerCnx> MSG_PUBLISH_BUFFER_SIZE_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(ServerCnx.class, "messagePublishBufferSize");
-    private volatile long messagePublishBufferSize = 0;
+    private boolean autoReadDisabledPublishBufferLimiting = false;
+
+    private final long maxPendingBytesPerThread;
+    private final long resumeThresholdPendingBytesPerThread;
+
+    // Number of bytes pending to be published from a single specific IO thread.
+    private static final FastThreadLocal<MutableLong> pendingBytesPerThread = new FastThreadLocal<MutableLong>() {
+        @Override
+        protected MutableLong initialValue() throws Exception {
+            return new MutableLong();
+        }
+    };
+
+    // A set of connections tied to the current thread
+    private static final FastThreadLocal<Set<ServerCnx>> cnxsPerThread = new FastThreadLocal<Set<ServerCnx>>() {
+        @Override
+        protected Set<ServerCnx> initialValue() throws Exception {
+            return Collections.newSetFromMap(new IdentityHashMap<>());

Review comment:
       What's the advantage of this over "new HashSet<>()"?
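
       For context on the question, a minimal sketch of where the two set types differ. An identity-based set compares elements with `==` and never calls `equals()`/`hashCode()`, so the two only behave differently when the element class overrides those methods. `Conn` below is a hypothetical stand-in, not `ServerCnx`; whether `ServerCnx` overrides `equals()`/`hashCode()` is not shown in this diff.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Set;

public class IdentitySetDemo {
    // Hypothetical element type that overrides equals/hashCode (assumption for illustration).
    static final class Conn {
        final String remote;

        Conn(String remote) {
            this.remote = remote;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Conn && ((Conn) o).remote.equals(remote);
        }

        @Override
        public int hashCode() {
            return remote.hashCode();
        }
    }

    // Identity set: membership is decided by reference equality (==).
    static int identitySetSize() {
        Set<Conn> set = Collections.newSetFromMap(new IdentityHashMap<>());
        set.add(new Conn("10.0.0.1"));
        set.add(new Conn("10.0.0.1"));
        return set.size();
    }

    // HashSet: membership is decided by equals()/hashCode().
    static int hashSetSize() {
        Set<Conn> set = new HashSet<>();
        set.add(new Conn("10.0.0.1"));
        set.add(new Conn("10.0.0.1"));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println("identity set size: " + identitySetSize());
        System.out.println("hash set size: " + hashSetSize());
    }
}
```

       If the element class keeps the default `Object` implementations of `equals()` and `hashCode()`, both sets hold the same elements, and the choice comes down to intent and lookup cost.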

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -2307,4 +2253,16 @@ private boolean isSystemTopic(String topic) {
     public void setInterceptor(BrokerInterceptor interceptor) {
         this.interceptor = interceptor;
     }
+
+    public void pausedConnections(int numberOfConnections) {
+        pausedConnections.addAndGet(numberOfConnections);

Review comment:
       Is this exported via Prometheus? It would be better to have separate event counters for pause and resume, so that a pause and resume that both happen between two Prometheus pulls are still visible.
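
       A rough sketch of the idea, not a proposal for the actual implementation. The class name, the `resumedConnections` method, and the getters are hypothetical; only `pausedConnections(int)` appears in this diff. The point is that a gauge can return to its previous value between two pulls, while monotonically increasing counters keep the evidence.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class PauseResumeCounters {
    // Gauge: number of connections paused right now. Can go back to 0 unnoticed.
    private final AtomicLong pausedConnections = new AtomicLong();

    // Event counters: only ever increase, so a metrics pull sees every event eventually.
    private final LongAdder pauseEvents = new LongAdder();
    private final LongAdder resumeEvents = new LongAdder();

    public void pausedConnections(int numberOfConnections) {
        pausedConnections.addAndGet(numberOfConnections);
        pauseEvents.add(numberOfConnections);
    }

    // Hypothetical counterpart to pausedConnections(int).
    public void resumedConnections(int numberOfConnections) {
        pausedConnections.addAndGet(-numberOfConnections);
        resumeEvents.add(numberOfConnections);
    }

    public long getPausedConnections() {
        return pausedConnections.get();
    }

    public long getPauseEvents() {
        return pauseEvents.sum();
    }

    public long getResumeEvents() {
        return resumeEvents.sum();
    }

    public static void main(String[] args) {
        PauseResumeCounters stats = new PauseResumeCounters();
        // Both events happen between two metrics pulls.
        stats.pausedConnections(3);
        stats.resumedConnections(3);
        System.out.println("gauge: " + stats.getPausedConnections());
        System.out.println("pause events: " + stats.getPauseEvents());
        System.out.println("resume events: " + stats.getResumeEvents());
    }
}
```

       With only the gauge, both pulls would report the same value and the throttling episode would be invisible; with the counters, the difference between two pulls gives the number of pause and resume events in that interval.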



