You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2016/10/01 20:09:26 UTC
[1/2] airavata git commit: [AIRAVATA-2107] Introduce metrics to
Airavata
Repository: airavata
Updated Branches:
refs/heads/lahiru/AIRAVATA-2017 [created] 97247e39a
[AIRAVATA-2107] Introduce metrics to Airavata
1. Add kamon.io as a metrics library to capture metrics.
2. Add sample metrics to Airavata.
3. Push calculated metrics to Datadog for viewing.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/de75aa9b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/de75aa9b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/de75aa9b
Branch: refs/heads/lahiru/AIRAVATA-2017
Commit: de75aa9be20348135a61371356d3f42d5b9a81f0
Parents: 8eea17f
Author: Lahiru Ginnaliya Gamathige <la...@apache.org>
Authored: Mon Sep 19 23:06:20 2016 -0700
Committer: Lahiru Ginnaliya Gamathige <la...@apache.org>
Committed: Sat Oct 1 09:24:43 2016 -0700
----------------------------------------------------------------------
modules/distribution/pom.xml | 10 ++++++++
modules/gfac/gfac-core/pom.xml | 4 +++
modules/gfac/gfac-impl/pom.xml | 4 +++
.../airavata/gfac/impl/HPCRemoteCluster.java | 21 +++++++++++++--
.../org/apache/airavata/gfac/impl/SSHUtils.java | 15 +++++++++++
.../gfac/monitor/email/EmailBasedMonitor.java | 21 ++++++++++++---
modules/orchestrator/orchestrator-core/pom.xml | 4 +++
modules/registry/registry-core/pom.xml | 27 ++++++++++++++++++++
modules/server/pom.xml | 4 +++
.../org/apache/airavata/server/ServerMain.java | 5 +++-
pom.xml | 11 ++++++++
11 files changed, 120 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/pom.xml b/modules/distribution/pom.xml
index d096739..296c60c 100644
--- a/modules/distribution/pom.xml
+++ b/modules/distribution/pom.xml
@@ -567,6 +567,16 @@
<artifactId>airavata-client-samples</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-core_2.11</artifactId>
+ <version>${kamon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-datadog_2.11</artifactId>
+ <version>${kamon.version}</version>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 8d358ff..8e1329d 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -128,6 +128,10 @@
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-core_2.11</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-impl/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/pom.xml b/modules/gfac/gfac-impl/pom.xml
index 2a0a949..ba512cd 100644
--- a/modules/gfac/gfac-impl/pom.xml
+++ b/modules/gfac/gfac-impl/pom.xml
@@ -122,5 +122,9 @@
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-core_2.11</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
index 725b6d0..9c97d37 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/HPCRemoteCluster.java
@@ -25,6 +25,8 @@ import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.UserInfo;
+import kamon.Kamon;
+import kamon.metric.instrument.Counter;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.gfac.core.JobManagerConfiguration;
import org.apache.airavata.gfac.core.SSHApiException;
@@ -51,6 +53,14 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
private final SSHKeyAuthentication authentication;
private final JSch jSch;
private Session session;
+ private Counter submittedJobCount = Kamon.metrics().counter(String.format("%s.submitted-jobs", getClass().getCanonicalName()));
+ private Counter nonZeroExitCodeJobCount = Kamon.metrics().counter(String.format("%s.nonzero-exit-jobs", getClass().getCanonicalName()));
+ private Counter emptyJobIdCount = Kamon.metrics().counter(String.format("%s.empty-jobid-jobs", getClass().getCanonicalName()));
+ private Counter copyToFailCount = Kamon.metrics().counter(String.format("%s.copyTo-fail", getClass().getCanonicalName()));
+ private Counter copyFromFailCount = Kamon.metrics().counter(String.format("%s.copyFrom-fail", getClass().getCanonicalName()));
+ private Counter mkDirFailCount = Kamon.metrics().counter(String.format("%s.mkDir-fail", getClass().getCanonicalName()));
+ private Counter listFailCount = Kamon.metrics().counter(String.format("%s.list-fail", getClass().getCanonicalName()));
+
public HPCRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo
authenticationInfo) throws AiravataException {
@@ -90,6 +100,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
submitCommand.setRawCommand("cd " + workingDirectory + "; " + submitCommand.getRawCommand());
StandardOutReader reader = new StandardOutReader();
executeCommand(submitCommand, reader);
+ submittedJobCount.increment();
// throwExceptionOnError(reader, submitCommand);
jsoutput.setJobId(outputParser.parseJobSubmission(reader.getStdOutputString()));
if (jsoutput.getJobId() == null) {
@@ -97,6 +108,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
jsoutput.setJobSubmissionFailed(true);
jsoutput.setFailureReason("stdout : " + reader.getStdOutputString() +
"\n stderr : " + reader.getStdErrorString());
+ emptyJobIdCount.increment();
}
}
jsoutput.setExitCode(reader.getExitCode());
@@ -104,6 +116,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
jsoutput.setJobSubmissionFailed(true);
jsoutput.setFailureReason("stdout : " + reader.getStdOutputString() +
"\n stderr : " + reader.getStdErrorString());
+ nonZeroExitCodeJobCount.increment();
}
jsoutput.setStdOut(reader.getStdOutputString());
jsoutput.setStdErr(reader.getStdErrorString());
@@ -120,6 +133,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
SSHUtils.scpTo(localFile, remoteFile, session);
retry = 0;
} catch (Exception e) {
+ copyToFailCount.increment();
retry--;
try {
session = Factory.getSSHSession(authenticationInfo, serverInfo);
@@ -147,6 +161,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
SSHUtils.scpFrom(remoteFile, localFile, session);
retry=0;
} catch (Exception e) {
+ copyFromFailCount.increment();
retry--;
try {
session = Factory.getSSHSession(authenticationInfo, serverInfo);
@@ -205,6 +220,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
SSHUtils.makeDirectory(directoryPath, session);
break; // Exit while loop
} catch (JSchException e) {
+ mkDirFailCount.increment();
if (retryCount == MAX_RETRY_COUNT) {
log.error("Retry count " + MAX_RETRY_COUNT + " exceeded for creating directory: "
+ serverInfo.getHost() + ":" + directoryPath, e);
@@ -263,6 +279,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
log.info("Creating directory: " + serverInfo.getHost() + ":" + directoryPath);
return SSHUtils.listDirectory(directoryPath, session);
} catch (JSchException | AiravataException | IOException e) {
+ listFailCount.increment();
throw new SSHApiException("Failed to list directory " + serverInfo.getHost() + ":" + directoryPath, e);
}
}
@@ -302,7 +319,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
// noting to do
}else if ((stdErrorString.contains(command.trim()) && !stdErrorString.contains("Warning")) || stdErrorString
.contains("error")) {
- log.error("Command {} , Standard Error output {}", command, stdErrorString);
+ log.error(String.format("Command %s , Standard Error output %s", command, stdErrorString));
throw new SSHApiException("Error running command " + command + " on remote cluster. StandardError: " +
stdErrorString);
}
@@ -322,7 +339,7 @@ public class HPCRemoteCluster extends AbstractRemoteCluster{
channelExec.setInputStream(null);
channelExec.setErrStream(commandOutput.getStandardError());
channelExec.connect();
- log.info("Executing command {}", commandInfo.getCommand());
+ log.info(String.format("Executing command %s", commandInfo.getCommand()));
commandOutput.onOutput(channelExec);
break; // exit from while loop
} catch (JSchException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
index cd5651e..2f59828 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/SSHUtils.java
@@ -24,6 +24,9 @@ import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
+import kamon.Kamon;
+import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.Histogram;
import org.apache.airavata.gfac.core.SSHApiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +45,11 @@ import java.util.List;
*/
public class SSHUtils {
private static final Logger log = LoggerFactory.getLogger(SSHUtils.class);
+ private static Histogram scpToBytes = Kamon.metrics().histogram(String.format("%s.scpTo-bytes", SSHUtils.class.getCanonicalName()));
+ private static Counter scpToFailedCount = Kamon.metrics().counter(String.format("%s.scpTo-fail", SSHUtils.class.getCanonicalName()));
+ private static Histogram scpFromBytes = Kamon.metrics().histogram(String.format("%s.scpFrom-bytes", SSHUtils.class.getCanonicalName()));
+ private static Counter scpFromFailedCount = Kamon.metrics().counter(String.format("%s.scpFrom-fail", SSHUtils.class.getCanonicalName()));
/**
* This will copy a local file to a remote location
@@ -79,6 +86,7 @@ public class SSHUtils {
if (checkAck(in) != 0) {
String error = "Error Reading input Stream";
log.error(error);
+ scpToFailedCount.increment();
throw new SSHApiException(error);
}
@@ -94,6 +102,7 @@ public class SSHUtils {
if (checkAck(in) != 0) {
String error = "Error Reading input Stream";
log.error(error);
+ scpToFailedCount.increment();
throw new SSHApiException(error);
}
}
@@ -112,6 +121,7 @@ public class SSHUtils {
if (checkAck(in) != 0) {
String error = "Error Reading input Stream";
log.error(error);
+ scpToFailedCount.increment();
throw new SSHApiException(error);
}
@@ -122,6 +132,7 @@ public class SSHUtils {
int len = fis.read(buf, 0, buf.length);
if (len <= 0) break;
out.write(buf, 0, len); //out.flush();
+ scpToBytes.record(command.getBytes().length);
}
fis.close();
fis = null;
@@ -131,6 +142,7 @@ public class SSHUtils {
out.flush();
if (checkAck(in) != 0) {
String error = "Error Reading input Stream";
+ scpToFailedCount.increment();
log.error(error);
throw new SSHApiException(error);
}
@@ -140,6 +152,7 @@ public class SSHUtils {
channel.disconnect();
if (stdOutReader.getStdErrorString().contains("scp:")) {
+ scpToFailedCount.increment();
throw new SSHApiException(stdOutReader.getStdErrorString());
}
//since remote file is always a file we just return the file
@@ -232,6 +245,7 @@ public class SSHUtils {
}
fos.write(buf, 0, foo);
filesize -= foo;
+ scpFromBytes.record(foo);
if (filesize == 0L) break;
}
fos.close();
@@ -254,6 +268,7 @@ public class SSHUtils {
}
} catch (Exception e) {
+ scpFromFailedCount.increment();
log.error(e.getMessage(), e);
} finally {
try {
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index bbcd635..d1afdd6 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -20,6 +20,9 @@
*/
package org.apache.airavata.gfac.monitor.email;
+import kamon.Kamon;
+import kamon.metric.instrument.Counter;
+import kamon.metric.instrument.Histogram;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
@@ -71,12 +74,15 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
private Message[] flushUnseenMessages;
private Map<String, Boolean> canceledJobs = new ConcurrentHashMap<>();
private Timer timer;
-
+ private Histogram monitorQueueSize = Kamon.metrics().histogram(String.format("%s.monitor-queue-size", getClass().getCanonicalName()));
+ private Histogram cancelledJobs = Kamon.metrics().histogram(String.format("%s.cancelled-jobs", getClass().getCanonicalName()));
+ private Counter completedJobCount = Kamon.metrics().counter(String.format("%s.completed-jobs", getClass().getCanonicalName()));
+ private Counter failedJobCount = Kamon.metrics().counter(String.format("%s.failed-jobs", getClass().getCanonicalName()));
public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException {
init();
populateAddressAndParserMap(resourceConfigs);
- }
+ }
private void init() throws AiravataException {
host = ServerSettings.getEmailBasedMonitorHost();
@@ -119,13 +125,15 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
public void monitor(String jobId, TaskContext taskContext) {
log.info("[EJM]: Added monitor Id : {} to email based monitor map", jobId);
jobMonitorMap.put(jobId, taskContext);
+ monitorQueueSize.record(jobMonitorMap.size());
taskContext.getParentProcessContext().setPauseTaskExecution(true);
}
@Override
public void stopMonitor(String jobId, boolean runOutflow) {
TaskContext taskContext = jobMonitorMap.remove(jobId);
- if (taskContext != null && runOutflow) {
+ monitorQueueSize.record(jobMonitorMap.size());
+ if (taskContext != null && runOutflow) {
try {
ProcessContext pc = taskContext.getParentProcessContext();
if (taskContext.isCancel()) {
@@ -157,6 +165,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
@Override
public void canceledJob(String jobId) {
canceledJobs.put(jobId, Boolean.FALSE);
+ cancelledJobs.record(canceledJobs.size());
}
private JobStatusResult parse(Message message) throws MessagingException, AiravataException {
@@ -330,6 +339,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
private void process(JobStatusResult jobStatusResult, TaskContext taskContext){
canceledJobs.remove(jobStatusResult.getJobId());
+ cancelledJobs.record(canceledJobs.size());
JobState resultState = jobStatusResult.getState();
// TODO : update job state on process context
boolean runOutflowTasks = false;
@@ -340,6 +350,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
// TODO - Handle all other valid JobStates
if (resultState == JobState.COMPLETE) {
jobMonitorMap.remove(jobStatusResult.getJobId());
+ monitorQueueSize.record(jobMonitorMap.size());
+ completedJobCount.increment();
jobStatus.setJobState(JobState.COMPLETE);
jobStatus.setReason("Complete email received");
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
@@ -359,6 +371,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
log.info("[EJM]: Job Active email received, " + jobDetails);
}else if (resultState == JobState.FAILED) {
jobMonitorMap.remove(jobStatusResult.getJobId());
+ monitorQueueSize.record(jobMonitorMap.size());
+ failedJobCount.increment();
runOutflowTasks = true;
jobStatus.setJobState(JobState.FAILED);
jobStatus.setReason("Failed email received");
@@ -366,6 +380,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
log.info("[EJM]: Job failed email received , removed job from job monitoring. " + jobDetails);
}else if (resultState == JobState.CANCELED) {
jobMonitorMap.remove(jobStatusResult.getJobId());
+ monitorQueueSize.record(jobMonitorMap.size());
jobStatus.setJobState(JobState.CANCELED);
jobStatus.setReason("Canceled email received");
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index ee0d23a..8e21b6d 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -98,6 +98,10 @@ the License. -->
<artifactId>airavata-server-configuration</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-core_2.11</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/registry/registry-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/registry/registry-core/pom.xml b/modules/registry/registry-core/pom.xml
index c2b26b2..8aa80c1 100644
--- a/modules/registry/registry-core/pom.xml
+++ b/modules/registry/registry-core/pom.xml
@@ -102,6 +102,33 @@
<build>
<plugins>
+ <!--plugin>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-maven-plugin</artifactId>
+ <version>2.2.0</version>
+ <configuration>
+ <includes>**/entities/*.class</includes>
+ <excludes>**/entities/XML*.class</excludes>
+ <addDefaultConstructor>true</addDefaultConstructor>
+ <enforcePropertyRestrictions>true</enforcePropertyRestrictions>
+ </configuration>
+ <executions>
+ <execution>
+ <id>enhancer</id>
+ <phase>process-classes</phase>
+ <goals>
+ <goal>enhance</goal>
+ </goals>
+ </execution>
+ </executions>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+ </dependencies>
+ </plugin-->
<!--<plugin>-->
<!--<groupId>org.apache.maven.plugins</groupId>-->
<!--<artifactId>maven-antrun-plugin</artifactId>-->
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/server/pom.xml b/modules/server/pom.xml
index d306c83..8c4b060 100644
--- a/modules/server/pom.xml
+++ b/modules/server/pom.xml
@@ -60,5 +60,9 @@
<artifactId>zookeeper</artifactId>
<version>3.4.0</version>
</dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-core_2.11</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
----------------------------------------------------------------------
diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
index 1c0483d..8660974 100644
--- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
+++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
@@ -23,6 +23,7 @@ package org.apache.airavata.server;
import ch.qos.logback.classic.LoggerContext;
import org.apache.airavata.api.Airavata;
import org.apache.airavata.common.exception.AiravataException;
+import kamon.Kamon;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logging.kafka.KafkaAppender;
import org.apache.airavata.common.utils.*;
@@ -44,6 +45,7 @@ import java.util.Arrays;
import java.util.List;
public class ServerMain {
+
private static List<IServer> servers;
private static final String SERVERS_KEY="servers";
private final static Logger logger = LoggerFactory.getLogger(ServerMain.class);
@@ -160,6 +162,7 @@ public class ServerMain {
// }
public static void main(String args[]) throws ParseException, IOException, AiravataException {
+ Kamon.start();
ServerSettings.mergeSettingsCommandLineArgs(args);
ServerSettings.setServerRoles(ApplicationSettings.getSetting(SERVERS_KEY, "all").split(","));
@@ -423,4 +426,4 @@ public class ServerMain {
return -1;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/de75aa9b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a65df09..bf5f592 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
<maven.replacer.plugin.version>1.5.3</maven.replacer.plugin.version>
<kafka-clients.version>0.8.2.2</kafka-clients.version>
<logback.version>1.1.6</logback.version>
+ <kamon.version>0.6.0</kamon.version>
</properties>
<developers>
@@ -446,6 +447,16 @@
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-core_2.11</artifactId>
+ <version>${kamon.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-datadog_2.11</artifactId>
+ <version>${kamon.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
[2/2] airavata git commit: More metrics
Posted by la...@apache.org.
More metrics
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/97247e39
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/97247e39
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/97247e39
Branch: refs/heads/lahiru/AIRAVATA-2017
Commit: 97247e39ac15357ba7038e3ed484946a238a8c63
Parents: de75aa9
Author: Lahiru Ginnaliya Gamathige <la...@apache.org>
Authored: Sat Oct 1 13:07:52 2016 -0700
Committer: Lahiru Ginnaliya Gamathige <la...@apache.org>
Committed: Sat Oct 1 13:07:52 2016 -0700
----------------------------------------------------------------------
airavata-api/airavata-api-server/pom.xml | 4 ++++
.../server/handler/AiravataServerHandler.java | 9 ++++++++
.../airavata/gfac/server/GfacServerHandler.java | 4 ++++
.../core/impl/GFACPassiveJobSubmitter.java | 2 ++
.../server/OrchestratorServerHandler.java | 22 ++++++++++++++++++--
5 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/airavata-api/airavata-api-server/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/pom.xml b/airavata-api/airavata-api-server/pom.xml
index e5b549c..cef9910 100644
--- a/airavata-api/airavata-api-server/pom.xml
+++ b/airavata-api/airavata-api-server/pom.xml
@@ -141,6 +141,10 @@
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
+ <dependency>
+ <groupId>io.kamon</groupId>
+ <artifactId>kamon-core_2.11</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 5ccf874..b9b6d03 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -21,6 +21,8 @@
package org.apache.airavata.api.server.handler;
+import kamon.Kamon;
+import kamon.metric.instrument.Counter;
import org.apache.airavata.api.Airavata;
import org.apache.airavata.api.airavata_apiConstants;
import org.apache.airavata.api.server.security.interceptor.SecurityCheck;
@@ -91,6 +93,9 @@ public class AiravataServerHandler implements Airavata.Iface {
private Publisher statusPublisher;
private Publisher experimentPublisher;
private CredentialStoreService.Client csClient;
+ private Counter experimentPublishCount = Kamon.metrics().counter(String.format("%s.experiment.publish-count", getClass().getCanonicalName()));
+ private Counter experimentLaunchPublishCount = Kamon.metrics().counter(String.format("%s.experiment_launch.publish-count", getClass().getCanonicalName()));
+ private Counter experimentCancelPublishCount = Kamon.metrics().counter(String.format("%s.experiment_cancel.publish-count", getClass().getCanonicalName()));
public AiravataServerHandler() {
try {
@@ -3645,6 +3650,8 @@ public class AiravataServerHandler implements Airavata.Iface {
MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, "LAUNCH.EXP-" + UUID.randomUUID().toString(), gatewayId);
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
experimentPublisher.publish(messageContext);
+ experimentPublishCount.increment();
+ experimentLaunchPublishCount.increment();
}
private void submitCancelExperiment(String gatewayId, String experimentId) throws AiravataException {
@@ -3652,6 +3659,8 @@ public class AiravataServerHandler implements Airavata.Iface {
MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT_CANCEL, "CANCEL.EXP-" + UUID.randomUUID().toString(), gatewayId);
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
experimentPublisher.publish(messageContext);
+ experimentPublishCount.increment();
+ experimentCancelPublishCount.increment();
}
private CredentialStoreService.Client getCredentialStoreServiceClient() throws TException, ApplicationSettingsException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 8e285e5..086093c 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.airavata.gfac.server;
+import kamon.Kamon;
+import kamon.metric.instrument.Counter;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.AiravataStartupException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -85,6 +87,7 @@ public class GfacServerHandler implements GfacService.Iface {
private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
private ExecutorService executorService;
+ private Counter consumedCount = Kamon.metrics().counter(String.format("%s.consumed-count", getClass().getCanonicalName()));
public GfacServerHandler() throws AiravataStartupException {
try {
@@ -188,6 +191,7 @@ public class GfacServerHandler implements GfacService.Iface {
public void onMessage(MessageContext messageContext) {
MDC.put(MDCConstants.GATEWAY_ID, messageContext.getGatewayId());
+ consumedCount.increment();
log.info(" Message Received with message id {} and with message type: {}" + messageContext.getMessageId(), messageContext.getType());
if (messageContext.getType().equals(MessageType.LAUNCHPROCESS)) {
ProcessStatus status = new ProcessStatus();
http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 3438475..38f4e97 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -20,6 +20,8 @@
*/
package org.apache.airavata.orchestrator.core.impl;
+import kamon.Kamon;
+import kamon.metric.instrument.Counter;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
http://git-wip-us.apache.org/repos/asf/airavata/blob/97247e39/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index df5865e..b617016 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -21,6 +21,8 @@
package org.apache.airavata.orchestrator.server;
+import kamon.Kamon;
+import kamon.metric.instrument.Counter;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logging.MDCConstants;
@@ -85,6 +87,13 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
private final Subscriber statusSubscribe;
private final Subscriber experimentSubscriber;
private CuratorFramework curatorClient;
+ private Counter publishCount = Kamon.metrics().counter(String.format("%s.publish-count", getClass().getCanonicalName()));
+ private Counter publishFail = Kamon.metrics().counter(String.format("%s.publish-fail-count", getClass().getCanonicalName()));
+ private Counter processConsumeCount = Kamon.metrics().counter(String.format("%s.process.consume-count", getClass().getCanonicalName()));
+ private Counter experimentConsumeCount = Kamon.metrics().counter(String.format("%s.experiment.consume-count", getClass().getCanonicalName()));
+ private Counter experimentLaunchConsumeCount = Kamon.metrics().counter(String.format("%s.experiment_launch.consume-count", getClass().getCanonicalName()));
+ private Counter experimentCancelConsumeCount = Kamon.metrics().counter(String.format("%s.experiment_cancel.consume-count", getClass().getCanonicalName()));
+ private Counter unsupportedMessageCount = Kamon.metrics().counter(String.format("%s.unsupported-count", getClass().getCanonicalName()));
/**
* Query orchestrator server to fetch the CPI version
@@ -457,7 +466,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
List<String> processIds = experimentCatalog.getIds(ExperimentCatalogModelType.PROCESS,
AbstractExpCatResource.ProcessConstants.EXPERIMENT_ID, experimentId);
for (String processId : processIds) {
- launchProcess(processId, airavataCredStoreToken, gatewayId);
+ if (launchProcess(processId, airavataCredStoreToken, gatewayId)) {
+ publishCount.increment();
+ } else {
+ publishFail.increment();
+ }
}
// ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED);
// status.setReason("submitted all processes");
@@ -492,6 +505,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
@Override
public void onMessage(MessageContext message) {
if (message.getType().equals(MessageType.PROCESS)) {
+ processConsumeCount.increment();
try {
ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent();
TBase event = message.getEvent();
@@ -583,7 +597,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
"Error" + " while prcessing process status change event");
}
} else {
- System.out.println("Message Recieved with message id " + message.getMessageId() + " and with message " +
+ log.info("Message Recieved with message id " + message.getMessageId() + " and with message " +
"type " + message.getType().name());
}
}
@@ -595,14 +609,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
@Override
public void onMessage(MessageContext messageContext) {
MDC.put(MDCConstants.GATEWAY_ID, messageContext.getGatewayId());
+ experimentConsumeCount.increment();
switch (messageContext.getType()) {
case EXPERIMENT:
+ experimentLaunchConsumeCount.increment();
launchExperiment(messageContext);
break;
case EXPERIMENT_CANCEL:
+ experimentCancelConsumeCount.increment();
cancelExperiment(messageContext);
break;
default:
+ unsupportedMessageCount.increment();
experimentSubscriber.sendAck(messageContext.getDeliveryTag());
log.error("Orchestrator got un-support message type : " + messageContext.getType());
break;