You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/02/18 00:11:00 UTC

[25/33] incubator-geode git commit: Added stat for UDP message receiver thread.

Added a stat for the UDP message receiver thread.

Description: The total amount of time spent deserializing and
dispatching UDP messages in the message-reader thread.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c2175bc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c2175bc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c2175bc6

Branch: refs/heads/feature/GEODE-831
Commit: c2175bc6626d6132ba2cf986fa7f0acbc462fd37
Parents: 7627585
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Tue Feb 16 13:53:09 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Feb 16 13:56:22 2016 -0800

----------------------------------------------------------------------
 .../gemfire/distributed/internal/DMStats.java   |   2 +
 .../distributed/internal/DistributionStats.java |  10 ++
 .../internal/LonerDistributionManager.java      |   2 +
 .../gms/messenger/JGroupsMessenger.java         | 105 ++++++++++---------
 4 files changed, 70 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c2175bc6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
index 7bf5b80..c241bc7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
@@ -171,6 +171,8 @@ public interface DMStats {
   public void incNumSerialThreads(int threads);
 
   public void incMessageChannelTime(long val);
+
+  public void incUDPDispachRequestTime(long val);
   
   public long getReplyMessageTime();
   public void incReplyMessageTime(long val);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c2175bc6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
index 28ca380..634929e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
@@ -61,6 +61,7 @@ public class DistributionStats implements DMStats {
   private final static int processedMessagesTimeId;
   private final static int messageProcessingScheduleTimeId;
   private final static int messageChannelTimeId;
+  private final static int udpDispachRequestTimeId;
   private final static int replyMessageTimeId;
   private final static int distributeMessageTimeId;
   private final static int nodesId;
@@ -272,6 +273,7 @@ public class DistributionStats implements DMStats {
     final String functionExecutionThreadsDesc = "The number of threads currently processing function execution messages.";
     final String waitingThreadsDesc = "The number of threads currently processing messages that had to wait for a resource.";
     final String messageChannelTimeDesc = "The total amount of time received messages spent in the distribution channel";
+    final String udpDispachRequestTimeDesc = "The total amount of time spent deserializing and dispatching UDP messages in the message-reader thread.";
     final String replyMessageTimeDesc = "The amount of time spent processing reply messages. This includes both processedMessagesTime and messageProcessingScheduleTime.";
     final String distributeMessageTimeDesc = "The amount of time it takes to prepare a message and send it on the network.  This includes sentMessagesTime.";
     final String nodesDesc = "The current number of nodes in this distributed system.";
@@ -409,6 +411,7 @@ public class DistributionStats implements DMStats {
         f.createIntGauge("functionExecutionThreads", functionExecutionThreadsDesc, "threads"),
         f.createIntGauge("waitingThreads", waitingThreadsDesc, "threads"),
         f.createLongCounter("messageChannelTime", messageChannelTimeDesc, "nanoseconds", false),
+        f.createLongCounter("udpDispachRequestTime", udpDispachRequestTimeDesc, "nanoseconds", false),
         f.createLongCounter("replyMessageTime", replyMessageTimeDesc, "nanoseconds", false),
         f.createLongCounter("distributeMessageTime", distributeMessageTimeDesc, "nanoseconds", false),
         f.createIntGauge("nodes", nodesDesc, "nodes"),
@@ -572,6 +575,7 @@ public class DistributionStats implements DMStats {
     messageProcessingScheduleTimeId =
       type.nameToId("messageProcessingScheduleTime");
     messageChannelTimeId = type.nameToId("messageChannelTime");
+    udpDispachRequestTimeId = type.nameToId("udpDispachRequestTime");
     replyMessageTimeId = type.nameToId("replyMessageTime");
     distributeMessageTimeId = type.nameToId("distributeMessageTime");
     nodesId = type.nameToId("nodes");
@@ -1078,6 +1082,12 @@ public class DistributionStats implements DMStats {
       this.stats.incLong(messageChannelTimeId, delta);
     }
   }
+  
+  public void incUDPDispachRequestTime(long delta) {
+    if (enableClockStats) {
+      this.stats.incLong(udpDispachRequestTimeId, delta);
+    }
+  }
 
   public long getReplyMessageTime() {
     return this.stats.getLong(replyMessageTimeId);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c2175bc6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
index 419c096..50c07de 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
@@ -363,6 +363,8 @@ public class LonerDistributionManager implements DM {
     @Override
     public void incMessageChannelTime(long val) {}
     @Override
+    public void incUDPDispachRequestTime(long val) {};
+    @Override
     public long getReplyMessageTime() {return 0;}
     @Override
     public void incReplyMessageTime(long val) {}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c2175bc6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 3bd1e83..b9fcc38 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -70,6 +70,7 @@ import com.gemstone.gemfire.distributed.internal.DMStats;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
 import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
@@ -1024,58 +1025,64 @@ public class JGroupsMessenger implements Messenger {
   
     @Override
     public void receive(Message jgmsg) {
-      if (services.getManager().shutdownInProgress()) {
-        return;
-      }
-
-      if (logger.isTraceEnabled()) {
-        logger.trace("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders());
-      }
-      
-      //Respond to ping messages sent from other systems that are in a auto reconnect state
-      byte[] contents = jgmsg.getBuffer();
-      if (contents == null) {
-        return;
-      }
-      if (pingPonger.isPingMessage(contents)) {
+      long startTime = DistributionStats.getStatTime();
+      try {
+        if (services.getManager().shutdownInProgress()) {
+          return;
+        }
+  
+        if (logger.isTraceEnabled()) {
+          logger.trace("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders());
+        }
+        
+        //Respond to ping messages sent from other systems that are in a auto reconnect state
+        byte[] contents = jgmsg.getBuffer();
+        if (contents == null) {
+          return;
+        }
+        if (pingPonger.isPingMessage(contents)) {
+          try {
+            pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc());
+          }
+          catch (Exception e) {
+            logger.info("Failed sending Pong response to " + jgmsg.getSrc());
+          }
+          return;
+        } else if (pingPonger.isPongMessage(contents)) {
+          pongsReceived.incrementAndGet();
+          return;
+        }
+        
+        Object o = readJGMessage(jgmsg);
+        if (o == null) {
+          return;
+        }
+  
+        DistributionMessage msg = (DistributionMessage)o;
+        assert msg.getSender() != null;
+        
+        // admin-only VMs don't have caches, so we ignore cache operations
+        // multicast to them, avoiding deserialization cost and classpath
+        // problems
+        if ( (services.getConfig().getTransport().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE)
+             && (msg instanceof DistributedCacheOperation.CacheOperationMessage)) {
+          return;
+        }
+  
+        msg.resetTimestamp();
+        msg.setBytesRead(jgmsg.getLength());
+              
         try {
-          pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc());
+          logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender());
+          filterIncomingMessage(msg);
+          getMessageHandler(msg).processMessage(msg);
         }
-        catch (Exception e) {
-          logger.info("Failed sending Pong response to " + jgmsg.getSrc());
+        catch (MemberShunnedException e) {
+          // message from non-member - ignore
         }
-        return;
-      } else if (pingPonger.isPongMessage(contents)) {
-        pongsReceived.incrementAndGet();
-        return;
-      }
-      
-      Object o = readJGMessage(jgmsg);
-      if (o == null) {
-        return;
-      }
-
-      DistributionMessage msg = (DistributionMessage)o;
-      assert msg.getSender() != null;
-      
-      // admin-only VMs don't have caches, so we ignore cache operations
-      // multicast to them, avoiding deserialization cost and classpath
-      // problems
-      if ( (services.getConfig().getTransport().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE)
-           && (msg instanceof DistributedCacheOperation.CacheOperationMessage)) {
-        return;
-      }
-
-      msg.resetTimestamp();
-      msg.setBytesRead(jgmsg.getLength());
-            
-      try {
-        logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender());
-        filterIncomingMessage(msg);
-        getMessageHandler(msg).processMessage(msg);
-      }
-      catch (MemberShunnedException e) {
-        // message from non-member - ignore
+      }finally {
+        long delta = DistributionStats.getStatTime() - startTime ;
+        JGroupsMessenger.this.services.getStatistics().incUDPDispachRequestTime(delta);
       }
     }