You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by hi...@apache.org on 2010/07/19 09:29:23 UTC
svn commit: r965373 - in
/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx:
ThreadingView.java ThreadingViewMBean.java
Author: hiranya
Date: Mon Jul 19 07:29:23 2010
New Revision: 965373
URL: http://svn.apache.org/viewvc?rev=965373&view=rev
Log:
Adding the threading view MBean. This is a generic MBean that can be used to monitor any named thread group. This is used to implement the monitoring for NHTTP thread pools. (See SYNAPSE-669)
Added:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingView.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingViewMBean.java
Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingView.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingView.java?rev=965373&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingView.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingView.java Mon Jul 19 07:29:23 2010
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.commons.jmx;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.lang.management.ThreadMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+
+/**
+ * ThreadingView can be used to monitor a named thread group over JMX. Data
+ * gathered by this monitor can be classified as short term data and long term data.
+ * Short term data is the statitics related to last 15 minutes of execution and they
+ * get updated every 2 seconds. Long term data is related to last 24 hours of execution
+ * and they get updated every 5 minutes. This monitor can also be configured to log a
+ * summary of the thread states periodically. If needed a margin can be set for the blocked
+ * thread percentage, upon exeeding which a system alert will be logged as a warning. By
+ * default both periodic logs and alerts are turned off.
+ */
+public class ThreadingView implements ThreadingViewMBean {
+
+ private static final String SYNAPSE_THREADING_VIEW = "Threading";
+ private static final int SHORT_SAMPLING_PERIOD = 2;
+ private static final int LONG_SAMPLING_PERIOD = 5 * 60;
+ private static final int SAMPLES_PER_MINUTE = 60/ SHORT_SAMPLING_PERIOD;
+ private static final int SAMPLES_PER_HOUR = 3600/LONG_SAMPLING_PERIOD;
+
+ private String threadNamePrefix = null;
+ private boolean periodicLogs = false;
+ private double alertMargin = -1;
+
+ private double avgBlockedWorkerPercentage = 0.0;
+ private double avgUnblockedWorkerPercentage = 0.0;
+
+ /**
+ * The queue of samples taken by the short term data collector task. Maintained as a fixed
+ * length queue. Only the data for the last 15 minutes of execution will be stored here.
+ * Maximum length = (60/2) * 15 = 450
+ */
+ private Queue<Double> shortTermDataQueue = new LinkedList<Double>();
+
+ /**
+ * The queue of samples taken by the long term data collector task. Maintained as a fixed
+ * length queue. Only the data for the last 24 hours of execution will be stored here.
+ * Maximum length = (3600/5*60) * 24 = 288
+ */
+ private Queue<Double> longTermDataQueue = new LinkedList<Double>();
+
+ private int samplesCount = 0;
+ private int totalCount = 0;
+
+ private Date resetTime = Calendar.getInstance().getTime();
+
+ private static final Log log = LogFactory.getLog(ThreadingView.class);
+
+ private static final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+
+ private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ public ThreadingView(String threadNamePrefix) {
+ this.threadNamePrefix = threadNamePrefix;
+ initMBean();
+ }
+
+ public ThreadingView(String threadNamePrefix, boolean periodicLogs, double alertMargin) {
+ this.threadNamePrefix = threadNamePrefix;
+ this.periodicLogs = periodicLogs;
+ if (alertMargin > 0 && alertMargin < 100) {
+ this.alertMargin = alertMargin;
+ } else {
+ log.warn("Invalid alert margin for the thread group: " + threadNamePrefix + " - " +
+ "Using default value");
+ }
+ initMBean();
+ }
+
+ public void destroy() {
+ if (log.isDebugEnabled()) {
+ log.debug("Unregistering the Synapse threading view for the thread group: " +
+ threadNamePrefix);
+ }
+ MBeanRegistrar.getInstance().unRegisterMBean(SYNAPSE_THREADING_VIEW, threadNamePrefix);
+ scheduler.shutdownNow();
+ }
+
+ private void initMBean() {
+ if (log.isDebugEnabled()) {
+ log.debug("Starting a new Synapse threading view for the thread group: " +
+ threadNamePrefix);
+ }
+ scheduler.scheduleAtFixedRate(new ThreadingDataCollectorTask(), SHORT_SAMPLING_PERIOD,
+ SHORT_SAMPLING_PERIOD, TimeUnit.SECONDS);
+ scheduler.scheduleAtFixedRate(new LongTermDataCollectorTask(), LONG_SAMPLING_PERIOD,
+ LONG_SAMPLING_PERIOD, TimeUnit.SECONDS);
+ MBeanRegistrar.getInstance().registerMBean(this, SYNAPSE_THREADING_VIEW,
+ threadNamePrefix);
+ }
+
+ public int getTotalWorkerCount() {
+ int count = 0;
+ ThreadInfo[] threadInfo = dumpAllThreads();
+ for (ThreadInfo ti : threadInfo) {
+ if (ti != null && ti.getThreadName().startsWith(threadNamePrefix)) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private double getBlockedWorkerPercentage() {
+ int totalCount = 0;
+ int blockedCount = 0;
+ ThreadInfo[] threadInfo = dumpAllThreads();
+ for (ThreadInfo ti : threadInfo) {
+ // see if the thread name matches the prefix
+ if (ti != null && ti.getThreadName().startsWith(threadNamePrefix)) {
+ totalCount++;
+ if (isBlocked(ti)) {
+ blockedCount++;
+ }
+ }
+ }
+ if (totalCount == 0) {
+ return 0;
+ }
+ return ((double) blockedCount/(double) totalCount) * 100;
+ }
+
+ public String[] getDeadLockedWorkers() {
+ String[] workers = null;
+ // JDK 1.6 has a better implementation of this method but since we are on JDK 1.5
+ // we have to stick with this for now
+ long[] threads = threadBean.findMonitorDeadlockedThreads();
+ if (threads != null) {
+ ThreadInfo[] threadInfo = threadBean.getThreadInfo(threads);
+ workers = new String[threadInfo.length];
+ for (int i = 0; i < threadInfo.length; i++) {
+ if (threadInfo[i] != null) {
+ workers[i] = threadInfo[i].getThreadName();
+ } else {
+ workers[i] = null;
+ }
+ }
+ }
+ return workers;
+ }
+
+ public double getAvgBlockedWorkerPercentage() {
+ return avgBlockedWorkerPercentage;
+ }
+
+ public double getAvgUnblockedWorkerPercentage() {
+ return avgUnblockedWorkerPercentage;
+ }
+
+ public double getLastMinuteBlockedWorkerPercentage() {
+ return getAverageBlockedThreads(1);
+ }
+
+ public double getLast5MinuteBlockedWorkerPercentage() {
+ return getAverageBlockedThreads(5);
+ }
+
+ public double getLast15MinuteBlockedWorkerPercentage() {
+ return getAverageBlockedThreads(15);
+ }
+
+ public double getLastHourBlockedWorkerPercentage() {
+ return getAverageBlockedThreadsByHour(1);
+ }
+
+ public double getLast8HourBlockedWorkerPercentage() {
+ return getAverageBlockedThreadsByHour(8);
+ }
+
+ public double getLast24HourBlockedWorkerPercentage() {
+ return getAverageBlockedThreadsByHour(24);
+ }
+
+ public Date getLastResetTime() {
+ return resetTime;
+ }
+
+ public void reset() {
+ avgBlockedWorkerPercentage = 0.0;
+ avgUnblockedWorkerPercentage = 0.0;
+ shortTermDataQueue.clear();
+ longTermDataQueue.clear();
+ samplesCount = 0;
+ totalCount = 0;
+ resetTime = Calendar.getInstance().getTime();
+ }
+
+ private boolean isBlocked(ThreadInfo threadInfo) {
+ // A thread is considered "Blocked" if it is in the BLOCKED state
+ // or if it is in the WAITING state due to some reason other than
+ // 'parking'.
+ Thread.State state = threadInfo.getThreadState();
+ if (state.equals(Thread.State.BLOCKED)) {
+ return true;
+ } else if (state.equals(Thread.State.WAITING) ||
+ state.equals(Thread.State.TIMED_WAITING)) {
+ StackTraceElement[] stacktrace = threadInfo.getStackTrace();
+ if (stacktrace.length > 0 && !"park".equals(stacktrace[0].getMethodName())) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Get a sumarry of all threads running in the JVM.
+ *
+ * @return an array of ThreadInfo objects
+ */
+ private ThreadInfo[] dumpAllThreads() {
+ // JDK 1.6 has a built-in method for this
+ // But since we are on JDK 1.5 we have to follow this 2-step approach
+ long[] ids = threadBean.getAllThreadIds();
+ return threadBean.getThreadInfo(ids, 1);
+ }
+
+ /**
+ * Calculates and returns the average bloked worker percentage during last 'n' munites
+ * of execution
+ *
+ * @param n Number of minutes in the execution history
+ * @return the average blocked percentage as a double value
+ */
+ private double getAverageBlockedThreads(int n) {
+ int samples = n * SAMPLES_PER_MINUTE;
+ double sum = 0.0;
+ Double[] array = shortTermDataQueue.toArray(new Double[shortTermDataQueue.size()]);
+
+ if (samples > array.length) {
+ // If we don't have anough samples in the queue, try to approximate
+ // the value using all the available samples
+ samples = array.length;
+ for (int i = 0; i < array.length; i++) {
+ sum += array[i];
+ }
+ } else {
+ for (int i = 0; i < samples; i++) {
+ sum += array[array.length - 1 - i];
+ }
+ }
+
+ if (samples == 0) {
+ return 0.0;
+ }
+ return sum/samples;
+ }
+
+ /**
+ * Calculates and returns the average bloked worker percentage during last 'n' hours
+ * of execution
+ *
+ * @param n Number of hours in the execution history
+ * @return the average blocked percentage as a double value
+ */
+ private double getAverageBlockedThreadsByHour(int n) {
+ int samples = n * SAMPLES_PER_HOUR;
+ double sum = 0.0;
+ Double[] array = longTermDataQueue.toArray(new Double[longTermDataQueue.size()]);
+
+ if (samples > array.length) {
+ samples = array.length;
+ for (int i = 0; i < array.length; i++) {
+ sum += array[i];
+ }
+ } else {
+ for (int i = 0; i < samples; i++) {
+ sum += array[array.length - 1 - i];
+ }
+ }
+
+ if (samples == 0) {
+ return 0.0;
+ }
+ return sum/samples;
+ }
+
+
+ private class ThreadingDataCollectorTask implements Runnable {
+
+ public void run() {
+ samplesCount++;
+
+ double blocked = getBlockedWorkerPercentage();
+ double unblocked = 100 - blocked;
+
+ // calculate all time average values
+ avgBlockedWorkerPercentage = (avgBlockedWorkerPercentage * totalCount + blocked)/
+ (double) (totalCount + 1);
+ avgUnblockedWorkerPercentage = (avgUnblockedWorkerPercentage * totalCount + unblocked)/
+ (double) (totalCount + 1);
+
+ if (shortTermDataQueue.size() == 15 * SAMPLES_PER_MINUTE) {
+ shortTermDataQueue.remove();
+ }
+ shortTermDataQueue.offer(blocked);
+
+ if (samplesCount == SAMPLES_PER_MINUTE) {
+ samplesCount = 0;
+ periodicDump();
+ }
+ totalCount++;
+ }
+
+ private void periodicDump() {
+ if (periodicLogs && log.isDebugEnabled()) {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("Thread state summary for ").append(threadNamePrefix).
+ append(" threads - Blocked: ").append(avgBlockedWorkerPercentage).
+ append("%, Unblocked: ").append(avgUnblockedWorkerPercentage).
+ append("%");
+ log.debug(buffer.toString());
+ }
+
+ if (alertMargin > 0) {
+ double blocked = getAverageBlockedThreads(1);
+ if (blocked > alertMargin) {
+ log.warn("SYSTEM ALERT: " + blocked + "% of the " + threadNamePrefix +
+ " threads were in BLOCKED state during last minute!");
+ }
+ }
+ }
+ }
+
+ private class LongTermDataCollectorTask implements Runnable {
+ public void run() {
+ double blocked = getBlockedWorkerPercentage();
+
+ if (longTermDataQueue.size() == 24 * SAMPLES_PER_HOUR) {
+ longTermDataQueue.remove();
+ }
+ longTermDataQueue.offer(blocked);
+ }
+ }
+}
+
Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingViewMBean.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingViewMBean.java?rev=965373&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingViewMBean.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/jmx/ThreadingViewMBean.java Mon Jul 19 07:29:23 2010
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.commons.jmx;
+
+import java.util.Date;
+
+public interface ThreadingViewMBean {
+
+ public int getTotalWorkerCount();
+ public String[] getDeadLockedWorkers();
+ public double getAvgBlockedWorkerPercentage();
+ public double getAvgUnblockedWorkerPercentage();
+ public double getLastMinuteBlockedWorkerPercentage();
+ public double getLast5MinuteBlockedWorkerPercentage();
+ public double getLast15MinuteBlockedWorkerPercentage();
+ public double getLastHourBlockedWorkerPercentage();
+ public double getLast8HourBlockedWorkerPercentage();
+ public double getLast24HourBlockedWorkerPercentage();
+ public void reset();
+ public Date getLastResetTime();
+
+}