You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/02/18 00:11:00 UTC
[25/33] incubator-geode git commit: Added stat for udp message
receiver thread.
Added stat for udp message receiver thread.
Description: The total amount of time spent deserializing and
dispatching UDP messages in the message-reader thread.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c2175bc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c2175bc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c2175bc6
Branch: refs/heads/feature/GEODE-831
Commit: c2175bc6626d6132ba2cf986fa7f0acbc462fd37
Parents: 7627585
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Tue Feb 16 13:53:09 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Feb 16 13:56:22 2016 -0800
----------------------------------------------------------------------
.../gemfire/distributed/internal/DMStats.java | 2 +
.../distributed/internal/DistributionStats.java | 10 ++
.../internal/LonerDistributionManager.java | 2 +
.../gms/messenger/JGroupsMessenger.java | 105 ++++++++++---------
4 files changed, 70 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c2175bc6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
index 7bf5b80..c241bc7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DMStats.java
@@ -171,6 +171,8 @@ public interface DMStats {
public void incNumSerialThreads(int threads);
public void incMessageChannelTime(long val);
+
+ public void incUDPDispachRequestTime(long val);
public long getReplyMessageTime();
public void incReplyMessageTime(long val);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c2175bc6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
index 28ca380..634929e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
@@ -61,6 +61,7 @@ public class DistributionStats implements DMStats {
private final static int processedMessagesTimeId;
private final static int messageProcessingScheduleTimeId;
private final static int messageChannelTimeId;
+ private final static int udpDispachRequestTimeId;
private final static int replyMessageTimeId;
private final static int distributeMessageTimeId;
private final static int nodesId;
@@ -272,6 +273,7 @@ public class DistributionStats implements DMStats {
final String functionExecutionThreadsDesc = "The number of threads currently processing function execution messages.";
final String waitingThreadsDesc = "The number of threads currently processing messages that had to wait for a resource.";
final String messageChannelTimeDesc = "The total amount of time received messages spent in the distribution channel";
+ final String udpDispachRequestTimeDesc = "The total amount of time spent deserializing and dispatching UDP messages in the message-reader thread.";
final String replyMessageTimeDesc = "The amount of time spent processing reply messages. This includes both processedMessagesTime and messageProcessingScheduleTime.";
final String distributeMessageTimeDesc = "The amount of time it takes to prepare a message and send it on the network. This includes sentMessagesTime.";
final String nodesDesc = "The current number of nodes in this distributed system.";
@@ -409,6 +411,7 @@ public class DistributionStats implements DMStats {
f.createIntGauge("functionExecutionThreads", functionExecutionThreadsDesc, "threads"),
f.createIntGauge("waitingThreads", waitingThreadsDesc, "threads"),
f.createLongCounter("messageChannelTime", messageChannelTimeDesc, "nanoseconds", false),
+ f.createLongCounter("udpDispachRequestTime", udpDispachRequestTimeDesc, "nanoseconds", false),
f.createLongCounter("replyMessageTime", replyMessageTimeDesc, "nanoseconds", false),
f.createLongCounter("distributeMessageTime", distributeMessageTimeDesc, "nanoseconds", false),
f.createIntGauge("nodes", nodesDesc, "nodes"),
@@ -572,6 +575,7 @@ public class DistributionStats implements DMStats {
messageProcessingScheduleTimeId =
type.nameToId("messageProcessingScheduleTime");
messageChannelTimeId = type.nameToId("messageChannelTime");
+ udpDispachRequestTimeId = type.nameToId("udpDispachRequestTime");
replyMessageTimeId = type.nameToId("replyMessageTime");
distributeMessageTimeId = type.nameToId("distributeMessageTime");
nodesId = type.nameToId("nodes");
@@ -1078,6 +1082,12 @@ public class DistributionStats implements DMStats {
this.stats.incLong(messageChannelTimeId, delta);
}
}
+
+ public void incUDPDispachRequestTime(long delta) {
+ if (enableClockStats) {
+ this.stats.incLong(udpDispachRequestTimeId, delta);
+ }
+ }
public long getReplyMessageTime() {
return this.stats.getLong(replyMessageTimeId);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c2175bc6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
index 419c096..50c07de 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
@@ -363,6 +363,8 @@ public class LonerDistributionManager implements DM {
@Override
public void incMessageChannelTime(long val) {}
@Override
+ public void incUDPDispachRequestTime(long val) {};
+ @Override
public long getReplyMessageTime() {return 0;}
@Override
public void incReplyMessageTime(long val) {}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c2175bc6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 3bd1e83..b9fcc38 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -70,6 +70,7 @@ import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
@@ -1024,58 +1025,64 @@ public class JGroupsMessenger implements Messenger {
@Override
public void receive(Message jgmsg) {
- if (services.getManager().shutdownInProgress()) {
- return;
- }
-
- if (logger.isTraceEnabled()) {
- logger.trace("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders());
- }
-
- //Respond to ping messages sent from other systems that are in a auto reconnect state
- byte[] contents = jgmsg.getBuffer();
- if (contents == null) {
- return;
- }
- if (pingPonger.isPingMessage(contents)) {
+ long startTime = DistributionStats.getStatTime();
+ try {
+ if (services.getManager().shutdownInProgress()) {
+ return;
+ }
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders());
+ }
+
+ //Respond to ping messages sent from other systems that are in a auto reconnect state
+ byte[] contents = jgmsg.getBuffer();
+ if (contents == null) {
+ return;
+ }
+ if (pingPonger.isPingMessage(contents)) {
+ try {
+ pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc());
+ }
+ catch (Exception e) {
+ logger.info("Failed sending Pong response to " + jgmsg.getSrc());
+ }
+ return;
+ } else if (pingPonger.isPongMessage(contents)) {
+ pongsReceived.incrementAndGet();
+ return;
+ }
+
+ Object o = readJGMessage(jgmsg);
+ if (o == null) {
+ return;
+ }
+
+ DistributionMessage msg = (DistributionMessage)o;
+ assert msg.getSender() != null;
+
+ // admin-only VMs don't have caches, so we ignore cache operations
+ // multicast to them, avoiding deserialization cost and classpath
+ // problems
+ if ( (services.getConfig().getTransport().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE)
+ && (msg instanceof DistributedCacheOperation.CacheOperationMessage)) {
+ return;
+ }
+
+ msg.resetTimestamp();
+ msg.setBytesRead(jgmsg.getLength());
+
try {
- pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc());
+ logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender());
+ filterIncomingMessage(msg);
+ getMessageHandler(msg).processMessage(msg);
}
- catch (Exception e) {
- logger.info("Failed sending Pong response to " + jgmsg.getSrc());
+ catch (MemberShunnedException e) {
+ // message from non-member - ignore
}
- return;
- } else if (pingPonger.isPongMessage(contents)) {
- pongsReceived.incrementAndGet();
- return;
- }
-
- Object o = readJGMessage(jgmsg);
- if (o == null) {
- return;
- }
-
- DistributionMessage msg = (DistributionMessage)o;
- assert msg.getSender() != null;
-
- // admin-only VMs don't have caches, so we ignore cache operations
- // multicast to them, avoiding deserialization cost and classpath
- // problems
- if ( (services.getConfig().getTransport().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE)
- && (msg instanceof DistributedCacheOperation.CacheOperationMessage)) {
- return;
- }
-
- msg.resetTimestamp();
- msg.setBytesRead(jgmsg.getLength());
-
- try {
- logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender());
- filterIncomingMessage(msg);
- getMessageHandler(msg).processMessage(msg);
- }
- catch (MemberShunnedException e) {
- // message from non-member - ignore
+ }finally {
+ long delta = DistributionStats.getStatTime() - startTime ;
+ JGroupsMessenger.this.services.getStatistics().incUDPDispachRequestTime(delta);
}
}