You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/07/14 21:06:44 UTC

[13/23] storm git commit: Bring back pending message metric

Bring back pending message metric


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/084c5a0b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/084c5a0b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/084c5a0b

Branch: refs/heads/0.9.x-branch
Commit: 084c5a0b485620aa746d107419d403bf601b23d0
Parents: ee4e94a
Author: Enno Shioji <es...@gmail.com>
Authored: Mon Jun 1 16:07:03 2015 +0100
Committer: Enno Shioji <es...@gmail.com>
Committed: Mon Jun 1 16:07:03 2015 +0100

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/messaging/netty/Client.java    | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/084c5a0b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index ac36035..2c7f3db 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -104,6 +104,12 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private final AtomicInteger messagesLost = new AtomicInteger(0);
 
     /**
+     * Number of messages buffered in memory.
+     */
+    private final AtomicLong pendingMessages = new AtomicLong(0);
+
+
+    /**
      * This flag is set to true if and only if a client instance is being closed.
      */
     private volatile boolean closing = false;
@@ -295,11 +301,14 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             return;
         }
 
+
         final int numMessages = batch.size();
         LOG.debug("writing {} messages to channel {}", batch.size(), channel.toString());
+        pendingMessages.addAndGet(numMessages);
         ChannelFuture future = channel.write(batch);
         future.addListener(new ChannelFutureListener() {
             public void operationComplete(ChannelFuture future) throws Exception {
+                pendingMessages.addAndGet(0 - numMessages);
                 if (future.isSuccess()) {
                     LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
                     messagesSent.getAndAdd(batch.size());
@@ -365,6 +374,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         HashMap<String, Object> ret = new HashMap<String, Object>();
         ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
         ret.put("sent", messagesSent.getAndSet(0));
+        ret.put("pending", pendingMessages.get());
         ret.put("lostOnSend", messagesLost.getAndSet(0));
         ret.put("dest", dstAddress.toString());
         String src = srcAddressName();