You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:03 UTC
[41/83] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
index 28ca380,0000000..6750a54
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionStats.java
@@@ -1,2051 -1,0 +1,2065 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.StatisticDescriptor;
+import com.gemstone.gemfire.Statistics;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.StatisticsType;
+import com.gemstone.gemfire.StatisticsTypeFactory;
+import com.gemstone.gemfire.internal.NanoTimer;
+import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+//import java.io.*;
+import com.gemstone.gemfire.internal.tcp.Buffers;
+import com.gemstone.gemfire.internal.util.Breadcrumbs;
+
+/**
+ * This class maintains statistics in GemFire about the distribution
+ * manager and distribution in general.
+ *
+ * @author Darrel Schneider
+ * @author Bruce Schuchardt
+ *
+ */
+public class DistributionStats implements DMStats {
+ private static final Logger logger = LogService.getLogger();
+
+ public static boolean enableClockStats = false;
+
+
+ ////////////////// Statistic "Id" Fields //////////////////
+
+ private final static StatisticsType type;
+ private final static int sentMessagesId;
+ private final static int sentCommitMessagesId;
+ private final static int commitWaitsId;
+ private final static int sentMessagesTimeId;
+ private final static int sentMessagesMaxTimeId;
+ private final static int broadcastMessagesId;
+ private final static int broadcastMessagesTimeId;
+ private final static int receivedMessagesId;
+ private final static int receivedBytesId;
+ private final static int sentBytesId;
+ private final static int processedMessagesId;
+ private final static int processedMessagesTimeId;
+ private final static int messageProcessingScheduleTimeId;
+ private final static int messageChannelTimeId;
++ private final static int udpDispatchRequestTimeId;
+ private final static int replyMessageTimeId;
+ private final static int distributeMessageTimeId;
+ private final static int nodesId;
+ private final static int overflowQueueSizeId;
+ private final static int processingThreadsId;
+ private final static int serialThreadsId;
+ private final static int waitingThreadsId;
+ private final static int highPriorityThreadsId;
+ private final static int partitionedRegionThreadsId;
+ private final static int functionExecutionThreadsId;
+ private final static int partitionedRegionThreadJobsId;
+ private final static int functionExecutionThreadJobsId;
+ private final static int waitingQueueSizeId;
+ private final static int overflowQueueThrottleTimeId;
+ private final static int overflowQueueThrottleCountId;
+ private final static int highPriorityQueueSizeId;
+ private final static int highPriorityQueueThrottleTimeId;
+ private final static int highPriorityQueueThrottleCountId;
+ private final static int partitionedRegionQueueSizeId;
+ private final static int partitionedRegionQueueThrottleTimeId;
+ private final static int partitionedRegionQueueThrottleCountId;
+ private final static int functionExecutionQueueSizeId;
+ private final static int functionExecutionQueueThrottleCountId;
+ private final static int functionExecutionQueueThrottleTimeId;
+ private final static int serialQueueSizeId;
+ private final static int serialQueueBytesId;
+ private final static int serialPooledThreadId;
+ private final static int serialQueueThrottleTimeId;
+ private final static int serialQueueThrottleCountId;
+ private final static int replyWaitsInProgressId;
+ private final static int replyWaitsCompletedId;
+ private final static int replyWaitTimeId;
+ private final static int replyTimeoutsId;
+ private final static int replyWaitMaxTimeId;
+ private final static int receiverConnectionsId;
+ private final static int failedAcceptsId;
+ private final static int failedConnectsId;
+ private final static int reconnectAttemptsId;
+ private final static int lostConnectionLeaseId;
+ private final static int sharedOrderedSenderConnectionsId;
+ private final static int sharedUnorderedSenderConnectionsId;
+ private final static int threadOrderedSenderConnectionsId;
+ private final static int threadUnorderedSenderConnectionsId;
+
+ private final static int syncSocketWritesInProgressId;
+ private final static int syncSocketWriteTimeId;
+ private final static int syncSocketWritesId;
+ private final static int syncSocketWriteBytesId;
+
+ private final static int ucastReadsId;
+ private final static int ucastReadBytesId;
+ private final static int ucastWritesId;
+ private final static int ucastWriteBytesId;
+ private final static int ucastRetransmitsId;
+
+ private final static int mcastReadsId;
+ private final static int mcastReadBytesId;
+ private final static int mcastWritesId;
+ private final static int mcastWriteBytesId;
+ private final static int mcastRetransmitsId;
+ private final static int mcastRetransmitRequestsId;
+
+ private final static int serializationTimeId;
+ private final static int serializationsId;
+ private final static int serializedBytesId;
+
+ private final static int pdxSerializationsId;
+ private final static int pdxSerializedBytesId;
+
+ private final static int deserializationTimeId;
+ private final static int deserializationsId;
+ private final static int deserializedBytesId;
+ private final static int pdxDeserializationsId;
+ private final static int pdxDeserializedBytesId;
+ private final static int pdxInstanceDeserializationsId;
+ private final static int pdxInstanceDeserializationTimeId;
+ private final static int pdxInstanceCreationsId;
+
+ private final static int msgSerializationTimeId;
+ private final static int msgDeserializationTimeId;
+
+ private final static int batchSendTimeId;
+ private final static int batchCopyTimeId;
+ private final static int batchWaitTimeId;
+ private final static int batchFlushTimeId;
+
+ private final static int threadOwnedReceiversId;
+ private final static int threadOwnedReceiversId2;
+
+ private final static int asyncSocketWritesInProgressId;
+ private final static int asyncSocketWritesId;
+ private final static int asyncSocketWriteRetriesId;
+ private final static int asyncSocketWriteTimeId;
+ private final static int asyncSocketWriteBytesId;
+
+ private final static int socketLocksInProgressId;
+ private final static int socketLocksId;
+ private final static int socketLockTimeId;
+
+ private final static int bufferAcquiresInProgressId;
+ private final static int bufferAcquiresId;
+ private final static int bufferAcquireTimeId;
+
+ private final static int asyncQueueAddTimeId;
+ private final static int asyncQueueRemoveTimeId;
+
+ private static final int asyncQueuesId;
+ private static final int asyncQueueFlushesInProgressId;
+ private static final int asyncQueueFlushesCompletedId;
+ private static final int asyncQueueFlushTimeId;
+ private static final int asyncQueueTimeoutExceededId;
+ private static final int asyncQueueSizeExceededId;
+ private static final int asyncDistributionTimeoutExceededId;
+ private static final int asyncQueueSizeId;
+ private static final int asyncQueuedMsgsId;
+ private static final int asyncDequeuedMsgsId;
+ private static final int asyncConflatedMsgsId;
+
+ private static final int asyncThreadsId;
+ private static final int asyncThreadInProgressId;
+ private static final int asyncThreadCompletedId;
+ private static final int asyncThreadTimeId;
+
+ private static final int receiverDirectBufferSizeId;
+ private static final int receiverHeapBufferSizeId;
+ private static final int senderDirectBufferSizeId;
+ private static final int senderHeapBufferSizeId;
+
+ private static final int messagesBeingReceivedId;
+ private static final int messageBytesBeingReceivedId;
+
+ private static final int serialThreadStartsId;
+ private static final int viewThreadStartsId;
+ private static final int processingThreadStartsId;
+ private static final int highPriorityThreadStartsId;
+ private static final int waitingThreadStartsId;
+ private static final int partitionedRegionThreadStartsId;
+ private static final int functionExecutionThreadStartsId;
+ private static final int serialPooledThreadStartsId;
+ private static final int TOSentMsgId;
+
+ private static final int replyHandoffTimeId;
+
+ private final static int viewThreadsId;
+ private final static int serialThreadJobsId;
+ private final static int viewProcessorThreadJobsId;
+ private final static int serialPooledThreadJobsId;
+ private final static int pooledMessageThreadJobsId;
+ private final static int highPriorityThreadJobsId;
+ private final static int waitingPoolThreadJobsId;
+
+ private final static int eldersId;
+ private final static int initialImageMessagesInFlightId;
+ private final static int initialImageRequestsInProgressId;
+
+ //For GMSHealthMonitor
+ private final static int heartbeatRequestsSentId;
+ private final static int heartbeatRequestsReceivedId;
+ private final static int heartbeatsSentId;
+ private final static int heartbeatsReceivedId;
+ private final static int suspectsSentId;
+ private final static int suspectsReceivedId;
+ private final static int finalCheckRequestsSentId;
+ private final static int finalCheckRequestsReceivedId;
+ private final static int finalCheckResponsesSentId;
+ private final static int finalCheckResponsesReceivedId;
+ private final static int tcpFinalCheckRequestsSentId;
+ private final static int tcpFinalCheckRequestsReceivedId;
+ private final static int tcpFinalCheckResponsesSentId;
+ private final static int tcpFinalCheckResponsesReceivedId;
+ private final static int udpFinalCheckRequestsSentId;
+ private final static int udpFinalCheckRequestsReceivedId;
+ private final static int udpFinalCheckResponsesSentId;
+ private final static int udpFinalCheckResponsesReceivedId;
+
+ static {
+ String statName = "DistributionStats";
+ String statDescription = "Statistics on the gemfire distribution layer.";
+
+ final String sentMessagesDesc = "The number of distribution messages that this GemFire system has sent. This includes broadcastMessages.";
+ final String sentCommitMessagesDesc = "The number of transaction commit messages that this GemFire system has created to be sent. Note that it is possible for a commit to only create one message even though it will end up being sent to multiple recipients.";
+ final String commitWaitsDesc = "The number of transaction commits that had to wait for a response before they could complete.";
+ final String sentMessagesTimeDesc = "The total amount of time this distribution manager has spent sending messages. This includes broadcastMessagesTime.";
+ final String sentMessagesMaxTimeDesc = "The highest amount of time this distribution manager has spent distributing a single message to the network.";
+ final String broadcastMessagesDesc = "The number of distribution messages that this GemFire system has broadcast. A broadcast message is one sent to every other manager in the group.";
+ final String broadcastMessagesTimeDesc = "The total amount of time this distribution manager has spent broadcasting messages. A broadcast message is one sent to every other manager in the group.";
+ final String receivedMessagesDesc = "The number of distribution messages that this GemFire system has received.";
+ final String receivedBytesDesc = "The number of distribution message bytes that this GemFire system has received.";
+ final String sentBytesDesc = "The number of distribution message bytes that this GemFire system has sent.";
+ final String processedMessagesDesc = "The number of distribution messages that this GemFire system has processed.";
+ final String processedMessagesTimeDesc = "The amount of time this distribution manager has spent in message.process().";
+ final String messageProcessingScheduleTimeDesc = "The amount of time this distribution manager has spent dispatching message to processor threads.";
+ final String overflowQueueSizeDesc = "The number of normal distribution messages currently waiting to be processed.";
+ final String waitingQueueSizeDesc = "The number of distribution messages currently waiting for some other resource before they can be processed.";
+ final String overflowQueueThrottleTimeDesc = "The total amount of time, in nanoseconds, spent delayed by the overflow queue throttle.";
+ final String overflowQueueThrottleCountDesc = "The total number of times a thread was delayed in adding a normal message to the overflow queue.";
+ final String highPriorityQueueSizeDesc = "The number of high priority distribution messages currently waiting to be processed.";
+ final String highPriorityQueueThrottleTimeDesc = "The total amount of time, in nanoseconds, spent delayed by the high priority queue throttle.";
+ final String highPriorityQueueThrottleCountDesc = "The total number of times a thread was delayed in adding a normal message to the high priority queue.";
+ final String serialQueueSizeDesc = "The number of serial distribution messages currently waiting to be processed.";
+ final String serialQueueBytesDesc = "The approximate number of bytes consumed by serial distribution messages currently waiting to be processed.";
+ final String serialPooledThreadDesc = "The number of threads created in the SerialQueuedExecutorPool.";
+ final String serialQueueThrottleTimeDesc = "The total amount of time, in nanoseconds, spent delayed by the serial queue throttle.";
+ final String serialQueueThrottleCountDesc = "The total number of times a thread was delayed in adding a ordered message to the serial queue.";
+ final String serialThreadsDesc = "The number of threads currently processing serial/ordered messages.";
+ final String processingThreadsDesc = "The number of threads currently processing normal messages.";
+ final String highPriorityThreadsDesc = "The number of threads currently processing high priority messages.";
+ final String partitionedRegionThreadsDesc = "The number of threads currently processing partitioned region messages.";
+ final String functionExecutionThreadsDesc = "The number of threads currently processing function execution messages.";
+ final String waitingThreadsDesc = "The number of threads currently processing messages that had to wait for a resource.";
+ final String messageChannelTimeDesc = "The total amount of time received messages spent in the distribution channel";
++ final String udpDispatchRequestTimeDesc = "The total amount of time spent deserializing and dispatching UDP messages in the message-reader thread.";
+ final String replyMessageTimeDesc = "The amount of time spent processing reply messages. This includes both processedMessagesTime and messageProcessingScheduleTime.";
+ final String distributeMessageTimeDesc = "The amount of time it takes to prepare a message and send it on the network. This includes sentMessagesTime.";
+ final String nodesDesc = "The current number of nodes in this distributed system.";
+ final String replyWaitsInProgressDesc = "Current number of threads waiting for a reply.";
+ final String replyWaitsCompletedDesc = "Total number of times waits for a reply have completed.";
+ final String replyWaitTimeDesc = "Total time spent waiting for a reply to a message.";
+ final String replyWaitMaxTimeDesc = "Maximum time spent transmitting and then waiting for a reply to a message. See sentMessagesMaxTime for related information";
+ final String replyTimeoutsDesc =
+ "Total number of message replies that have timed out.";
+ final String receiverConnectionsDesc =
+ "Current number of sockets dedicated to receiving messages.";
+ final String failedAcceptsDesc =
+ "Total number of times an accept (receiver creation) of a connect from some other member has failed";
+ final String failedConnectsDesc =
+ "Total number of times a connect (sender creation) to some other member has failed.";
+ final String reconnectAttemptsDesc =
+ "Total number of times an established connection was lost and a reconnect was attempted.";
+ final String lostConnectionLeaseDesc =
+ "Total number of times an unshared sender socket has remained idle long enough that its lease expired.";
+ final String sharedOrderedSenderConnectionsDesc =
+ "Current number of shared sockets dedicated to sending ordered messages.";
+ final String sharedUnorderedSenderConnectionsDesc =
+ "Current number of shared sockets dedicated to sending unordered messages.";
+ final String threadOrderedSenderConnectionsDesc =
+ "Current number of thread sockets dedicated to sending ordered messages.";
+ final String threadUnorderedSenderConnectionsDesc =
+ "Current number of thread sockets dedicated to sending unordered messages.";
+
+ final String asyncQueuesDesc = "The current number of queues for asynchronous messaging.";
+ final String asyncQueueFlushesInProgressDesc = "Current number of asynchronous queues being flushed.";
+ final String asyncQueueFlushesCompletedDesc = "Total number of asynchronous queue flushes completed.";
+ final String asyncQueueFlushTimeDesc = "Total time spent flushing asynchronous queues.";
+ final String asyncQueueTimeoutExceededDesc = "Total number of asynchronous queues that have timed out by being blocked for more than async-queue-timeout milliseconds.";
+ final String asyncQueueSizeExceededDesc = "Total number of asynchronous queues that have exceeded max size.";
+ final String asyncDistributionTimeoutExceededDesc = "Total number of times the async-distribution-timeout has been exceeded during a socket write.";
+ final String asyncQueueSizeDesc = "The current size in bytes used for asynchronous queues.";
+ final String asyncQueuedMsgsDesc = "The total number of queued messages used for asynchronous queues.";
+ final String asyncDequeuedMsgsDesc = "The total number of queued messages that have been removed from the queue and successfully sent.";
+ final String asyncConflatedMsgsDesc = "The total number of queued conflated messages used for asynchronous queues.";
+
+ final String asyncThreadsDesc = "Total number of asynchronous message queue threads.";
+ final String asyncThreadInProgressDesc = "Current iterations of work performed by asynchronous message queue threads.";
+ final String asyncThreadCompletedDesc = "Total number of iterations of work performed by asynchronous message queue threads.";
+ final String asyncThreadTimeDesc = "Total time spent by asynchronous message queue threads performing iterations.";
+ final String receiverDirectBufferSizeDesc = "Current number of bytes allocated from direct memory as buffers for incoming messages.";
+ final String receiverHeapBufferSizeDesc = "Current number of bytes allocated from Java heap memory as buffers for incoming messages.";
+ final String senderDirectBufferSizeDesc = "Current number of bytes allocated from direct memory as buffers for outgoing messages.";
+ final String senderHeapBufferSizeDesc = "Current number of bytes allocated from Java heap memory as buffers for outoing messages.";
+
+ final String replyHandoffTimeDesc = "Total number of seconds to switch thread contexts from processing thread to application thread.";
+
+ final String partitionedRegionThreadJobsDesc = "The number of messages currently being processed by partitioned region threads";
+ final String functionExecutionThreadJobsDesc = "The number of messages currently being processed by function execution threads";
+ final String viewThreadsDesc = "The number of threads currently processing view messages.";
+ final String serialThreadJobsDesc = "The number of messages currently being processed by serial threads.";
+ final String viewThreadJobsDesc = "The number of messages currently being processed by view threads.";
+ final String serialPooledThreadJobsDesc = "The number of messages currently being processed by pooled serial processor threads.";
+ final String processingThreadJobsDesc = "The number of messages currently being processed by pooled message processor threads.";
+ final String highPriorityThreadJobsDesc = "The number of messages currently being processed by high priority processor threads.";
+ final String waitingThreadJobsDesc = "The number of messages currently being processed by waiting pooly processor threads.";
+
+ final String eldersDesc = "Current number of system elders hosted in this member.";
+ final String initialImageMessagesInFlightDesc = "The number of messages with initial image data sent from this member that have not yet been acknowledged.";
+ final String initialImageRequestsInProgressDesc = "The number of initial images this member is currently receiving.";
+
+ //For GMSHealthMonitor
+ final String heartbeatRequestsSentDesc = "Heartbeat request messages that this member has sent.";
+ final String heartbeatRequestsReceivedDesc = "Heartbeat request messages that this member has received.";
+
+ final String heartbeatsSentDesc = "Heartbeat messages that this member has sent.";
+ final String heartbeatsReceivedDesc = "Heartbeat messages that this member has received.";
+
+ final String suspectsSentDesc = "Suspect member messages that this member has sent.";
+ final String suspectsReceivedDesc = "Suspect member messages that this member has received.";
+
+ final String finalCheckRequestsSentDesc = "Final check requests that this member has sent.";
+ final String finalCheckRequestsReceivedDesc = "Final check requests that this member has received.";
+
+ final String finalCheckResponsesSentDesc = "Final check responses that this member has sent.";
+ final String finalCheckResponsesReceivedDesc = "Final check responses that this member has received.";
+
+ final String tcpFinalCheckRequestsSentDesc = "TCP final check requests that this member has sent.";
+ final String tcpFinalCheckRequestsReceivedDesc = "TCP final check requests that this member has received.";
+
+ final String tcpFinalCheckResponsesSentDesc = "TCP final check responses that this member has sent.";
+ final String tcpFinalCheckResponsesReceivedDesc = "TCP final check responses that this member has received.";
+
+ final String udpFinalCheckRequestsSentDesc = "UDP final check requests that this member has sent.";
+ final String udpFinalCheckRequestsReceivedDesc = "UDP final check requests that this member has received.";
+
+ final String udpFinalCheckResponsesSentDesc = "UDP final check responses that this member has sent.";
+ final String udpFinalCheckResponsesReceivedDesc = "UDP final check responses that this member has received.";
+
+ StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
+
+ type = f.createType(
+ statName,
+ statDescription,
+ new StatisticDescriptor[] {
+ f.createLongCounter("sentMessages", sentMessagesDesc, "messages"),
+ f.createLongCounter("commitMessages", sentCommitMessagesDesc, "messages"),
+ f.createLongCounter("commitWaits", commitWaitsDesc, "messages"),
+ f.createLongCounter("sentMessagesTime", sentMessagesTimeDesc, "nanoseconds", false),
+ f.createLongGauge("sentMessagesMaxTime", sentMessagesMaxTimeDesc, "milliseconds", false),
+ f.createLongCounter("broadcastMessages", broadcastMessagesDesc, "messages"),
+ f.createLongCounter("broadcastMessagesTime", broadcastMessagesTimeDesc, "nanoseconds", false),
+ f.createLongCounter("receivedMessages", receivedMessagesDesc, "messages"),
+ f.createLongCounter("receivedBytes", receivedBytesDesc, "bytes"),
+ f.createLongCounter("sentBytes", sentBytesDesc, "bytes"),
+ f.createLongCounter("processedMessages", processedMessagesDesc, "messages"),
+ f.createLongCounter("processedMessagesTime", processedMessagesTimeDesc, "nanoseconds", false),
+ f.createLongCounter("messageProcessingScheduleTime", messageProcessingScheduleTimeDesc, "nanoseconds", false),
+ f.createIntGauge("overflowQueueSize", overflowQueueSizeDesc, "messages"),
+ f.createIntGauge("waitingQueueSize", waitingQueueSizeDesc, "messages"),
+ f.createIntGauge("overflowQueueThrottleCount", overflowQueueThrottleCountDesc, "delays"),
+ f.createLongCounter("overflowQueueThrottleTime", overflowQueueThrottleTimeDesc, "nanoseconds", false),
+ f.createIntGauge("highPriorityQueueSize", highPriorityQueueSizeDesc, "messages"),
+ f.createIntGauge("highPriorityQueueThrottleCount", highPriorityQueueThrottleCountDesc, "delays"),
+ f.createLongCounter("highPriorityQueueThrottleTime", highPriorityQueueThrottleTimeDesc, "nanoseconds", false),
+ f.createIntGauge("partitionedRegionQueueSize", highPriorityQueueSizeDesc, "messages"),
+ f.createIntGauge("partitionedRegionQueueThrottleCount", highPriorityQueueThrottleCountDesc, "delays"),
+ f.createLongCounter("partitionedRegionQueueThrottleTime", highPriorityQueueThrottleTimeDesc, "nanoseconds", false),
+ f.createIntGauge("functionExecutionQueueSize", highPriorityQueueSizeDesc, "messages"),
+ f.createIntGauge("functionExecutionQueueThrottleCount", highPriorityQueueThrottleCountDesc, "delays"),
+ f.createLongCounter("functionExecutionQueueThrottleTime", highPriorityQueueThrottleTimeDesc, "nanoseconds", false),
+ f.createIntGauge("serialQueueSize", serialQueueSizeDesc, "messages"),
+ f.createIntGauge("serialQueueBytes", serialQueueBytesDesc, "bytes"),
+ f.createIntCounter("serialPooledThread", serialPooledThreadDesc, "threads"),
+ f.createIntGauge("serialQueueThrottleCount", serialQueueThrottleCountDesc, "delays"),
+ f.createLongCounter("serialQueueThrottleTime", serialQueueThrottleTimeDesc, "nanoseconds", false),
+ f.createIntGauge("serialThreads", serialThreadsDesc, "threads"),
+ f.createIntGauge("processingThreads", processingThreadsDesc, "threads"),
+ f.createIntGauge("highPriorityThreads", highPriorityThreadsDesc, "threads"),
+ f.createIntGauge("partitionedRegionThreads", partitionedRegionThreadsDesc, "threads"),
+ f.createIntGauge("functionExecutionThreads", functionExecutionThreadsDesc, "threads"),
+ f.createIntGauge("waitingThreads", waitingThreadsDesc, "threads"),
+ f.createLongCounter("messageChannelTime", messageChannelTimeDesc, "nanoseconds", false),
++ f.createLongCounter("udpDispatchRequestTime", udpDispatchRequestTimeDesc, "nanoseconds", false),
+ f.createLongCounter("replyMessageTime", replyMessageTimeDesc, "nanoseconds", false),
+ f.createLongCounter("distributeMessageTime", distributeMessageTimeDesc, "nanoseconds", false),
+ f.createIntGauge("nodes", nodesDesc, "nodes"),
+ f.createIntGauge("replyWaitsInProgress", replyWaitsInProgressDesc, "operations"),
+ f.createIntCounter("replyWaitsCompleted", replyWaitsCompletedDesc, "operations"),
+ f.createLongCounter("replyWaitTime", replyWaitTimeDesc, "nanoseconds", false),
+ f.createLongGauge("replyWaitMaxTime", replyWaitMaxTimeDesc, "milliseconds", false),
+ f.createLongCounter("replyTimeouts", replyTimeoutsDesc, "timeouts", false),
+ f.createIntGauge("receivers", receiverConnectionsDesc, "sockets"),
+ f.createIntGauge("sendersSO", sharedOrderedSenderConnectionsDesc, "sockets"),
+ f.createIntGauge("sendersSU", sharedUnorderedSenderConnectionsDesc, "sockets"),
+ f.createIntGauge("sendersTO", threadOrderedSenderConnectionsDesc, "sockets"),
+ f.createIntGauge("sendersTU", threadUnorderedSenderConnectionsDesc, "sockets"),
+ f.createIntCounter("failedAccepts", failedAcceptsDesc, "accepts"),
+ f.createIntCounter("failedConnects", failedConnectsDesc, "connects"),
+ f.createIntCounter("reconnectAttempts", reconnectAttemptsDesc, "connects"),
+ f.createIntCounter("senderTimeouts", lostConnectionLeaseDesc, "expirations"),
+
+ f.createIntGauge("syncSocketWritesInProgress", "Current number of synchronous/blocking socket write calls in progress.", "writes"),
+ f.createLongCounter("syncSocketWriteTime", "Total amount of time, in nanoseconds, spent in synchronous/blocking socket write calls.", "nanoseconds"),
+ f.createIntCounter("syncSocketWrites", "Total number of completed synchronous/blocking socket write calls.", "writes"),
+ f.createLongCounter("syncSocketWriteBytes", "Total number of bytes sent out in synchronous/blocking mode on sockets.", "bytes"),
+
+ f.createIntCounter("ucastReads", "Total number of unicast datagrams received", "datagrams"),
+ f.createLongCounter("ucastReadBytes", "Total number of bytes received in unicast datagrams", "bytes"),
+ f.createIntCounter("ucastWrites", "Total number of unicast datagram socket write calls.", "writes"),
+ f.createLongCounter("ucastWriteBytes", "Total number of bytes sent out on unicast datagram sockets.", "bytes"),
+ f.createIntCounter("ucastRetransmits", "Total number of unicast datagram socket retransmissions", "writes"),
+
+ f.createIntCounter("mcastReads", "Total number of multicast datagrams received", "datagrams"),
+ f.createLongCounter("mcastReadBytes", "Total number of bytes received in multicast datagrams", "bytes"),
+ f.createIntCounter("mcastWrites", "Total number of multicast datagram socket write calls.", "writes"),
+ f.createLongCounter("mcastWriteBytes", "Total number of bytes sent out on multicast datagram sockets.", "bytes"),
+ f.createIntCounter("mcastRetransmits", "Total number of multicast datagram socket retransmissions", "writes"),
+ f.createIntCounter("mcastRetransmitRequests", "Total number of multicast datagram socket retransmission requests sent to other processes", "requests"),
+
+ f.createLongCounter("serializationTime", "Total amount of time, in nanoseconds, spent serializing objects. This includes pdx serializations.", "nanoseconds"),
+ f.createIntCounter("serializations", "Total number of object serialization calls. This includes pdx serializations.", "ops"),
+ f.createLongCounter("serializedBytes", "Total number of bytes produced by object serialization. This includes pdx serializations.", "bytes"),
+ f.createIntCounter("pdxSerializations", "Total number of pdx serializations.", "ops"),
+ f.createLongCounter("pdxSerializedBytes", "Total number of bytes produced by pdx serialization.", "bytes"),
+ f.createLongCounter("deserializationTime", "Total amount of time, in nanoseconds, spent deserializing objects. This includes deserialization that results in a PdxInstance.", "nanoseconds"),
+ f.createIntCounter("deserializations", "Total number of object deserialization calls. This includes deserialization that results in a PdxInstance.", "ops"),
+ f.createLongCounter("deserializedBytes", "Total number of bytes read by object deserialization. This includes deserialization that results in a PdxInstance.", "bytes"),
+ f.createIntCounter("pdxDeserializations", "Total number of pdx deserializations.", "ops"),
+ f.createLongCounter("pdxDeserializedBytes", "Total number of bytes read by pdx deserialization.", "bytes"),
+ f.createLongCounter("msgSerializationTime", "Total amount of time, in nanoseconds, spent serializing messages.", "nanoseconds"),
+ f.createLongCounter("msgDeserializationTime", "Total amount of time, in nanoseconds, spent deserializing messages.", "nanoseconds"),
+ f.createIntCounter("pdxInstanceDeserializations", "Total number of times getObject has been called on a PdxInstance.", "ops"),
+ f.createLongCounter("pdxInstanceDeserializationTime", "Total amount of time, in nanoseconds, spent deserializing PdxInstances by calling getObject.", "nanoseconds"),
+ f.createIntCounter("pdxInstanceCreations", "Total number of times a deserialization created a PdxInstance.", "ops"),
+
+ f.createLongCounter("batchSendTime", "Total amount of time, in nanoseconds, spent queueing and flushing message batches", "nanoseconds"),
+ f.createLongCounter("batchWaitTime", "Reserved for future use", "nanoseconds"),
+ f.createLongCounter("batchCopyTime", "Total amount of time, in nanoseconds, spent copying messages for batched transmission", "nanoseconds"),
+ f.createLongCounter("batchFlushTime", "Total amount of time, in nanoseconds, spent flushing batched messages to the network", "nanoseconds"),
+
+ f.createIntGauge("asyncSocketWritesInProgress", "Current number of non-blocking socket write calls in progress.", "writes"),
+ f.createIntCounter("asyncSocketWrites", "Total number of non-blocking socket write calls completed.", "writes"),
+ f.createIntCounter("asyncSocketWriteRetries", "Total number of retries needed to write a single block of data using non-blocking socket write calls.", "writes"),
+ f.createLongCounter("asyncSocketWriteTime", "Total amount of time, in nanoseconds, spent in non-blocking socket write calls.", "nanoseconds"),
+ f.createLongCounter("asyncSocketWriteBytes", "Total number of bytes sent out on non-blocking sockets.", "bytes"),
+
+ f.createLongCounter("asyncQueueAddTime", "Total amount of time, in nanoseconds, spent in adding messages to async queue.", "nanoseconds"),
+ f.createLongCounter("asyncQueueRemoveTime", "Total amount of time, in nanoseconds, spent in removing messages from async queue.", "nanoseconds"),
+
+ f.createIntGauge("asyncQueues", asyncQueuesDesc, "queues"),
+ f.createIntGauge("asyncQueueFlushesInProgress", asyncQueueFlushesInProgressDesc, "operations"),
+ f.createIntCounter("asyncQueueFlushesCompleted", asyncQueueFlushesCompletedDesc, "operations"),
+ f.createLongCounter("asyncQueueFlushTime", asyncQueueFlushTimeDesc, "nanoseconds", false),
+ f.createIntCounter("asyncQueueTimeoutExceeded", asyncQueueTimeoutExceededDesc, "timeouts"),
+ f.createIntCounter("asyncQueueSizeExceeded", asyncQueueSizeExceededDesc, "operations"),
+ f.createIntCounter("asyncDistributionTimeoutExceeded", asyncDistributionTimeoutExceededDesc, "operations"),
+ f.createLongGauge("asyncQueueSize", asyncQueueSizeDesc, "bytes"),
+ f.createLongCounter("asyncQueuedMsgs", asyncQueuedMsgsDesc, "msgs"),
+ f.createLongCounter("asyncDequeuedMsgs", asyncDequeuedMsgsDesc, "msgs"),
+ f.createLongCounter("asyncConflatedMsgs", asyncConflatedMsgsDesc, "msgs"),
+
+ f.createIntGauge("asyncThreads", asyncThreadsDesc, "threads"),
+ f.createIntGauge("asyncThreadInProgress", asyncThreadInProgressDesc, "operations"),
+ f.createIntCounter("asyncThreadCompleted", asyncThreadCompletedDesc, "operations"),
+ f.createLongCounter("asyncThreadTime", asyncThreadTimeDesc, "nanoseconds", false),
+
+ f.createLongGauge("receiversTO", "Number of receiver threads owned by non-receiver threads in other members.", "threads"),
+ f.createLongGauge("receiversTO2", "Number of receiver threads owned in turn by receiver threads in other members", "threads"),
+
+ f.createLongGauge("receiverDirectBufferSize", receiverDirectBufferSizeDesc, "bytes"),
+ f.createLongGauge("receiverHeapBufferSize", receiverHeapBufferSizeDesc, "bytes"),
+ f.createLongGauge("senderDirectBufferSize", senderDirectBufferSizeDesc, "bytes"),
+ f.createLongGauge("senderHeapBufferSize", senderHeapBufferSizeDesc, "bytes"),
+ f.createIntGauge("socketLocksInProgress", "Current number of threads waiting to lock a socket", "threads", false),
+ f.createIntCounter("socketLocks", "Total number of times a socket has been locked.", "locks"),
+ f.createLongCounter("socketLockTime", "Total amount of time, in nanoseconds, spent locking a socket", "nanoseconds", false),
+ f.createIntGauge("bufferAcquiresInProgress", "Current number of threads waiting to acquire a buffer", "threads", false),
+ f.createIntCounter("bufferAcquires", "Total number of times a buffer has been acquired.", "operations"),
+ f.createLongCounter("bufferAcquireTime", "Total amount of time, in nanoseconds, spent acquiring a socket", "nanoseconds", false),
+
+ f.createIntGauge("messagesBeingReceived", "Current number of message being received off the network or being processed after reception.", "messages"),
+ f.createLongGauge("messageBytesBeingReceived", "Current number of bytes consumed by messages being received or processed.", "bytes"),
+
+ f.createLongCounter("serialThreadStarts", "Total number of times a thread has been created for the serial message executor.", "starts", false),
+ f.createLongCounter("viewThreadStarts", "Total number of times a thread has been created for the view message executor.", "starts", false),
+ f.createLongCounter("processingThreadStarts", "Total number of times a thread has been created for the pool processing normal messages.", "starts", false),
+ f.createLongCounter("highPriorityThreadStarts", "Total number of times a thread has been created for the pool handling high priority messages.", "starts", false),
+ f.createLongCounter("waitingThreadStarts", "Total number of times a thread has been created for the waiting pool.", "starts", false),
+ f.createLongCounter("partitionedRegionThreadStarts", "Total number of times a thread has been created for the pool handling partitioned region messages.", "starts", false),
+ f.createLongCounter("functionExecutionThreadStarts", "Total number of times a thread has been created for the pool handling function execution messages.", "starts", false),
+ f.createLongCounter("serialPooledThreadStarts", "Total number of times a thread has been created for the serial pool(s).", "starts", false),
+ f.createLongCounter("TOSentMsgs", "Total number of messages sent on thread owned senders", "messages", false),
+ f.createLongCounter("replyHandoffTime", replyHandoffTimeDesc, "nanoseconds"),
+
+ f.createIntGauge("partitionedRegionThreadJobs", partitionedRegionThreadJobsDesc, "messages"),
+ f.createIntGauge("functionExecutionThreadJobs", functionExecutionThreadJobsDesc, "messages"),
+ f.createIntGauge("viewThreads", viewThreadsDesc, "threads"),
+ f.createIntGauge("serialThreadJobs", serialThreadJobsDesc, "messages"),
+ f.createIntGauge("viewThreadJobs", viewThreadJobsDesc, "messages"),
+ f.createIntGauge("serialPooledThreadJobs", serialPooledThreadJobsDesc, "messages"),
+ f.createIntGauge("processingThreadJobs", processingThreadJobsDesc, "messages"),
+ f.createIntGauge("highPriorityThreadJobs", highPriorityThreadJobsDesc, "messages"),
+ f.createIntGauge("waitingThreadJobs", waitingThreadJobsDesc, "messages"),
+
+ f.createIntGauge("elders", eldersDesc, "elders"),
+ f.createIntGauge("initialImageMessagesInFlight", initialImageMessagesInFlightDesc, "messages"),
+ f.createIntGauge("initialImageRequestsInProgress", initialImageRequestsInProgressDesc, "requests"),
+
+ //For GMSHealthMonitor
+ f.createLongCounter("heartbeatRequestsSent", heartbeatRequestsSentDesc, "messages"),
+ f.createLongCounter("heartbeatRequestsReceived", heartbeatRequestsReceivedDesc, "messages"),
+ f.createLongCounter("heartbeatsSent", heartbeatsSentDesc, "messages"),
+ f.createLongCounter("heartbeatsReceived", heartbeatsReceivedDesc, "messages"),
+ f.createLongCounter("suspectsSent", suspectsSentDesc, "messages"),
+ f.createLongCounter("suspectsReceived", suspectsReceivedDesc, "messages"),
+ f.createLongCounter("finalCheckRequestsSent", finalCheckRequestsSentDesc, "messages"),
+ f.createLongCounter("finalCheckRequestsReceived", finalCheckRequestsReceivedDesc, "messages"),
+ f.createLongCounter("finalCheckResponsesSent", finalCheckResponsesSentDesc, "messages"),
+ f.createLongCounter("finalCheckResponsesReceived", finalCheckResponsesReceivedDesc, "messages"),
+ f.createLongCounter("tcpFinalCheckRequestsSent", tcpFinalCheckRequestsSentDesc, "nanoseconds", false),
+ f.createLongCounter("tcpFinalCheckRequestsReceived", tcpFinalCheckRequestsReceivedDesc, "nanoseconds", false),
+ f.createLongCounter("tcpFinalCheckResponsesSent", tcpFinalCheckResponsesSentDesc, "nanoseconds", false),
+ f.createLongCounter("tcpFinalCheckResponsesReceived", tcpFinalCheckResponsesReceivedDesc, "nanoseconds", false),
+ f.createLongCounter("udpFinalCheckRequestsSent", udpFinalCheckRequestsSentDesc, "messages"),
+ f.createLongCounter("udpFinalCheckRequestsReceived", udpFinalCheckRequestsReceivedDesc, "messages"),
+ f.createLongCounter("udpFinalCheckResponsesSent", udpFinalCheckResponsesSentDesc, "messages"),
+ f.createLongCounter("udpFinalCheckResponsesReceived", udpFinalCheckResponsesReceivedDesc, "messages"),
+ }
+ );
+
+ // Initialize id fields
+ sentMessagesId = type.nameToId("sentMessages");
+ sentCommitMessagesId = type.nameToId("commitMessages");
+ commitWaitsId = type.nameToId("commitWaits");
+ sentMessagesTimeId = type.nameToId("sentMessagesTime");
+ sentMessagesMaxTimeId = type.nameToId("sentMessagesMaxTime");
+ broadcastMessagesId = type.nameToId("broadcastMessages");
+ broadcastMessagesTimeId = type.nameToId("broadcastMessagesTime");
+ receivedMessagesId = type.nameToId("receivedMessages");
+ receivedBytesId = type.nameToId("receivedBytes");
+ sentBytesId = type.nameToId("sentBytes");
+ processedMessagesId = type.nameToId("processedMessages");
+ processedMessagesTimeId = type.nameToId("processedMessagesTime");
+ messageProcessingScheduleTimeId =
+ type.nameToId("messageProcessingScheduleTime");
+ messageChannelTimeId = type.nameToId("messageChannelTime");
++ udpDispatchRequestTimeId = type.nameToId("udpDispatchRequestTime");
+ replyMessageTimeId = type.nameToId("replyMessageTime");
+ distributeMessageTimeId = type.nameToId("distributeMessageTime");
+ nodesId = type.nameToId("nodes");
+ overflowQueueSizeId = type.nameToId("overflowQueueSize");
+ waitingQueueSizeId = type.nameToId("waitingQueueSize");
+ overflowQueueThrottleTimeId = type.nameToId("overflowQueueThrottleTime");
+ overflowQueueThrottleCountId = type.nameToId("overflowQueueThrottleCount");
+ highPriorityQueueSizeId = type.nameToId("highPriorityQueueSize");
+ highPriorityQueueThrottleTimeId = type.nameToId("highPriorityQueueThrottleTime");
+ highPriorityQueueThrottleCountId = type.nameToId("highPriorityQueueThrottleCount");
+ partitionedRegionQueueSizeId = type.nameToId("partitionedRegionQueueSize");
+ partitionedRegionQueueThrottleTimeId = type.nameToId("partitionedRegionQueueThrottleTime");
+ partitionedRegionQueueThrottleCountId = type.nameToId("partitionedRegionQueueThrottleCount");
+ functionExecutionQueueSizeId = type.nameToId("functionExecutionQueueSize");
+ functionExecutionQueueThrottleTimeId = type.nameToId("functionExecutionQueueThrottleTime");
+ functionExecutionQueueThrottleCountId = type.nameToId("functionExecutionQueueThrottleCount");
+ serialQueueSizeId = type.nameToId("serialQueueSize");
+ serialQueueBytesId = type.nameToId("serialQueueBytes");
+ serialPooledThreadId = type.nameToId("serialPooledThread");
+ serialQueueThrottleTimeId = type.nameToId("serialQueueThrottleTime");
+ serialQueueThrottleCountId = type.nameToId("serialQueueThrottleCount");
+ serialThreadsId = type.nameToId("serialThreads");
+ processingThreadsId = type.nameToId("processingThreads");
+ highPriorityThreadsId = type.nameToId("highPriorityThreads");
+ partitionedRegionThreadsId = type.nameToId("partitionedRegionThreads");
+ functionExecutionThreadsId = type.nameToId("functionExecutionThreads");
+ waitingThreadsId = type.nameToId("waitingThreads");
+ replyWaitsInProgressId = type.nameToId("replyWaitsInProgress");
+ replyWaitsCompletedId = type.nameToId("replyWaitsCompleted");
+ replyWaitTimeId = type.nameToId("replyWaitTime");
+ replyTimeoutsId = type.nameToId("replyTimeouts");
+ replyWaitMaxTimeId = type.nameToId("replyWaitMaxTime");
+ receiverConnectionsId = type.nameToId("receivers");
+ failedAcceptsId = type.nameToId("failedAccepts");
+ failedConnectsId = type.nameToId("failedConnects");
+ reconnectAttemptsId = type.nameToId("reconnectAttempts");
+ lostConnectionLeaseId = type.nameToId("senderTimeouts");
+ sharedOrderedSenderConnectionsId = type.nameToId("sendersSO");
+ sharedUnorderedSenderConnectionsId = type.nameToId("sendersSU");
+ threadOrderedSenderConnectionsId = type.nameToId("sendersTO");
+ threadUnorderedSenderConnectionsId = type.nameToId("sendersTU");
+
+ syncSocketWritesInProgressId = type.nameToId("syncSocketWritesInProgress");
+ syncSocketWriteTimeId = type.nameToId("syncSocketWriteTime");
+ syncSocketWritesId = type.nameToId("syncSocketWrites");
+ syncSocketWriteBytesId = type.nameToId("syncSocketWriteBytes");
+
+ ucastReadsId = type.nameToId("ucastReads");
+ ucastReadBytesId = type.nameToId("ucastReadBytes");
+ ucastWritesId = type.nameToId("ucastWrites");
+ ucastWriteBytesId = type.nameToId("ucastWriteBytes");
+ ucastRetransmitsId = type.nameToId("ucastRetransmits");
+
+ mcastReadsId = type.nameToId("mcastReads");
+ mcastReadBytesId = type.nameToId("mcastReadBytes");
+ mcastWritesId = type.nameToId("mcastWrites");
+ mcastWriteBytesId = type.nameToId("mcastWriteBytes");
+ mcastRetransmitsId = type.nameToId("mcastRetransmits");
+ mcastRetransmitRequestsId = type.nameToId("mcastRetransmitRequests");
+
+ serializationTimeId = type.nameToId("serializationTime");
+ serializationsId = type.nameToId("serializations");
+ serializedBytesId = type.nameToId("serializedBytes");
+ deserializationTimeId = type.nameToId("deserializationTime");
+ deserializationsId = type.nameToId("deserializations");
+ deserializedBytesId = type.nameToId("deserializedBytes");
+ pdxSerializationsId = type.nameToId("pdxSerializations");
+ pdxSerializedBytesId = type.nameToId("pdxSerializedBytes");
+ pdxDeserializationsId = type.nameToId("pdxDeserializations");
+ pdxDeserializedBytesId = type.nameToId("pdxDeserializedBytes");
+ pdxInstanceDeserializationsId = type.nameToId("pdxInstanceDeserializations");
+ pdxInstanceDeserializationTimeId = type.nameToId("pdxInstanceDeserializationTime");
+ pdxInstanceCreationsId = type.nameToId("pdxInstanceCreations");
+
+ msgSerializationTimeId = type.nameToId("msgSerializationTime");
+ msgDeserializationTimeId = type.nameToId("msgDeserializationTime");
+
+ batchSendTimeId = type.nameToId("batchSendTime");
+ batchCopyTimeId = type.nameToId("batchCopyTime");
+ batchWaitTimeId = type.nameToId("batchWaitTime");
+ batchFlushTimeId = type.nameToId("batchFlushTime");
+
+ asyncSocketWritesInProgressId = type.nameToId("asyncSocketWritesInProgress");
+ asyncSocketWritesId = type.nameToId("asyncSocketWrites");
+ asyncSocketWriteRetriesId = type.nameToId("asyncSocketWriteRetries");
+ asyncSocketWriteTimeId = type.nameToId("asyncSocketWriteTime");
+ asyncSocketWriteBytesId = type.nameToId("asyncSocketWriteBytes");
+
+ asyncQueueAddTimeId = type.nameToId("asyncQueueAddTime");
+ asyncQueueRemoveTimeId = type.nameToId("asyncQueueRemoveTime");
+
+ asyncQueuesId = type.nameToId("asyncQueues");
+ asyncQueueFlushesInProgressId = type.nameToId("asyncQueueFlushesInProgress");
+ asyncQueueFlushesCompletedId = type.nameToId("asyncQueueFlushesCompleted");
+ asyncQueueFlushTimeId = type.nameToId("asyncQueueFlushTime");
+ asyncQueueTimeoutExceededId = type.nameToId("asyncQueueTimeoutExceeded");
+ asyncQueueSizeExceededId = type.nameToId("asyncQueueSizeExceeded");
+ asyncDistributionTimeoutExceededId = type.nameToId("asyncDistributionTimeoutExceeded");
+ asyncQueueSizeId = type.nameToId("asyncQueueSize");
+ asyncQueuedMsgsId = type.nameToId("asyncQueuedMsgs");
+ asyncDequeuedMsgsId = type.nameToId("asyncDequeuedMsgs");
+ asyncConflatedMsgsId = type.nameToId("asyncConflatedMsgs");
+
+ asyncThreadsId = type.nameToId("asyncThreads");
+ asyncThreadInProgressId = type.nameToId("asyncThreadInProgress");
+ asyncThreadCompletedId = type.nameToId("asyncThreadCompleted");
+ asyncThreadTimeId = type.nameToId("asyncThreadTime");
+
+ threadOwnedReceiversId = type.nameToId("receiversTO");
+ threadOwnedReceiversId2 = type.nameToId("receiversTO2");
+
+ receiverDirectBufferSizeId = type.nameToId("receiverDirectBufferSize");
+ receiverHeapBufferSizeId = type.nameToId("receiverHeapBufferSize");
+ senderDirectBufferSizeId = type.nameToId("senderDirectBufferSize");
+ senderHeapBufferSizeId = type.nameToId("senderHeapBufferSize");
+
+ socketLocksInProgressId = type.nameToId("socketLocksInProgress");
+ socketLocksId = type.nameToId("socketLocks");
+ socketLockTimeId = type.nameToId("socketLockTime");
+
+ bufferAcquiresInProgressId = type.nameToId("bufferAcquiresInProgress");
+ bufferAcquiresId = type.nameToId("bufferAcquires");
+ bufferAcquireTimeId = type.nameToId("bufferAcquireTime");
+ messagesBeingReceivedId = type.nameToId("messagesBeingReceived");
+ messageBytesBeingReceivedId = type.nameToId("messageBytesBeingReceived");
+
+ serialThreadStartsId = type.nameToId("serialThreadStarts");
+ viewThreadStartsId = type.nameToId("viewThreadStarts");
+ processingThreadStartsId = type.nameToId("processingThreadStarts");
+ highPriorityThreadStartsId = type.nameToId("highPriorityThreadStarts");
+ waitingThreadStartsId = type.nameToId("waitingThreadStarts");
+ partitionedRegionThreadStartsId = type.nameToId("partitionedRegionThreadStarts");
+ functionExecutionThreadStartsId = type.nameToId("functionExecutionThreadStarts");
+ serialPooledThreadStartsId = type.nameToId("serialPooledThreadStarts");
+ TOSentMsgId = type.nameToId("TOSentMsgs");
+ replyHandoffTimeId = type.nameToId("replyHandoffTime");
+ partitionedRegionThreadJobsId = type.nameToId("partitionedRegionThreadJobs");
+ functionExecutionThreadJobsId = type.nameToId("functionExecutionThreadJobs");
+ viewThreadsId = type.nameToId("viewThreads");
+ serialThreadJobsId = type.nameToId("serialThreadJobs");
+ viewProcessorThreadJobsId = type.nameToId("viewThreadJobs");
+ serialPooledThreadJobsId = type.nameToId("serialPooledThreadJobs");
+ pooledMessageThreadJobsId = type.nameToId("processingThreadJobs");
+ highPriorityThreadJobsId = type.nameToId("highPriorityThreadJobs");
+ waitingPoolThreadJobsId = type.nameToId("waitingThreadJobs");
+
+ eldersId = type.nameToId("elders");
+ initialImageMessagesInFlightId = type.nameToId("initialImageMessagesInFlight");
+ initialImageRequestsInProgressId = type.nameToId("initialImageRequestsInProgress");
+
+ //For GMSHealthMonitor
+ heartbeatRequestsSentId = type.nameToId("heartbeatRequestsSent");
+ heartbeatRequestsReceivedId = type.nameToId("heartbeatRequestsReceived");
+ heartbeatsSentId = type.nameToId("heartbeatsSent");
+ heartbeatsReceivedId = type.nameToId("heartbeatsReceived");
+ suspectsSentId = type.nameToId("suspectsSent");
+ suspectsReceivedId = type.nameToId("suspectsReceived");
+ finalCheckRequestsSentId = type.nameToId("finalCheckRequestsSent");
+ finalCheckRequestsReceivedId = type.nameToId("finalCheckRequestsReceived");
+ finalCheckResponsesSentId = type.nameToId("finalCheckResponsesSent");
+ finalCheckResponsesReceivedId = type.nameToId("finalCheckResponsesReceived");
+ tcpFinalCheckRequestsSentId = type.nameToId("tcpFinalCheckRequestsSent");
+ tcpFinalCheckRequestsReceivedId = type.nameToId("tcpFinalCheckRequestsReceived");
+ tcpFinalCheckResponsesSentId = type.nameToId("tcpFinalCheckResponsesSent");
+ tcpFinalCheckResponsesReceivedId = type.nameToId("tcpFinalCheckResponsesReceived");
+ udpFinalCheckRequestsSentId = type.nameToId("udpFinalCheckRequestsSent");
+ udpFinalCheckRequestsReceivedId = type.nameToId("udpFinalCheckRequestsReceived");
+ udpFinalCheckResponsesSentId = type.nameToId("udpFinalCheckResponsesSent");
+ udpFinalCheckResponsesReceivedId = type.nameToId("udpFinalCheckResponsesReceived");
+ }
+
+ /** The Statistics object that we delegate most behavior to */
+ private final Statistics stats;
+
+// private final HistogramStats replyHandoffHistogram;
+// private final HistogramStats replyWaitHistogram;
+
+ //////////////////////// Constructors ////////////////////////
+
+ /**
+ * Creates a new <code>DistributionStats</code> and registers itself
+ * with the given statistics factory.
+ */
+ public DistributionStats(StatisticsFactory f, long statId) {
+ this.stats = f.createAtomicStatistics(type, "distributionStats", statId);
+// this.replyHandoffHistogram = new HistogramStats("ReplyHandOff", "nanoseconds", f,
+// new long[] {100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1000000}, false);
+// this.replyWaitHistogram = new HistogramStats("ReplyWait", "nanoseconds", f,
+// new long[] {100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1000000}, false);
+ Buffers.initBufferStats(this);
+ }
+ /**
+ * Used by tests to create an instance given its already existings stats.
+ */
+ public DistributionStats(Statistics stats) {
+ this.stats = stats;
+// this.replyHandoffHistogram = null;
+// this.replyWaitHistogram = null;
+ }
+
+ /**
+ * Returns the current NanoTime or, if clock stats are disabled, zero.
+ * @since 5.0
+ */
+ public static long getStatTime() {
+ return enableClockStats? NanoTimer.getTime() : 0;
+ }
+
+ ////////////////////// Instance Methods //////////////////////
+
+ public void close() {
+ this.stats.close();
+ }
+
+ /**
+ * Returns the total number of messages sent by the distribution
+ * manager
+ */
+ public long getSentMessages() {
+ return this.stats.getLong(sentMessagesId);
+ }
+ public void incTOSentMsg() {
+ this.stats.incLong(TOSentMsgId, 1);
+ }
+
+ public long getSentCommitMessages() {
+ return this.stats.getLong(sentCommitMessagesId);
+ }
+
+ public long getCommitWaits() {
+ return this.stats.getLong(commitWaitsId);
+ }
+
+ /**
+ * Increments the total number of messages sent by the distribution
+ * manager
+ */
+ public void incSentMessages(long messages) {
+ this.stats.incLong(sentMessagesId, messages);
+ }
+ /**
+ * Increments the total number of transactino commit messages
+ * sent by the distribution manager
+ */
+ public void incSentCommitMessages(long messages) {
+ this.stats.incLong(sentCommitMessagesId, messages);
+ }
+ public void incCommitWaits() {
+ this.stats.incLong(commitWaitsId, 1);
+ }
+
+ /**
+ * Returns the total number of nanoseconds spent sending messages.
+ */
+ public long getSentMessagesTime() {
+ return this.stats.getLong(sentMessagesTimeId);
+ }
+
+ /**
+ * Increments the total number of nanoseconds spend sending messages.<p>
+ * This also sets the sentMessagesMaxTime, if appropriate
+ */
+ public void incSentMessagesTime(long nanos) {
+ if (enableClockStats) {
+ this.stats.incLong(sentMessagesTimeId, nanos);
+ long millis = nanos / 1000000;
+ if (getSentMessagesMaxTime() < millis) {
+ this.stats.setLong(sentMessagesMaxTimeId, millis);
+ }
+ }
+ }
+
+ /**
+ * Returns the longest time required to distribute a message, in nanos
+ */
+ public long getSentMessagesMaxTime() {
+ return this.stats.getLong(sentMessagesMaxTimeId);
+ }
+
+
+ /**
+ * Returns the total number of messages broadcast by the distribution
+ * manager
+ */
+ public long getBroadcastMessages() {
+ return this.stats.getLong(broadcastMessagesId);
+ }
+
+ /**
+ * Increments the total number of messages broadcast by the distribution
+ * manager
+ */
+ public void incBroadcastMessages(long messages) {
+ this.stats.incLong(broadcastMessagesId, messages);
+ }
+
+ /**
+ * Returns the total number of nanoseconds spent sending messages.
+ */
+ public long getBroadcastMessagesTime() {
+ return this.stats.getLong(broadcastMessagesTimeId);
+ }
+
+ /**
+ * Increments the total number of nanoseconds spend sending messages.
+ */
+ public void incBroadcastMessagesTime(long nanos) {
+ if (enableClockStats) {
+ this.stats.incLong(broadcastMessagesTimeId, nanos);
+ }
+ }
+
+ /**
+ * Returns the total number of messages received by the distribution
+ * manager
+ */
+ public long getReceivedMessages() {
+ return this.stats.getLong(receivedMessagesId);
+ }
+
+ /**
+ * Increments the total number of messages received by the distribution
+ * manager
+ */
+ public void incReceivedMessages(long messages) {
+ this.stats.incLong(receivedMessagesId, messages);
+ }
+
+ /**
+ * Returns the total number of bytes received by the distribution
+ * manager
+ */
+ public long getReceivedBytes() {
+ return this.stats.getLong(receivedBytesId);
+ }
+
+ public void incReceivedBytes(long bytes) {
+ this.stats.incLong(receivedBytesId, bytes);
+ }
+
+ public void incSentBytes(long bytes) {
+ this.stats.incLong(sentBytesId, bytes);
+ }
+
+ /**
+ * Returns the total number of messages processed by the distribution
+ * manager
+ */
+ public long getProcessedMessages() {
+ return this.stats.getLong(processedMessagesId);
+ }
+
+ /**
+ * Increments the total number of messages processed by the distribution
+ * manager
+ */
+ public void incProcessedMessages(long messages) {
+ this.stats.incLong(processedMessagesId, messages);
+ }
+
+ /**
+ * Returns the total number of nanoseconds spent processing messages.
+ */
+ public long getProcessedMessagesTime() {
+ return this.stats.getLong(processedMessagesTimeId);
+ }
+
+ /**
+ * Increments the total number of nanoseconds spend processing messages.
+ */
+ public void incProcessedMessagesTime(long start) {
+ if (enableClockStats) {
+ this.stats.incLong(processedMessagesTimeId, getStatTime()-start);
+ }
+ }
+
+ /**
+ * Returns the total number of nanoseconds spent scheduling messages to be processed.
+ */
+ public long getMessageProcessingScheduleTime() {
+ return this.stats.getLong(messageProcessingScheduleTimeId);
+ }
+
+ /**
+ * Increments the total number of nanoseconds spent scheduling messages to be processed.
+ */
+ public void incMessageProcessingScheduleTime(long elapsed) {
+ if (enableClockStats) {
+ this.stats.incLong(messageProcessingScheduleTimeId, elapsed);
+ }
+ }
+
+ public int getOverflowQueueSize() {
+ return this.stats.getInt(overflowQueueSizeId);
+ }
+
+ public void incOverflowQueueSize(int messages) {
+ this.stats.incInt(overflowQueueSizeId, messages);
+ }
+
+ protected void incWaitingQueueSize(int messages) {
+ this.stats.incInt(waitingQueueSizeId, messages);
+ }
+
+ protected void incOverflowQueueThrottleCount(int delays) {
+ this.stats.incInt(overflowQueueThrottleCountId, delays);
+ }
+ protected void incOverflowQueueThrottleTime(long nanos) {
+ if (enableClockStats) {
+ this.stats.incLong(overflowQueueThrottleTimeId, nanos);
+ }
+ }
+
+ protected void incHighPriorityQueueSize(int messages) {
+ this.stats.incInt(highPriorityQueueSizeId, messages);
+ }
+ protected void incHighPriorityQueueThrottleCount(int delays) {
+ this.stats.incInt(highPriorityQueueThrottleCountId, delays);
+ }
+ protected void incHighPriorityQueueThrottleTime(long nanos) {
+ if (enableClockStats) {
+ this.stats.incLong(highPriorityQueueThrottleTimeId, nanos);
+ }
+ }
+
+ protected void incPartitionedRegionQueueSize(int messages) {
+ this.stats.incInt(partitionedRegionQueueSizeId, messages);
+ }
+ protected void incPartitionedRegionQueueThrottleCount(int delays) {
+ this.stats.incInt(partitionedRegionQueueThrottleCountId, delays);
+ }
+ protected void incPartitionedRegionQueueThrottleTime(long nanos) {
+ if (enableClockStats) {
+ this.stats.incLong(partitionedRegionQueueThrottleTimeId, nanos);
+ }
+ }
+
+ protected void incFunctionExecutionQueueSize(int messages) {
+ this.stats.incInt(functionExecutionQueueSizeId, messages);
+ }
+ protected void incFunctionExecutionQueueThrottleCount(int delays) {
+ this.stats.incInt(functionExecutionQueueThrottleCountId, delays);
+ }
+ protected void incFunctionExecutionQueueThrottleTime(long nanos) {
+ if (enableClockStats) {
+ this.stats.incLong(functionExecutionQueueThrottleTimeId, nanos);
+ }
+ }
+
+ protected void incSerialQueueSize(int messages) {
+ this.stats.incInt(serialQueueSizeId, messages);
+ }
+ protected void incSerialQueueBytes(int amount) {
+ this.stats.incInt(serialQueueBytesId, amount);
+ }
+ public int getSerialQueueBytes() {
+ return this.stats.getInt(serialQueueBytesId);
+ }
+ protected void incSerialPooledThread() {
+ this.stats.incInt(serialPooledThreadId, 1);
+ }
+
+ protected void incSerialQueueThrottleCount(int delays) {
+ this.stats.incInt(serialQueueThrottleCountId, delays);
+ }
+ protected void incSerialQueueThrottleTime(long nanos) {
+ if (enableClockStats) {
+ this.stats.incLong(serialQueueThrottleTimeId, nanos);
+ }
+ }
+ public int getNumProcessingThreads() {
+ return this.stats.getInt(processingThreadsId);
+ }
+
+ public void incNumProcessingThreads(int threads) {
+ this.stats.incInt(processingThreadsId, threads);
+ }
+
+ public int getNumSerialThreads() {
+ return this.stats.getInt(serialThreadsId);
+ }
+
+ public void incNumSerialThreads(int threads) {
+ this.stats.incInt(serialThreadsId, threads);
+ }
+ protected void incWaitingThreads(int threads) {
+ this.stats.incInt(waitingThreadsId, threads);
+ }
+
+ protected void incHighPriorityThreads(int threads) {
+ this.stats.incInt(highPriorityThreadsId, threads);
+ }
+
+ protected void incPartitionedRegionThreads(int threads) {
+ this.stats.incInt(partitionedRegionThreadsId, threads);
+ }
+
+ protected void incFunctionExecutionThreads(int threads) {
+ this.stats.incInt(functionExecutionThreadsId, threads);
+ }
+
+ public void incMessageChannelTime(long delta) {
+ if (enableClockStats) {
+ this.stats.incLong(messageChannelTimeId, delta);
+ }
+ }
++
++ public void incUDPDispatchRequestTime(long delta) {
++ if (enableClockStats) {
++ this.stats.incLong(udpDispatchRequestTimeId, delta);
++ }
++ }
+
++ public long getUDPDispatchRequestTime() {
++ return this.stats.getLong(udpDispatchRequestTimeId);
++ }
++
+ public long getReplyMessageTime() {
+ return this.stats.getLong(replyMessageTimeId);
+ }
+ public void incReplyMessageTime(long val) {
+ if (enableClockStats) {
+ this.stats.incLong(replyMessageTimeId, val);
+ }
+ }
+
+ public long getDistributeMessageTime() {
+ return this.stats.getLong(distributeMessageTimeId);
+ }
+ public void incDistributeMessageTime(long val) {
+ if (enableClockStats) {
+ this.stats.incLong(distributeMessageTimeId, val);
+ }
+ }
+
+ public int getNodes() {
+ return this.stats.getInt(nodesId);
+ }
+ public void setNodes(int val) {
+ this.stats.setInt(nodesId, val);
+ }
+ public void incNodes(int val) {
+ this.stats.incInt(nodesId, val);
+ }
+ public int getReplyWaitsInProgress() {
+ return stats.getInt(replyWaitsInProgressId);
+ }
+ public int getReplyWaitsCompleted() {
+ return stats.getInt(replyWaitsCompletedId);
+ }
+ public long getReplyWaitTime() {
+ return stats.getLong(replyWaitTimeId);
+ }
+ public long getReplyWaitMaxTime() {
+ return stats.getLong(replyWaitMaxTimeId);
+ }
+ public long startSocketWrite(boolean sync) {
+ if (sync) {
+ stats.incInt(syncSocketWritesInProgressId, 1);
+ } else {
+ stats.incInt(asyncSocketWritesInProgressId, 1);
+ }
+ return getStatTime();
+ }
+ public void endSocketWrite(boolean sync, long start, int bytesWritten, int retries) {
+ final long now = getStatTime();
+ if (sync) {
+ stats.incInt(syncSocketWritesInProgressId, -1);
+ stats.incInt(syncSocketWritesId, 1);
+ stats.incLong(syncSocketWriteBytesId, bytesWritten);
+ if (enableClockStats) {
+ stats.incLong(syncSocketWriteTimeId, now-start);
+ }
+ } else {
+ stats.incInt(asyncSocketWritesInProgressId, -1);
+ stats.incInt(asyncSocketWritesId, 1);
+ if (retries != 0) {
+ stats.incInt(asyncSocketWriteRetriesId, retries);
+ }
+ stats.incLong(asyncSocketWriteBytesId, bytesWritten);
+ if (enableClockStats) {
+ stats.incLong(asyncSocketWriteTimeId, now-start);
+ }
+ }
+ }
+ public long startSocketLock() {
+ stats.incInt(socketLocksInProgressId, 1);
+ return getStatTime();
+ }
+ public void endSocketLock(long start) {
+ long ts = getStatTime();
+ stats.incInt(socketLocksInProgressId, -1);
+ stats.incInt(socketLocksId, 1);
+ stats.incLong(socketLockTimeId, ts-start);
+ }
+ public long startBufferAcquire() {
+ stats.incInt(bufferAcquiresInProgressId, 1);
+ return getStatTime();
+ }
+ public void endBufferAcquire(long start) {
+ long ts = getStatTime();
+ stats.incInt(bufferAcquiresInProgressId, -1);
+ stats.incInt(bufferAcquiresId, 1);
+ stats.incLong(bufferAcquireTimeId, ts-start);
+ }
+
+ public void incUcastWriteBytes(int bytesWritten) {
+ stats.incInt(ucastWritesId, 1);
+ stats.incLong(ucastWriteBytesId, bytesWritten);
+ }
+
+ public void incMcastWriteBytes(int bytesWritten) {
+ stats.incInt(mcastWritesId, 1);
+ stats.incLong(mcastWriteBytesId, bytesWritten);
+ }
+
+ public int getMcastWrites() {
+ return stats.getInt(mcastWritesId);
+ }
+ public int getMcastReads() {
+ return stats.getInt(mcastReadsId);
+ }
+ public void incMcastReadBytes(int amount) {
+ stats.incInt(mcastReadsId, 1);
+ stats.incLong(mcastReadBytesId, amount);
+ }
+ public void incUcastReadBytes(int amount) {
+ stats.incInt(ucastReadsId, 1);
+ stats.incLong(ucastReadBytesId, amount);
+ }
+ public long startSerialization() {
+ return getStatTime();
+ }
+ public void endSerialization(long start, int bytes) {
+ if (enableClockStats) {
+ stats.incLong(serializationTimeId, getStatTime()-start);
+ }
+ stats.incInt(serializationsId, 1);
+ stats.incLong(serializedBytesId, bytes);
+ }
+ public long startPdxInstanceDeserialization() {
+ return getStatTime();
+ }
+ public void endPdxInstanceDeserialization(long start) {
+ if (enableClockStats) {
+ stats.incLong(pdxInstanceDeserializationTimeId, getStatTime()-start);
+ }
+ stats.incInt(pdxInstanceDeserializationsId, 1);
+ }
+ public void incPdxSerialization(int bytes) {
+ stats.incInt(pdxSerializationsId, 1);
+ stats.incLong(pdxSerializedBytesId, bytes);
+ }
+ public void incPdxDeserialization(int bytes) {
+ stats.incInt(pdxDeserializationsId, 1);
+ stats.incLong(pdxDeserializedBytesId, bytes);
+ }
+ public void incPdxInstanceCreations() {
+ stats.incInt(pdxInstanceCreationsId, 1);
+ }
+ public long startDeserialization() {
+ return getStatTime();
+ }
+ public void endDeserialization(long start, int bytes) {
+ if (enableClockStats) {
+ stats.incLong(deserializationTimeId, getStatTime()-start);
+ }
+ stats.incInt(deserializationsId, 1);
+ stats.incLong(deserializedBytesId, bytes);
+ }
+ public long startMsgSerialization() {
+ return getStatTime();
+ }
+ public void endMsgSerialization(long start) {
+ if (enableClockStats) {
+ stats.incLong(msgSerializationTimeId, getStatTime()-start);
+ }
+ }
+ public long startMsgDeserialization() {
+ return getStatTime();
+ }
+ public void endMsgDeserialization(long start) {
+ if (enableClockStats) {
+ stats.incLong(msgDeserializationTimeId, getStatTime()-start);
+ }
+ }
+ /**
+ * @return the timestamp that marks the start of the operation
+ */
+ public long startReplyWait() {
+ stats.incInt(replyWaitsInProgressId, 1);
+ return getStatTime();
+ }
+ public void endReplyWait(long startNanos, long initTime) {
+ if (enableClockStats) {
+ stats.incLong(replyWaitTimeId, getStatTime()-startNanos);
+// this.replyWaitHistogram.endOp(delta);
+ }
+ if (initTime != 0) {
+ long mswait = System.currentTimeMillis() - initTime;
+ if (mswait > getReplyWaitMaxTime()) {
+ stats.setLong(replyWaitMaxTimeId, mswait);
+ }
+ }
+ stats.incInt(replyWaitsInProgressId, -1);
+ stats.incInt(replyWaitsCompletedId, 1);
+
+ Breadcrumbs.setSendSide(null); // clear any recipient breadcrumbs set by the message
+ Breadcrumbs.setProblem(null); // clear out reply-wait errors
+ }
+
+ public void incReplyTimeouts() {
+ stats.incLong(replyTimeoutsId, 1L);
+ }
+
+ public long getReplyTimeouts() {
+ return stats.getLong(replyTimeoutsId);
+ }
+
+ public void incReceivers() {
+ stats.incInt(receiverConnectionsId, 1);
+ }
+ public void decReceivers() {
+ stats.incInt(receiverConnectionsId, -1);
+ }
+ public void incFailedAccept() {
+ stats.incInt(failedAcceptsId, 1);
+ }
+ public void incFailedConnect() {
+ stats.incInt(failedConnectsId, 1);
+ }
+ public void incReconnectAttempts() {
+ stats.incInt(reconnectAttemptsId, 1);
+ }
+ public void incLostLease() {
+ stats.incInt(lostConnectionLeaseId, 1);
+ }
+ public void incSenders(boolean shared, boolean preserveOrder) {
+ if (shared) {
+ if (preserveOrder) {
+ stats.incInt(sharedOrderedSenderConnectionsId, 1);
+ } else {
+ stats.incInt(sharedUnorderedSenderConnectionsId, 1);
+ }
+ } else {
+ if (preserveOrder) {
+ stats.incInt(threadOrderedSenderConnectionsId, 1);
+ } else {
+ stats.incInt(threadUnorderedSenderConnectionsId, 1);
+ }
+ }
+ }
+ public int getSendersSU() {
+ return stats.getInt(sharedUnorderedSenderConnectionsId);
+ }
+ public void decSenders(boolean shared, boolean preserveOrder) {
+ if (shared) {
+ if (preserveOrder) {
+ stats.incInt(sharedOrderedSenderConnectionsId, -1);
+ } else {
+ stats.incInt(sharedUnorderedSenderConnectionsId, -1);
+ }
+ } else {
+ if (preserveOrder) {
+ stats.incInt(threadOrderedSenderConnectionsId, -1);
+ } else {
+ stats.incInt(threadUnorderedSenderConnectionsId, -1);
+ }
+ }
+ }
+
+ public int getAsyncSocketWritesInProgress() {
+ return stats.getInt(asyncSocketWritesInProgressId);
+ }
+ public int getAsyncSocketWrites() {
+ return stats.getInt(asyncSocketWritesId);
+ }
+ public int getAsyncSocketWriteRetries() {
+ return stats.getInt(asyncSocketWriteRetriesId);
+ }
+ public long getAsyncSocketWriteBytes() {
+ return stats.getLong(asyncSocketWriteBytesId);
+ }
+ public long getAsyncSocketWriteTime() {
+ return stats.getLong(asyncSocketWriteTimeId);
+ }
+ public long getAsyncQueueAddTime() {
+ return stats.getLong(asyncQueueAddTimeId);
+ }
+ public void incAsyncQueueAddTime(long inc) {
+ if (enableClockStats) {
+ stats.incLong(asyncQueueAddTimeId, inc);
+ }
+ }
+ public long getAsyncQueueRemoveTime() {
+ return stats.getLong(asyncQueueRemoveTimeId);
+ }
+ public void incAsyncQueueRemoveTime(long inc) {
+ if (enableClockStats) {
+ stats.incLong(asyncQueueRemoveTimeId, inc);
+ }
+ }
+
+ public int getAsyncQueues() {
+ return stats.getInt(asyncQueuesId);
+ }
+ public void incAsyncQueues(int inc) {
+ stats.incInt(asyncQueuesId, inc);
+ }
+ public int getAsyncQueueFlushesInProgress() {
+ return stats.getInt(asyncQueueFlushesInProgressId);
+ }
+ public int getAsyncQueueFlushesCompleted() {
+ return stats.getInt(asyncQueueFlushesCompletedId);
+ }
+ public long getAsyncQueueFlushTime() {
+ return stats.getLong(asyncQueueFlushTimeId);
+ }
+ public long startAsyncQueueFlush() {
+ stats.incInt(asyncQueueFlushesInProgressId, 1);
+ return getStatTime();
+ }
+ public void endAsyncQueueFlush(long start) {
+ stats.incInt(asyncQueueFlushesInProgressId, -1);
+ stats.incInt(asyncQueueFlushesCompletedId, 1);
+ if (enableClockStats) {
+ stats.incLong(asyncQueueFlushTimeId, getStatTime()-start);
+ }
+ }
+ public int getAsyncQueueTimeouts() {
+ return stats.getInt(asyncQueueTimeoutExceededId);
+ }
+ public void incAsyncQueueTimeouts(int inc) {
+ stats.incInt(asyncQueueTimeoutExceededId, inc);
+ }
+ public int getAsyncQueueSizeExceeded() {
+ return stats.getInt(asyncQueueSizeExceededId);
+ }
+ public void incAsyncQueueSizeExceeded(int inc) {
+ stats.incInt(asyncQueueSizeExceededId, inc);
+ }
+ public int getAsyncDistributionTimeoutExceeded() {
+ return stats.getInt(asyncDistributionTimeoutExceededId);
+ }
+ public void incAsyncDistributionTimeoutExceeded() {
+ stats.incInt(asyncDistributionTimeoutExceededId, 1);
+ }
+
+ public long getAsyncQueueSize() {
+ return stats.getLong(asyncQueueSizeId);
+ }
+ public void incAsyncQueueSize(long inc) {
+ stats.incLong(asyncQueueSizeId, inc);
+ }
+ public long getAsyncQueuedMsgs() {
+ return stats.getLong(asyncQueuedMsgsId);
+ }
+ public void incAsyncQueuedMsgs() {
+ stats.incLong(asyncQueuedMsgsId, 1);
+ }
+ public long getAsyncDequeuedMsgs() {
+ return stats.getLong(asyncDequeuedMsgsId);
+ }
+ public void incAsyncDequeuedMsgs() {
+ stats.incLong(asyncDequeuedMsgsId, 1);
+ }
+ public long getAsyncConflatedMsgs() {
+ return stats.getLong(asyncConflatedMsgsId);
+ }
+ public void incAsyncConflatedMsgs() {
+ stats.incLong(asyncConflatedMsgsId, 1);
+ }
+
+ public int getAsyncThreads() {
+ return stats.getInt(asyncThreadsId);
+ }
+ public void incAsyncThreads(int inc) {
+ stats.incInt(asyncThreadsId, inc);
+ }
+ public int getAsyncThreadInProgress() {
+ return stats.getInt(asyncThreadInProgressId);
+ }
+ public int getAsyncThreadCompleted() {
+ return stats.getInt(asyncThreadCompletedId);
+ }
+ public long getAsyncThreadTime() {
+ return stats.getLong(asyncThreadTimeId);
+ }
+ public long startAsyncThread() {
+ stats.incInt(asyncThreadInProgressId, 1);
+ return getStatTime();
+ }
+ public void endAsyncThread(long start) {
+ stats.incInt(asyncThreadInProgressId, -1);
+ stats.incInt(asyncThreadCompletedId, 1);
+ if (enableClockStats) {
+ stats.incLong(asyncThreadTimeId, getStatTime()-start);
+ }
+ }
+
+ /**
+ * Returns a helper object so that the overflow queue can record its
+ * stats to the proper distribution stats.
+ * @since 3.5
+ */
+ public ThrottledQueueStatHelper getOverflowQueueHelper() {
+ return new ThrottledQueueStatHelper() {
+ public void incThrottleCount() {
+ incOverflowQueueThrottleCount(1);
+ }
+ public void throttleTime(long nanos) {
+ incOverflowQueueThrottleTime(nanos);
+ }
+ public void add() {
+ incOverflowQueueSize(1);
+ }
+ public void remove() {
+ incOverflowQueueSize(-1);
+ }
+ public void remove(int count) {
+ incOverflowQueueSize(-count);
+ }
+ };
+ }
+
+ /**
+ * Returns a helper object so that the waiting queue can record its
+ * stats to the proper distribution stats.
+ * @since 3.5
+ */
+ public QueueStatHelper getWaitingQueueHelper() {
+ return new QueueStatHelper() {
+ public void add() {
+ incWaitingQueueSize(1);
+ }
+ public void remove() {
+ incWaitingQueueSize(-1);
+ }
+ public void remove(int count) {
+ incWaitingQueueSize(-count);
+ }
+ };
+ }
+
+ /**
+ * Returns a helper object so that the high priority queue can record its
+ * stats to the proper distribution stats.
+ * @since 3.5
+ */
+ public ThrottledQueueStatHelper getHighPriorityQueueHelper() {
+ return new ThrottledQueueStatHelper() {
+ public void incThrottleCount() {
+ incHighPriorityQueueThrottleCount(1);
+ }
+ public void throttleTime(long nanos) {
+ incHighPriorityQueueThrottleTime(nanos);
+ }
+ public void add() {
+ incHighPriorityQueueSize(1);
+ }
+ public void remove() {
+ incHighPriorityQueueSize(-1);
+ }
+ public void remove(int count) {
+ incHighPriorityQueueSize(-count);
+ }
+ };
+ }
+
+ /**
+ * Returns a helper object so that the partitioned region queue can record its
+ * stats to the proper distribution stats.
+ * @since 5.0
+ */
+ public ThrottledQueueStatHelper getPartitionedRegionQueueHelper() {
+ return new ThrottledQueueStatHelper() {
+ public void incThrottleCount() {
+ incPartitionedRegionQueueThrottleCount(1);
+ }
+ public void throttleTime(long nanos) {
+ incPartitionedRegionQueueThrottleTime(nanos);
+ }
+ public void add() {
+ incPartitionedRegionQueueSize(1);
+ }
+ public void remove() {
+ incPartitionedRegionQueueSize(-1);
+ }
+ public void remove(int count) {
+ incPartitionedRegionQueueSize(-count);
+ }
+ };
+ }
+ /**
+ * Returns a helper object so that the partitioned region pool can record its
+ * stats to the proper distribution stats.
+ * @since 5.0.2
+ */
+ public PoolStatHelper getPartitionedRegionPoolHelper() {
+ return new PoolStatHelper() {
+ public void startJob() {
+ incPartitionedRegionThreadJobs(1);
+ }
+ public void endJob() {
+ incPartitionedRegionThreadJobs(-1);
+ }
+ };
+ }
+
+ /**
+ * Returns a helper object so that the function execution queue can record its
+ * stats to the proper distribution stats.
+ * @since 6.0
+ */
+ public ThrottledQueueStatHelper getFunctionExecutionQueueHelper() {
+ return new ThrottledQueueStatHelper() {
+ public void incThrottleCount() {
+ incFunctionExecutionQueueThrottleCount(1);
+ }
+ public void throttleTime(long nanos) {
+ incFunctionExecutionQueueThrottleTime(nanos);
+ }
+ public void add() {
+ incFunctionExecutionQueueSize(1);
+ }
+ public void remove() {
+ incFunctionExecutionQueueSize(-1);
+ }
+ public void remove(int count) {
+ incFunctionExecutionQueueSize(-count);
+ }
+ };
+ }
+ /**
+ * Returns a helper object so that the function execution pool can record its
+ * stats to the proper distribution stats.
+ * @since 6.0
+ */
+ public PoolStatHelper getFunctionExecutionPoolHelper() {
+ return new PoolStatHelper() {
+ public void startJob() {
+ incFunctionExecutionThreadJobs(1);
+ }
+ public void endJob() {
+ incFunctionExecutionThreadJobs(-1);
+ }
+ };
+ }
+
+ /**
+ * Returns a helper object so that the serial queue can record its
+ * stats to the proper distribution stats.
+ * @since 3.5
+ */
+ public ThrottledMemQueueStatHelper getSerialQueueHelper() {
+ return new ThrottledMemQueueStatHelper() {
+ public void incThrottleCount() {
+ incSerialQueueThrottleCount(1);
+ }
+ public void throttleTime(long nanos) {
+ incSerialQueueThrottleTime(nanos);
+ }
+ public void add() {
+ incSerialQueueSize(1);
+ }
+ public void remove() {
+ incSerialQueueSize(-1);
+ }
+ public void remove(int count) {
+ incSerialQueueSize(-count);
+ }
+ public void addMem(int amount) {
+ incSerialQueueBytes(amount);
+ }
+ public void removeMem(int amount) {
+ incSerialQueueBytes(amount * (-1));
+ }
+ };
+ }
+
+ /**
+ * Returns a helper object so that the normal pool can record its
+ * stats to the proper distribution stats.
+ * @since 3.5
+ */
+ public PoolStatHelper getNormalPoolHelper() {
+ return new PoolStatHelper() {
+ public void startJob() {
+ incNormalPoolThreadJobs(1);
+ }
+ public void endJob() {
+ incNormalPoolThreadJobs(-1);
+ }
+ };
+ }
+
+ /**
+ * Returns a helper object so that the waiting pool can record its
+ * stats to the proper distribution stats.
+ * @since 3.5
+ */
+ public PoolStatHelper getWaitingPoolHelper() {
+ return new PoolStatHelper() {
+ public void startJob() {
+ incWaitingPoolThreadJobs(1);
+ }
+ public void endJob() {
+ incWaitingPoolThreadJobs(-1);
+ }
+ };
+ }
+
+ /**
+ * Returns a helper object so that the highPriority pool can record its
+ * stats to the proper distribution stats.
+ * @since 3.5
+ */
+ public PoolStatHelper getHighPriorityPoolHelper() {
+ return new PoolStatHelper() {
+ public void startJob() {
+ incHighPriorityThreadJobs(1);
+ }
+ public void endJob() {
+ incHighPriorityThreadJobs(-1);
+ }
+ };
+ }
+
+ public void incBatchSendTime(long start) {
+ if (enableClockStats) {
+ stats.incLong(batchSendTimeId, getStatTime()-start);
+ }
+ }
+ public void incBatchCopyTime(long start) {
+ if (enableClockStats) {
+ stats.incLong(batchCopyTimeId, getStatTime()-start);
+ }
+ }
+ public void incBatchWaitTime(long start) {
+ if (enableClockStats) {
+ stats.incLong(batchWaitTimeId, getStatTime()-start);
+ }
+ }
+ public void incBatchFlushTime(long start) {
+ if (enableClockStats) {
+ stats.incLong(batchFlushTimeId, getStatTime()-start);
+ }
+ }
+ public void incUcastRetransmits() {
+ stats.incInt(ucastRetransmitsId, 1);
+ }
+ public void incMcastRetransmits() {
+ stats.incInt(mcastRetransmitsId, 1);
+ }
+ public void incMcastRetransmitRequests() {
+ stats.incInt(mcastRetransmitRequestsId, 1);
+ }
+ public int getMcastRetransmits() {
+ return stats.getInt(mcastRetransmitsId);
+ }
+
+ public void incThreadOwnedReceivers(long value, int dominoCount)
+ {
+ if (dominoCount < 2) {
+ stats.incLong(threadOwnedReceiversId, value);
+ } else {
+ stats.incLong(threadOwnedReceiversId2, value);
+ }
+ }
+
+ /**
+ * @since 5.0.2.4
+ */
+ public void incReceiverBufferSize(int inc, boolean direct) {
+ if (direct) {
+ stats.incLong(receiverDirectBufferSizeId, inc);
+ } else {
+ stats.incLong(receiverHeapBufferSizeId, inc);
+ }
+ }
+ /**
+ * @since 5.0.2.4
+ */
+ public void incSenderBufferSize(int inc, boolean direct) {
+ if (direct) {
+ stats.incLong(senderDirectBufferSizeId, inc);
+ } else {
+ stats.incLong(senderHeapBufferSizeId, inc);
+ }
+ }
+ public void incMessagesBeingReceived(boolean newMsg, int bytes) {
+ if (newMsg) {
+ stats.incInt(messagesBeingReceivedId, 1);
+ }
+ stats.incLong(messageBytesBeingReceivedId, bytes);
+ }
+ public void decMessagesBeingReceived(int bytes) {
+ stats.incInt(messagesBeingReceivedId, -1);
+ stats.incLong(messageBytesBeingReceivedId, -bytes);
+ }
+ public void incSerialThreadStarts() {
+ stats.incLong(serialThreadStartsId, 1);
+ }
+ public void incViewThreadStarts() {
+ stats.incLong(viewThreadStartsId, 1);
+ }
+ public void incProcessingThreadStarts() {
+ stats.incLong(processingThreadStartsId, 1);
+ }
+ public void incHighPriorityThreadStarts() {
+ stats.incLong(highPriorityThreadStartsId, 1);
+ }
+ public void incWaitingThreadStarts() {
+ stats.incLong(waitingThreadStartsId, 1);
+ }
+ public void incPartitionedRegionThreadStarts() {
+ stats.incLong(partitionedRegionThreadStartsId, 1);
+ }
+ public void incFunctionExecutionThreadStarts() {
+ stats.incLong(functionExecutionThreadStartsId, 1);
+ }
+ public void incSerialPooledThreadStarts() {
+ stats.incLong(serialPooledThreadStartsId, 1);
+ }
+
+ public void incReplyHandOffTime(long start) {
+ if (enableClockStats) {
+ long delta = getStatTime() - start;
+ stats.incLong(replyHandoffTimeId, delta);
+// this.replyHandoffHistogram.endOp(delta);
+ }
+ }
+
+ protected void incPartitionedRegionThreadJobs(int i) {
+ this.stats.incInt(partitionedRegionThreadJobsId, i);
+ }
+
+ protected void incFunctionExecutionThreadJobs(int i) {
+ this.stats.incInt(functionExecutionThreadJobsId, i);
+ }
+
+ public void incNumViewThreads(int threads) {
+ this.stats.incInt(viewThreadsId, threads);
+ }
+
+ public PoolStatHelper getSerialProcessorHelper() {
+ return new PoolStatHelper() {
+ public void startJob() {
+ incNumSerialThreadJobs(1);
+ if (logger.isTraceEnabled()) {
+ logger.trace("[DM.SerialQueuedExecutor.execute] numSerialThreads={}", getNumSerialThreads());
+ }
+ }
+ public void endJob() {
+ incNumSerialThreadJobs(-1);
+ }
+ };
+ }
+
+ protected void incNumSerialThreadJobs(int jobs) {
+ this.stats.incInt(serialThreadJobsId, jobs);
+ }
+
+ public PoolStatHelper getViewProcessorHelper() {
+ return new PoolStatHelper() {
+ public void startJob() {
+ incViewProcessorThreadJobs(1);
+ if (logger.isTraceEnabled()) {
+ logger.trace("[DM.SerialQueuedExecutor.execute] numViewThreads={}", getNumViewThreads());
+ }
+ }
+ public void endJob() {
+ incViewProcessorThreadJobs(-1);
+ }
+ };
+ }
+
+ public int getNumViewThreads() {
+ return this.stats.getInt(viewThreadsId);
+ }
+
+ protected void incViewProcessorThreadJobs(int jobs) {
+ this.stats.incInt(viewProcessorT
<TRUNCATED>