You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2016/10/02 02:14:29 UTC
airavata git commit: Use MinMaxCounters and add more metrics to
measure threadpool sizes
Repository: airavata
Updated Branches:
refs/heads/lahiru/AIRAVATA-2107 97247e39a -> b220509ca
Use MinMaxCounters and add more metrics to measure threadpool sizes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b220509c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b220509c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b220509c
Branch: refs/heads/lahiru/AIRAVATA-2107
Commit: b220509caf7a1a060617d6662ad063c17fea678c
Parents: 97247e3
Author: Lahiru Ginnaliya Gamathige <la...@apache.org>
Authored: Sat Oct 1 19:14:16 2016 -0700
Committer: Lahiru Ginnaliya Gamathige <la...@apache.org>
Committed: Sat Oct 1 19:14:16 2016 -0700
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 8 +++---
.../gfac/core/GFacThreadPoolExecutor.java | 11 +++++++-
.../airavata/gfac/impl/HPCRemoteCluster.java | 16 ++++++------
.../org/apache/airavata/gfac/impl/SSHUtils.java | 10 ++++----
.../gfac/monitor/email/EmailBasedMonitor.java | 23 +++++++++++------
.../airavata/gfac/server/GfacServerHandler.java | 23 ++++++++++++++---
.../core/impl/GFACPassiveJobSubmitter.java | 2 +-
.../server/OrchestratorServerHandler.java | 27 ++++++++++++--------
.../OrchestratorServerThreadPoolExecutor.java | 21 ++++++++++++---
.../org/apache/airavata/server/ServerMain.java | 1 +
10 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index b9b6d03..fe076ee 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -22,7 +22,7 @@
package org.apache.airavata.api.server.handler;
import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
import org.apache.airavata.api.Airavata;
import org.apache.airavata.api.airavata_apiConstants;
import org.apache.airavata.api.server.security.interceptor.SecurityCheck;
@@ -93,9 +93,9 @@ public class AiravataServerHandler implements Airavata.Iface {
private Publisher statusPublisher;
private Publisher experimentPublisher;
private CredentialStoreService.Client csClient;
- private Counter experimentPublishCount = Kamon.metrics().counter(String.format("%s.experiment.publish-count", getClass().getCanonicalName()));
- private Counter experimentLaunchPublishCount = Kamon.metrics().counter(String.format("%s.experiment_launch.publish-count", getClass().getCanonicalName()));
- private Counter experimentCancelPublishCount = Kamon.metrics().counter(String.format("%s.experiment_cancel.publish-count", getClass().getCanonicalName()));
+ private MinMaxCounter experimentPublishCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment.publish-count", getClass().getName()));
+ private MinMaxCounter experimentLaunchPublishCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment_launch.publish-count", getClass().getName()));
+ private MinMaxCounter experimentCancelPublishCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment_cancel.publish-count", getClass().getName()));
public AiravataServerHandler() {
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
index 19073e6..bf6dca6 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacThreadPoolExecutor.java
@@ -19,6 +19,8 @@
*/
package org.apache.airavata.gfac.core;
+import kamon.Kamon;
+import kamon.metric.instrument.Histogram;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.slf4j.Logger;
@@ -26,12 +28,16 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
public class GFacThreadPoolExecutor {
private final static Logger logger = LoggerFactory.getLogger(GFacThreadPoolExecutor.class);
public static final String GFAC_THREAD_POOL_SIZE = "gfac.thread.pool.size";
private static ExecutorService threadPool;
+ private static Histogram threadPoolQueueSize = Kamon.metrics().histogram("GFacThreadPoolExecutor.queue-size");
+ private static Histogram threadPoolActiveThreads = Kamon.metrics().histogram("GFacThreadPoolExecutor.active-threads");
+
public static ExecutorService getCachedThreadPool() {
if(threadPool ==null){
@@ -52,5 +58,8 @@ public class GFacThreadPoolExecutor {
return threadPool;
}
-
+ public static void record() {
+ threadPoolQueueSize.record(((ThreadPoolExecutor)threadPool).getQueue().size());
+ threadPoolActiveThreads.record(((ThreadPoolExecutor)threadPool).getActiveCount());
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 9c97d37..cc320a1 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -26,7 +26,7 @@ import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.UserInfo;
import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.gfac.core.JobManagerConfiguration;
import org.apache.airavata.gfac.core.SSHApiException;
@@ -53,13 +53,13 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
private final SSHKeyAuthentication authentication;
private final JSch jSch;
private Session session;
- private Counter submittedJobCount = Kamon.metrics().counter(String.format("%s.submitted-jobs", getClass().getCanonicalName()));
- private Counter nonZeroExitCodeJobCount = Kamon.metrics().counter(String.format("%s.nonzero-exit-jobs", getClass().getCanonicalName()));
- private Counter emptyJobIdCount = Kamon.metrics().counter(String.format("%s.empty-jobid-jobs", getClass().getCanonicalName()));
- private Counter copyToFailCount = Kamon.metrics().counter(String.format("%s.copyTo-fail", getClass().getCanonicalName()));
- private Counter copyFromFailCount = Kamon.metrics().counter(String.format("%s.copyFrom-fail", getClass().getCanonicalName()));
- private Counter mkDirFailCount = Kamon.metrics().counter(String.format("%s.mkDir-fail", getClass().getCanonicalName()));
- private Counter listFailCount = Kamon.metrics().counter(String.format("%s.list-fail", getClass().getCanonicalName()));
+ private MinMaxCounter submittedJobCount = Kamon.metrics().minMaxCounter(String.format("%s.submitted-jobs", getClass().getName()));
+ private MinMaxCounter nonZeroExitCodeJobCount = Kamon.metrics().minMaxCounter(String.format("%s.nonzero-exit-jobs", getClass().getName()));
+ private MinMaxCounter emptyJobIdCount = Kamon.metrics().minMaxCounter(String.format("%s.empty-jobid-jobs", getClass().getName()));
+ private MinMaxCounter copyToFailCount = Kamon.metrics().minMaxCounter(String.format("%s.copyTo-fail", getClass().getName()));
+ private MinMaxCounter copyFromFailCount = Kamon.metrics().minMaxCounter(String.format("%s.copyFrom-fail", getClass().getName()));
+ private MinMaxCounter mkDirFailCount = Kamon.metrics().minMaxCounter(String.format("%s.mkDir-fail", getClass().getName()));
+ private MinMaxCounter listFailCount = Kamon.metrics().minMaxCounter(String.format("%s.list-fail", getClass().getName()));
public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
index 2f59828..3f79358 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -25,7 +25,7 @@ import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
import kamon.metric.instrument.Histogram;
import org.apache.airavata.gfac.core.SSHApiException;
import org.slf4j.Logger;
@@ -45,11 +45,11 @@ import java.util.List;
*/
public class SSHUtils {
private static final Logger log = LoggerFactory.getLogger(SSHUtils.class);
- private static Histogram scpToBytes = Kamon.metrics().histogram(String.format("%s.scpTo-bytes", SSHUtils.class.getCanonicalName()));
- private static Counter scpToFailedCount = Kamon.metrics().counter(String.format("%s.scpTo-fail", SSHUtils.class.getCanonicalName()));
+ private static Histogram scpToBytes = Kamon.metrics().histogram(String.format("%s.scpTo-bytes", SSHUtils.class.getName()));
+ private static MinMaxCounter scpToFailedCount = Kamon.metrics().minMaxCounter(String.format("%s.scpTo-fail", SSHUtils.class.getName()));
- private static Histogram scpFromBytes = Kamon.metrics().histogram(String.format("%s.scpFrom-bytes", SSHUtils.class.getCanonicalName()));
- private static Counter scpFromFailedCount = Kamon.metrics().counter(String.format("%s.scpFrom-fail", SSHUtils.class.getCanonicalName()));
+ private static Histogram scpFromBytes = Kamon.metrics().histogram(String.format("%s.scpFrom-bytes", SSHUtils.class.getName()));
+ private static MinMaxCounter scpFromFailedCount = Kamon.metrics().minMaxCounter(String.format("%s.scpFrom-fail", SSHUtils.class.getName()));
/**
* This will copy a local file to a remote location
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index d1afdd6..69de6b2 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -21,7 +21,7 @@
package org.apache.airavata.gfac.monitor.email;
import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
import kamon.metric.instrument.Histogram;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
@@ -53,6 +53,7 @@ import javax.mail.search.FlagTerm;
import javax.mail.search.SearchTerm;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
public class EmailBasedMonitor implements JobMonitor, Runnable{
private static final Logger log = LoggerFactory.getLogger(EmailBasedMonitor.class);
@@ -74,10 +75,11 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
private Message[] flushUnseenMessages;
private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>();
private Timer timer;
- private Histogram monitorQueueSize = Kamon.metrics().histogram(String.format("%s.monitor-queue-size", getClass().getCanonicalName()));
- private Histogram cancelledJobs = Kamon.metrics().histogram(String.format("%s.cancelled-jobs", getClass().getCanonicalName()));
- private Counter completedJobCount = Kamon.metrics().counter(String.format("%s.completed-jobs", getClass().getCanonicalName()));
- private Counter failedJobCount = Kamon.metrics().counter(String.format("%s.failed-jobs", getClass().getCanonicalName()));
+ private Histogram monitorQueueSize = Kamon.metrics().histogram(String.format("%s.monitor-queue-size", getClass().getName()));
+ private Histogram cancelledJobs = Kamon.metrics().histogram(String.format("%s.cancelled-jobs", getClass().getName()));
+ private MinMaxCounter completedJobCount = Kamon.metrics().minMaxCounter(String.format("%s.completed-jobs", getClass().getName()));
+ private MinMaxCounter failedJobCount = Kamon.metrics().minMaxCounter(String.format("%s.failed-jobs", getClass().getName()));
+ private ExecutorService cachedThreadPool;
public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException {
init();
@@ -99,6 +101,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
timer = new Timer("CancelJobHandler", true);
long period = 1000 * 60 * 5; // five minute delay between successive task executions.
timer.schedule(new CancelTimerTask(), 0 , period);
+ cachedThreadPool = GFacThreadPoolExecutor.getCachedThreadPool();
}
private void populateAddressAndParserMap(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException {
@@ -133,6 +136,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
public void stopMonitor(String jobId, boolean runOutflow) {
TaskContext taskContext = jobMonitorMap.remove(jobId);
monitorQueueSize.record(jobMonitorMap.size());
+ GFacThreadPoolExecutor.record();
if (taskContext != null && runOutflow) {
try {
ProcessContext pc = taskContext.getParentProcessContext();
@@ -150,12 +154,13 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
pStatus.setReason("Job cancelled");
pc.setProcessStatus(pStatus);
GFacUtils.saveAndPublishProcessStatus(pc);
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(pc));
+ cachedThreadPool.execute(new GFacWorker(pc));
} catch (GFacException e) {
log.info("[EJM]: Error while running output tasks", e);
}
}
- }
+ GFacThreadPoolExecutor.record();
+ }
@Override
public boolean isMonitoring(String jobId) {
@@ -338,6 +343,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
}
private void process(JobStatusResult jobStatusResult, TaskContext taskContext){
+ GFacThreadPoolExecutor.record();
canceledJobs.remove(jobStatusResult.getJobId());
cancelledJobs.record(canceledJobs.size());
JobState resultState = jobStatusResult.getState();
@@ -413,11 +419,12 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
parentProcessContext.setProcessStatus(processStatus);
GFacUtils.saveAndPublishProcessStatus(parentProcessContext);
}
- GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(parentProcessContext));
+ cachedThreadPool.execute(new GFacWorker(parentProcessContext));
} catch (GFacException e) {
log.info("[EJM]: Error while running output tasks", e);
}
}
+ GFacThreadPoolExecutor.record();
}
private void writeEnvelopeOnError(Message m) throws MessagingException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 086093c..035543e 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -21,7 +21,8 @@
package org.apache.airavata.gfac.server;
import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.Histogram;
+import kamon.metric.instrument.MinMaxCounter;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.AiravataStartupException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -62,6 +63,7 @@ import org.apache.thrift.TException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
+import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -73,11 +75,13 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static sun.java2d.Disposer.getQueue;
public class GfacServerHandler implements GfacService.Iface {
private final static Logger log = LoggerFactory.getLogger(GfacServerHandler.class);
private Subscriber processLaunchSubscriber;
- private static int requestCount=0;
private ExperimentCatalog experimentCatalog;
private AppCatalog appCatalog;
private String airavataUserName;
@@ -87,7 +91,11 @@ public class GfacServerHandler implements GfacService.Iface {
private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
private ExecutorService executorService;
- private Counter consumedCount = Kamon.metrics().counter(String.format("%s.consumed-count", getClass().getCanonicalName()));
+ private MinMaxCounter consumedCount = Kamon.metrics().minMaxCounter(String.format("%s.consumed-count", getClass().getName()));
+ private Histogram threadPoolQueueSize = Kamon.metrics().histogram(String.format("%s.queue-size", getClass().getName()));
+ private Histogram threadPoolActiveThreads = Kamon.metrics().histogram(String.format("%s.active-threads", getClass().getName()));
+ private Histogram threadPoolTotalThreads = Kamon.metrics().histogram(String.format("%s.total-threads", getClass().getName()));
+
public GfacServerHandler() throws AiravataStartupException {
try {
@@ -163,7 +171,8 @@ public class GfacServerHandler implements GfacService.Iface {
MDC.put(MDCConstants.GATEWAY_ID, gatewayId);
MDC.put(MDCConstants.TOKEN_ID, tokenId);
try {
- executorService.execute(MDCUtil.wrapWithMDC(new GFacWorker(processId, gatewayId, tokenId)));
+ recordThreadPool();
+ executorService.execute(MDCUtil.wrapWithMDC(new GFacWorker(processId, gatewayId, tokenId)));
} catch (GFacException e) {
log.error("Failed to submit process", e);
@@ -174,6 +183,12 @@ public class GfacServerHandler implements GfacService.Iface {
return true;
}
+ private void recordThreadPool() {
+ threadPoolQueueSize.record(((ThreadPoolExecutor)executorService).getQueue().size());
+ threadPoolActiveThreads.record(((ThreadPoolExecutor)executorService).getActiveCount());
+ threadPoolTotalThreads.record(((ThreadPoolExecutor)executorService).getPoolSize());
+ }
+
@Override
public boolean cancelProcess(String processId, String gatewayId, String tokenId) throws TException {
return false;
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 38f4e97..5d2a353 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -21,7 +21,7 @@
package org.apache.airavata.orchestrator.core.impl;
import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index b617016..bde61c9 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -22,7 +22,7 @@
package org.apache.airavata.orchestrator.server;
import kamon.Kamon;
-import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.MinMaxCounter;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logging.MDCConstants;
@@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.*;
+import java.util.concurrent.ExecutorService;
public class OrchestratorServerHandler implements OrchestratorService.Iface {
private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -87,15 +88,16 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
private final Subscriber statusSubscribe;
private final Subscriber experimentSubscriber;
private CuratorFramework curatorClient;
- private Counter publishCount = Kamon.metrics().counter(String.format("%s.publish-count", getClass().getCanonicalName()));
- private Counter publishFail = Kamon.metrics().counter(String.format("%s.publish-fail-count", getClass().getCanonicalName()));
- private Counter processConsumeCount = Kamon.metrics().counter(String.format("%s.process.consume-count", getClass().getCanonicalName()));
- private Counter experimentConsumeCount = Kamon.metrics().counter(String.format("%s.experiment.consume-count", getClass().getCanonicalName()));
- private Counter experimentLaunchConsumeCount = Kamon.metrics().counter(String.format("%s.experiment_launch.consume-count", getClass().getCanonicalName()));
- private Counter experimentCancelConsumeCount = Kamon.metrics().counter(String.format("%s.experiment_cancel.consume-count", getClass().getCanonicalName()));
- private Counter unsupportedMessageCount = Kamon.metrics().counter(String.format("%s.unsupported-count", getClass().getCanonicalName()));
+ private MinMaxCounter publishCount = Kamon.metrics().minMaxCounter(String.format("%s.publish-count", getClass().getName()));
+ private MinMaxCounter publishFail = Kamon.metrics().minMaxCounter(String.format("%s.publish-fail-count", getClass().getName()));
+ private MinMaxCounter processConsumeCount = Kamon.metrics().minMaxCounter(String.format("%s.process.consume-count", getClass().getName()));
+ private MinMaxCounter experimentConsumeCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment.consume-count", getClass().getName()));
+ private MinMaxCounter experimentLaunchConsumeCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment_launch.consume-count", getClass().getName()));
+ private MinMaxCounter experimentCancelConsumeCount = Kamon.metrics().minMaxCounter(String.format("%s.experiment_cancel.consume-count", getClass().getName()));
+ private MinMaxCounter unsupportedMessageCount = Kamon.metrics().minMaxCounter(String.format("%s.unsupported-count", getClass().getName()));
+ private ExecutorService cachedThreadPool;
- /**
+ /**
* Query orchestrator server to fetch the CPI version
*/
public String getOrchestratorCPIVersion() throws TException {
@@ -109,6 +111,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
routingKeys.add(ServerSettings.getRabbitmqExperimentLaunchQueueName());
experimentSubscriber = MessagingFactory.getSubscriber(new ExperimentHandler(), routingKeys, Type.EXPERIMENT_LAUNCH);
setAiravataUserName(ServerSettings.getDefaultUser());
+ cachedThreadPool = OrchestratorServerThreadPoolExecutor.getCachedThreadPool();
} catch (AiravataException e) {
log.error(e.getMessage(), e);
throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -144,7 +147,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
*/
public boolean launchExperiment(String experimentId, String gatewayId) throws TException {
ExperimentModel experiment = null;
- try {
+ OrchestratorServerThreadPoolExecutor.record();
+ try {
String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId);
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath);
String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
@@ -237,7 +241,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId);
log.info("expId: {}, Launched experiment ", experimentId);
- OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(MDCUtil.wrapWithMDC(new SingleAppExperimentRunner(experimentId, token, gatewayId)));
+ cachedThreadPool.execute(MDCUtil.wrapWithMDC(new SingleAppExperimentRunner(experimentId, token, gatewayId)));
} else if (executionType == ExperimentType.WORKFLOW) {
//its a workflow execution experiment
log.debug(experimentId, "Launching workflow experiment {}.", experimentId);
@@ -249,6 +253,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
} catch (Exception e) {
throw new TException("Experiment '" + experimentId + "' launch failed. Unable to figureout execution type for application " + experiment.getExecutionId(), e);
}
+ OrchestratorServerThreadPoolExecutor.record();
return true;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
index 3fdba74..a57419c 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorServerThreadPoolExecutor.java
@@ -23,20 +23,27 @@ package org.apache.airavata.orchestrator.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import kamon.Kamon;
+import kamon.metric.instrument.Histogram;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OrchestratorServerThreadPoolExecutor {
- private final static Logger logger = LoggerFactory.getLogger(OrchestratorServerThreadPoolExecutor.class);
- public static final String AIRAVATA_SERVER_THREAD_POOL_SIZE = "airavata.server.thread.pool.size";
+ private final static Logger logger = LoggerFactory.getLogger(OrchestratorServerThreadPoolExecutor.class);
+ public static final String AIRAVATA_SERVER_THREAD_POOL_SIZE = "airavata.server.thread.pool.size";
- private static ExecutorService threadPool;
+ private static ExecutorService threadPool;
+ private static Histogram threadPoolQueueSize = Kamon.metrics().histogram("OrchestratorServerThreadPoolExecutor.queue-size");
+ private static Histogram threadPoolActiveThreads = Kamon.metrics().histogram("OrchestratorServerThreadPoolExecutor.active-threads");
+ private static Histogram threadPoolTotalThreads = Kamon.metrics().histogram("OrchestratorServerThreadPoolExecutor.total-threads");
- public static ExecutorService getCachedThreadPool() {
+
+ public static ExecutorService getCachedThreadPool() {
if(threadPool ==null){
threadPool = Executors.newCachedThreadPool();
}
@@ -53,4 +60,10 @@ public class OrchestratorServerThreadPoolExecutor {
}
return threadPool;
}
+
+ public static void record() {
+ threadPoolQueueSize.record(((ThreadPoolExecutor)threadPool).getQueue().size());
+ threadPoolActiveThreads.record(((ThreadPoolExecutor)threadPool).getActiveCount());
+ threadPoolTotalThreads.record(((ThreadPoolExecutor)threadPool).getPoolSize());
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b220509c/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
----------------------------------------------------------------------
diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
index 8660974..7bce436 100644
--- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
+++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
@@ -222,6 +222,7 @@ public class ServerMain {
}
}
if (hasStopRequested()){
+ Kamon.shutdown();
ServerSettings.setStopAllThreads(true);
stopAllServers();
ShutdownStrategy shutdownStrategy;