You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/07/12 03:33:26 UTC
[2/2] incubator-eagle git commit: [EAGLE-350] Running queue metrics
monitoring
[EAGLE-350] Running queue metrics monitoring
https://issues.apache.org/jira/browse/EAGLE-350
Author: Zhao, Qingwen <qi...@ebay.com>
Closes #260 from qingwen220/EAGLE-350.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9f6fea4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9f6fea4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9f6fea4a
Branch: refs/heads/develop
Commit: 9f6fea4ab65f4e7386d01b936b3a5d913ebee7e6
Parents: f3e7687
Author: Zhao, Qingwen <qi...@ebay.com>
Authored: Tue Jul 12 11:32:57 2016 +0800
Committer: Zhao, Qingwen <qi...@ebay.com>
Committed: Tue Jul 12 11:32:57 2016 +0800
----------------------------------------------------------------------
eagle-assembly/src/main/bin/eagle-env.sh | 2 +-
.../src/test/resources/log4j.properties | 2 +-
eagle-jpm/eagle-hadoop-queue/pom.xml | 90 ++++++++
.../assembly/eagle-running-queue-assembly.xml | 66 ++++++
.../hadoop/queue/HadoopQueueRunningMain.java | 93 ++++++++
.../queue/common/HadoopClusterConstants.java | 75 +++++++
.../queue/common/HadoopYarnResourceUtils.java | 60 +++++
.../common/YarnClusterResourceURLBuilder.java | 43 ++++
.../queue/common/YarnURLSelectorImpl.java | 34 +++
.../queue/crawler/ClusterMetricsCrawler.java | 65 ++++++
.../crawler/ClusterMetricsParseListener.java | 155 +++++++++++++
.../queue/crawler/RunningAppParseListener.java | 125 +++++++++++
.../queue/crawler/RunningAppsCrawler.java | 66 ++++++
.../queue/crawler/SchedulerInfoCrawler.java | 66 ++++++
.../crawler/SchedulerInfoParseListener.java | 148 ++++++++++++
.../exceptions/HadoopQueueFetcherException.java | 60 +++++
.../model/HadoopQueueEntityRepository.java | 27 +++
.../hadoop/queue/model/applications/App.java | 222 ++++++++++++++++++
.../hadoop/queue/model/applications/Apps.java | 41 ++++
.../queue/model/applications/AppsWrapper.java | 36 +++
.../model/clusterMetrics/ClusterMetrics.java | 198 ++++++++++++++++
.../clusterMetrics/ClusterMetricsWrapper.java | 37 +++
.../hadoop/queue/model/scheduler/Queue.java | 224 +++++++++++++++++++
.../hadoop/queue/model/scheduler/Queues.java | 38 ++++
.../queue/model/scheduler/ResourcesUsed.java | 48 ++++
.../model/scheduler/RunningQueueAPIEntity.java | 158 +++++++++++++
.../hadoop/queue/model/scheduler/Scheduler.java | 36 +++
.../queue/model/scheduler/SchedulerInfo.java | 83 +++++++
.../queue/model/scheduler/SchedulerWrapper.java | 36 +++
.../hadoop/queue/model/scheduler/User.java | 63 ++++++
.../queue/model/scheduler/UserWrapper.java | 76 +++++++
.../hadoop/queue/model/scheduler/Users.java | 38 ++++
.../queue/storm/HadoopQueueMessageId.java | 57 +++++
.../storm/HadoopQueueMetricPersistBolt.java | 108 +++++++++
.../storm/HadoopQueueRunningExtractor.java | 95 ++++++++
.../queue/storm/HadoopQueueRunningSpout.java | 73 ++++++
.../src/main/resources/application.conf | 36 +++
.../src/main/resources/log4j.properties | 34 +++
.../hadoop/queue/TestClusterMetricsCrawler.java | 39 ++++
.../queue/TestHadoopYarnResourceUtils.java | 54 +++++
.../hadoop/queue/TestRunningAppsCrawler.java | 39 ++++
.../hadoop/queue/TestSchedulerInfoCrawler.java | 39 ++++
.../src/test/resources/application-bak.conf | 36 +++
.../src/test/resources/application.conf | 36 +++
.../src/test/resources/log4j.properties | 34 +++
.../resourceFetch/ha/AbstractURLSelector.java | 100 +++++++++
eagle-jpm/pom.xml | 3 +-
eagle-topology-assembly/pom.xml | 5 +
pom.xml | 1 +
49 files changed, 3297 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-assembly/src/main/bin/eagle-env.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-env.sh b/eagle-assembly/src/main/bin/eagle-env.sh
index 66d72ae..c6de7e0 100755
--- a/eagle-assembly/src/main/bin/eagle-env.sh
+++ b/eagle-assembly/src/main/bin/eagle-env.sh
@@ -52,4 +52,4 @@ done
# EAGLE_TABLE_LIST
# TODO: Automatically create hbase table when initializing
-export EAGLE_TABLE_LIST='alertdef ipzone streamMetadata alertdetail fileSensitivity eaglehdfs_alert streamdef eagle_metric alertExecutor alertStream alertStreamSchema hiveResourceSensitivity hbaseResourceSensitivity mlmodel userprofile hfdsusercommandpattern appCommand appDefinition serviceAudit aggregatedef alertNotifications eagleSiteDesc eagleSiteApplication eagleApplicationDesc eagleFeatureDesc eagle_metadata'
\ No newline at end of file
+export EAGLE_TABLE_LIST='alertdef ipzone streamMetadata alertdetail fileSensitivity eaglehdfs_alert streamdef eagle_metric alertExecutor alertStream alertStreamSchema hiveResourceSensitivity hbaseResourceSensitivity mlmodel userprofile hfdsusercommandpattern appCommand appDefinition serviceAudit aggregatedef alertNotifications eagleSiteDesc eagleSiteApplication eagleApplicationDesc eagleFeatureDesc eagle_metadata running_queue'
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties b/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties
index d59ded6..fb13ad5 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=DEBUG, stdout
# standard output
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/pom.xml b/eagle-jpm/eagle-hadoop-queue/pom.xml
new file mode 100644
index 0000000..6566bf5
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>eagle-parent</artifactId>
+ <groupId>org.apache.eagle</groupId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>eagle-hadoop-queue</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-util</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-stream-process-api</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.wso2.orbit.com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptor>src/assembly/eagle-running-queue-assembly.xml</descriptor>
+ <finalName>eagle-running-queue-${project.version}</finalName>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <tarLongFileMode>posix</tarLongFileMode>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/assembly/eagle-running-queue-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/assembly/eagle-running-queue-assembly.xml b/eagle-jpm/eagle-hadoop-queue/src/assembly/eagle-running-queue-assembly.xml
new file mode 100644
index 0000000..244c5f0
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/assembly/eagle-running-queue-assembly.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>assembly</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>false</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <unpackOptions>
+ <excludes>
+ <exclude>**/application.conf</exclude>
+ <exclude>**/defaults.yaml</exclude>
+ <exclude>**/*storm.yaml</exclude>
+ <exclude>**/*storm.yaml.1</exclude>
+ <exclude>**/log4j.properties</exclude>
+ </excludes>
+ </unpackOptions>
+ <excludes>
+ <exclude>org.apache.storm:storm-core</exclude>
+ <exclude>org.slf4j:slf4j-api</exclude>
+ <exclude>org.slf4j:log4j-over-slf4j</exclude>
+ <exclude>org.slf4j:slf4j-log4j12</exclude>
+ <exclude>log4j:log4j</exclude>
+ <exclude>asm:asm</exclude>
+ <exclude>org.apache.log4j.wso2:log4j</exclude>
+ <exclude>log4j:apache-log4j-extras</exclude>
+ <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.outputDirectory}/</directory>
+ <outputDirectory>/</outputDirectory>
+ <excludes>
+ <exclude>**/application.conf</exclude>
+ <exclude>**/log4j.properties</exclude>
+ <exclude>**/*.yaml</exclude>
+ </excludes>
+ </fileSet>
+ </fileSets>
+</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
new file mode 100644
index 0000000..a285a6b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HadoopQueueRunningMain {
+
+ public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
+ public final static String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
+ public final static String TOPOLOGY_NAME = "topology.name";
+ public final static String LOCAL_MODE = "topology.localMode";
+
+ private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningMain.class);
+
+ public static void main(String [] args) {
+ //System.setProperty("config.resource", "/application.conf");
+ //Config config = ConfigFactory.load();
+ Config config = null;
+ try {
+ LOG.info("Loading from configuration file");
+ config = new ConfigOptionParser().load(args);
+ } catch (Exception e) {
+ LOG.error("failed to load config");
+ }
+ IRichSpout spout = new HadoopQueueRunningSpout(config);
+ HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(config);
+ TopologyBuilder builder = new TopologyBuilder();
+
+ int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+ int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
+ int numOfSpoutTasks = 1;
+
+ String spoutName = "runningQueueSpout";
+ String boltName = "parserBolt";
+
+ builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
+ builder.setBolt(boltName, bolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping(spoutName);
+
+ StormTopology topology = builder.createTopology();
+
+ backtype.storm.Config stormConf = new backtype.storm.Config();
+ stormConf.setNumWorkers(numOfTotalWorkers);
+ stormConf.put(stormConf.TOPOLOGY_DEBUG, true);
+
+ String topoName = config.getString(TOPOLOGY_NAME);
+ Boolean local = config.getBoolean(LOCAL_MODE);
+ try {
+ if (!local) {
+ StormSubmitter.submitTopology(topoName, stormConf, topology);
+ } else {
+ //local mode
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology(topoName, stormConf, topology);
+ }
+ } catch (InvalidTopologyException e) {
+ e.printStackTrace();
+ } catch (AlreadyAliveException e) {
+ e.printStackTrace();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
new file mode 100644
index 0000000..324489d
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eagle.hadoop.queue.common;
+
+public class HadoopClusterConstants {
+
+ public enum AggregateFunc{
+ MAX, AVG
+ }
+
+ public enum DataType {
+ METRIC, ENTITY
+ }
+
+ public enum DataSource {
+ CLUSTER_METRIC, RUNNING_APPS, SCHEDULER
+ }
+
+ public static class MetricName{
+
+ // Metrics from running apps
+ public final static String HADOOP_APPS_ALLOCATED_MB = "hadoop.%s.allocatedmb";
+ public final static String HADOOP_APPS_ALLOCATED_VCORES = "hadoop.%s.allocatedvcores";
+ public final static String HADOOP_APPS_RUNNING_CONTAINERS = "hadoop.%s.runningcontainers";
+
+ // metrics from cluster metrics
+ public final static String HADOOP_CLUSTER_NUMPENDING_JOBS = "hadoop.cluster.numpendingjobs";
+ public final static String HADOOP_CLUSTER_ALLOCATED_MEMORY = "hadoop.cluster.allocatedmemory";
+ public final static String HADOOP_CLUSTER_TOTAL_MEMORY = "hadoop.cluster.totalmemory";
+ public final static String HADOOP_CLUSTER_AVAILABLE_MEMORY = "hadoop.cluster.availablememory";
+ public final static String HADOOP_CLUSTER_RESERVED_MEMORY = "hadoop.cluster.reservedmemory";
+
+ // metrics from scheduler info
+ public final static String HADOOP_CLUSTER_CAPACITY = "hadoop.cluster.capacity";
+ public final static String HADOOP_CLUSTER_USED_CAPACITY = "hadoop.cluster.usedcapacity";
+
+ public final static String HADOOP_QUEUE_NUMPENDING_JOBS = "hadoop.queue.numpendingjobs";
+ public final static String HADOOP_QUEUE_USED_CAPACITY = "hadoop.queue.usedcapacity";
+ public final static String HADOOP_QUEUE_USED_CAPACITY_RATIO = "hadoop.queue.usedcapacityratio";
+
+ public final static String HADOOP_USER_NUMPENDING_JOBS = "hadoop.user.numpendingjobs";
+ public final static String HADOOP_USER_USED_MEMORY = "hadoop.user.usedmemory";
+ public final static String HADOOP_USER_USED_MEMORY_RATIO = "hadoop.user.usedmemoryratio";
+
+ }
+
+ public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService";
+
+ // tag constants
+ public static final String TAG_PARENT_QUEUE = "parentQueue";
+ public static final String TAG_QUEUE = "queue";
+ public static final String TAG_USER = "user";
+ public static final String TAG_SITE = "site";
+ public static final String TAG_CLUSTER = "cluster";
+
+ // field constants
+ public static final String FIELD_DATATYPE = "dataType";
+ public static final String FIELD_DATA = "data";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
new file mode 100644
index 0000000..f2c4b1f
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.common;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.InputStream;
+
+public class HadoopYarnResourceUtils {
+
+ private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+ static {
+ OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ }
+
+ public static Object getObjectFromStreamWithGzip(String urlString, Class<?> clazz) throws Exception {
+ InputStream is = null;
+ Object o = null;
+ try {
+ is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP);
+ o = OBJ_MAPPER.readValue(is, clazz);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Fetch resource %s failed", urlString), e);
+ } finally {
+ if (is != null) {
+ is.close();
+ }
+ }
+ return o;
+ }
+
+ public static String getConfigValue(Config eagleConf, String key, String defaultValue) {
+ if (eagleConf.hasPath(key)) {
+ return eagleConf.getString(key);
+ } else {
+ return defaultValue;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
new file mode 100644
index 0000000..7fd275b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.common;
+
+public class YarnClusterResourceURLBuilder {
+
+ private final static String CLUSTER_SCHEDULER_API_URL = "ws/v1/cluster/scheduler";
+ private final static String CLUSTER_METRICS_API_URL = "ws/v1/cluster/metrics";
+ private final static String CLUSTER_APPS_API_URL = "ws/v1/cluster/apps";
+ private final static String ANONYMOUS_PARAMETER = "anonymous=true";
+
+ public static String buildSchedulerInfoURL(String urlBase) {
+ return urlBase + CLUSTER_SCHEDULER_API_URL + "?" + ANONYMOUS_PARAMETER;
+ }
+
+ public static String buildClusterMetricsURL(String urlBase) {
+ return urlBase + CLUSTER_METRICS_API_URL + "?" + ANONYMOUS_PARAMETER;
+ }
+
+ public static String buildRunningAppsURL(String urlBase) {
+ return urlBase + CLUSTER_APPS_API_URL + "?state=RUNNING" + "&" + ANONYMOUS_PARAMETER;
+ }
+
+ public static String buildFinishedAppsURL(String urlBase) {
+ return urlBase + CLUSTER_APPS_API_URL + "?state=FINISHED" + "&" + ANONYMOUS_PARAMETER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
new file mode 100644
index 0000000..05e3be9
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.common;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.ha.AbstractURLSelector;
+
+public class YarnURLSelectorImpl extends AbstractURLSelector {
+
+ public YarnURLSelectorImpl(String[] urls, Constants.CompressionType compressionType) {
+ super(urls, compressionType);
+ }
+
+ @Override
+ protected String buildTestURL(String urlToCheck) {
+ return YarnClusterResourceURLBuilder.buildRunningAppsURL(urlToCheck);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
new file mode 100644
index 0000000..5cf44dc
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
+import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
+import org.apache.eagle.hadoop.queue.model.clusterMetrics.ClusterMetrics;
+import org.apache.eagle.hadoop.queue.model.clusterMetrics.ClusterMetricsWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ClusterMetricsCrawler implements Runnable {
+
+ private final static Logger logger = LoggerFactory.getLogger(ClusterMetricsCrawler.class);
+ private ClusterMetricsParseListener listener;
+ private String urlString;
+
+ public ClusterMetricsCrawler(String site, String urlBase, SpoutOutputCollector collector) {
+ listener = new ClusterMetricsParseListener(site, collector);
+ urlString = YarnClusterResourceURLBuilder.buildClusterMetricsURL(urlBase);
+ }
+
+ @Override
+ public void run() {
+ try {
+ logger.info("Start to crawl cluster metrics from " + this.urlString);
+ ClusterMetricsWrapper metricsWrapper = (ClusterMetricsWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, ClusterMetricsWrapper.class);
+ ClusterMetrics metrics = metricsWrapper.getClusterMetrics();
+ if(metrics == null) {
+ logger.error("Failed to crawl cluster metrics");
+ } else {
+ long currentTimestamp = System.currentTimeMillis();
+ listener.onMetric(metrics, currentTimestamp);
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ if(logger.isDebugEnabled()) {
+ logger.trace(e.getMessage(), e);
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ } finally {
+ listener.flush();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
new file mode 100644
index 0000000..b8a5b45
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ *
+ */
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.hadoop.queue.model.clusterMetrics.ClusterMetrics;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.AggregateFunc;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class ClusterMetricsParseListener {
+
+ private String site;
+ private SpoutOutputCollector collector;
+
+ private long maxTimestamp;
+ private Map<MetricKey, GenericMetricEntity> clusterMetricEntities = new HashMap<>();
+ private Map<MetricKey, Integer> clusterMetricCounts = new HashMap<>();
+
+ private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
+ private final static long HOLD_TIME_WINDOW = 2 * DateTimeUtil.ONEMINUTE;
+
+ public ClusterMetricsParseListener(String site, SpoutOutputCollector collector){
+ reset();
+ this.site = site;
+ this.collector = collector;
+ }
+
+ private void createMetric(String metricName, long timestamp, double value, HadoopClusterConstants.AggregateFunc aggFunc){
+ if (timestamp > maxTimestamp) {
+ maxTimestamp = timestamp;
+ }
+ timestamp = timestamp / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL;
+ MetricKey key = new MetricKey(metricName, timestamp);
+ GenericMetricEntity entity = clusterMetricEntities.get(key);
+ if (entity == null) {
+ entity = new GenericMetricEntity();
+ entity.setTags(buildMetricTags());
+ entity.setTimestamp(timestamp);
+ entity.setPrefix(metricName);
+ entity.setValue(new double[]{0.0});
+ clusterMetricEntities.put(key, entity);
+ }
+ if (clusterMetricCounts.get(key) == null){
+ clusterMetricCounts.put(key, 0);
+ }
+ updateEntityAggValue(entity, aggFunc, value, clusterMetricCounts.get(key));
+ clusterMetricCounts.put(key, clusterMetricCounts.get(key) + 1);
+ }
+
+ public void onMetric(ClusterMetrics metrics, long currentTimestamp) {
+ createMetric(MetricName.HADOOP_CLUSTER_NUMPENDING_JOBS, currentTimestamp, metrics.getAppsPending(), AggregateFunc.MAX);
+ createMetric(MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY, currentTimestamp, metrics.getAllocatedMB(), AggregateFunc.AVG);
+ createMetric(MetricName.HADOOP_CLUSTER_TOTAL_MEMORY, currentTimestamp, metrics.getTotalMB(), AggregateFunc.MAX);
+ createMetric(MetricName.HADOOP_CLUSTER_AVAILABLE_MEMORY, currentTimestamp, metrics.getAvailableMB(), AggregateFunc.AVG);
+ createMetric(MetricName.HADOOP_CLUSTER_RESERVED_MEMORY, currentTimestamp, metrics.getReservedMB(), AggregateFunc.AVG);
+ }
+
+ public void flush() {
+ HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.CLUSTER_METRIC, System.currentTimeMillis());
+ List<GenericMetricEntity> metrics = new ArrayList<>(clusterMetricEntities.values());
+ this.collector.emit(new ValuesArray(DataType.METRIC.name(), metrics), messageId);
+ reset();
+ }
+
+ private void reset() {
+ maxTimestamp = 0;
+ clearOldCache();
+ }
+
+ private void clearOldCache() {
+ List<MetricKey> removedkeys = clusterMetricEntities.keySet().stream().filter(key -> key.createTime < maxTimestamp - HOLD_TIME_WINDOW).collect(Collectors.toList());
+
+ for (MetricKey key : removedkeys) {
+ clusterMetricEntities.remove(key);
+ }
+ }
+
+ private Map<String, String> buildMetricTags(){
+ Map<String,String> tags = new HashMap<String, String>();
+ tags.put(HadoopClusterConstants.TAG_SITE, site);
+ return tags;
+ }
+
+ private void updateEntityAggValue(GenericMetricEntity entity,
+ HadoopClusterConstants.AggregateFunc aggFunc,
+ double value,
+ double count) {
+ double lastValue = entity.getValue()[0];
+ switch (aggFunc){
+ case MAX:
+ entity.setValue(new double[]{Math.max(lastValue, value)});
+ return;
+ case AVG:
+ long avgValue = (long) ((lastValue * count + value) / (count +1));
+ entity.setValue(new double[]{avgValue});
+ return;
+ }
+ }
+
+ private class MetricKey {
+ String metricName;
+ Long createTime;
+
+ public MetricKey(String metricName, Long timestamp) {
+ this.metricName = metricName;
+ this.createTime = timestamp;
+ }
+
+ public boolean equals(Object obj) {
+ if (obj instanceof MetricKey) {
+ MetricKey key = (MetricKey) obj;
+ if (key == null) {
+ return false;
+ }
+ return Objects.equals(metricName, key.metricName) & Objects.equals(createTime, key.createTime);
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ return new HashCodeBuilder().append(metricName).append(createTime).toHashCode();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
new file mode 100755
index 0000000..be06e12
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ *
+ */
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.hadoop.queue.model.applications.App;
+import org.apache.eagle.hadoop.queue.model.applications.Apps;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.*;
+
+public class RunningAppParseListener {
+
+ private final static Logger logger = LoggerFactory.getLogger(RunningAppParseListener.class);
+ private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
+
+ @SuppressWarnings("serial")
+ public static HashMap<String, String> metrics = new HashMap<String, String>() {
+ {
+ put(MetricName.HADOOP_APPS_ALLOCATED_MB, "getAllocatedMB");
+ put(MetricName.HADOOP_APPS_ALLOCATED_VCORES, "getAllocatedVCores");
+ put(MetricName.HADOOP_APPS_RUNNING_CONTAINERS, "getRunningContainers");
+ }
+ };
+
+ private String site;
+ private SpoutOutputCollector collector;
+ private Map<String, GenericMetricEntity> appMetricEntities = new HashMap<>();
+
+ public RunningAppParseListener(String site, SpoutOutputCollector collector){
+ this.site = site;
+ this.collector = collector;
+ }
+
+ public void flush() {
+ logger.info("start sending app metrics, size: " + appMetricEntities.size());
+ HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.RUNNING_APPS, System.currentTimeMillis());
+ List<GenericMetricEntity> metrics = new ArrayList<>(appMetricEntities.values());
+ collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId);
+ appMetricEntities.clear();
+ }
+
+ private Map<String, String> buildMetricTags(AggLevel level, Map<String, String> tags){
+ Map<String, String> newTags = new HashMap<String, String>();
+ newTags.put(HadoopClusterConstants.TAG_SITE, site);
+ tags.entrySet().stream().filter(entry -> level.level.equalsIgnoreCase(entry.getKey())).forEach(entry -> {
+ newTags.put(entry.getKey(), entry.getValue());
+ });
+ return newTags;
+ }
+
+ private void createMetric(String metricName, Map<String, String> tags, long timestamp, int value) {
+ String key = metricName + tags.toString() + " " + timestamp;
+ GenericMetricEntity entity = appMetricEntities.get(key);
+ if (entity == null) {
+ entity = new GenericMetricEntity();
+ entity.setTags(tags);
+ entity.setTimestamp(timestamp);
+ entity.setPrefix(metricName);
+ entity.setValue(new double[]{0.0});
+ appMetricEntities.put(key, entity);
+ }
+ double lastValue = entity.getValue()[0];
+ entity.setValue(new double[]{lastValue + value});
+ }
+
+ public void onMetric(Apps apps, long timestamp) throws Exception {
+ timestamp = timestamp / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL;
+ for (App app : apps.getApp()) {
+ Map<String, String> tags = new HashMap<>();
+ tags.put(HadoopClusterConstants.TAG_USER, app.getUser());
+ tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue());
+ for (AggLevel level : AggLevel.values()) {
+ Map<String, String> newTags = buildMetricTags(level, tags);
+ for (java.util.Map.Entry<String, String> entry : metrics.entrySet()) {
+ Method method = App.class.getMethod(entry.getValue());
+ Integer value = (Integer) method.invoke(app);
+ String metricName = String.format(entry.getKey(), level.name);
+ createMetric(metricName, newTags, timestamp, value);
+ }
+ }
+ }
+ }
+
+ private enum AggLevel {
+ CLUSTER(HadoopClusterConstants.TAG_CLUSTER, ""),
+ QUEUE(HadoopClusterConstants.TAG_QUEUE, HadoopClusterConstants.TAG_QUEUE),
+ USER(HadoopClusterConstants.TAG_USER, HadoopClusterConstants.TAG_USER);
+
+ private String name;
+ private String level;
+ AggLevel(String name, String level) {
+ this.name = name;
+ this.level = level;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
new file mode 100755
index 0000000..55ba41b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
+import org.apache.eagle.hadoop.queue.model.applications.Apps;
+import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
+import org.apache.eagle.hadoop.queue.model.applications.AppsWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class RunningAppsCrawler implements Runnable {
+
+ private final static Logger logger = LoggerFactory.getLogger(RunningAppsCrawler.class);
+
+ private RunningAppParseListener listener;
+ private String urlString;
+
+ public RunningAppsCrawler(String site, String baseUrl, SpoutOutputCollector collector){
+ this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl);
+ //this.urlString = YarnClusterResourceURLBuilder.buildFinishedAppsURL(baseUrl);
+ listener = new RunningAppParseListener(site, collector);
+ }
+
+ @Override
+ public void run() {
+ try {
+ logger.info("Start to crawl app metrics from " + this.urlString);
+ AppsWrapper appsWrapper = (AppsWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, AppsWrapper.class);
+ if(appsWrapper == null || appsWrapper.getApps() == null) {
+ logger.error("Failed to crawl running applications with api = " + urlString);
+ } else {
+ long currentTimestamp = System.currentTimeMillis();
+ listener.onMetric(appsWrapper.getApps(), currentTimestamp);
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage());
+ if(logger.isDebugEnabled()) {
+ logger.trace(e.getMessage(), e);
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ } finally {
+ listener.flush();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
new file mode 100755
index 0000000..f25fa20
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
+import org.apache.eagle.hadoop.queue.model.scheduler.*;
+import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class SchedulerInfoCrawler implements Runnable {
+
+ private final static Logger logger = LoggerFactory.getLogger(SchedulerInfoCrawler.class);
+
+ private SchedulerInfoParseListener listener;
+ private String urlString;
+
+ public SchedulerInfoCrawler(String site, String baseUrl, SpoutOutputCollector collector) {
+ this.urlString = YarnClusterResourceURLBuilder.buildSchedulerInfoURL(baseUrl);
+ this.listener = new SchedulerInfoParseListener(site, collector);
+ }
+
+ @Override
+ public void run() {
+ try {
+ //https://apollo-phx-rm-2.vip.ebay.com:50030/ws/v1/cluster/scheduler?anonymous=true
+ logger.info("Start to crawl cluster scheduler queues from " + this.urlString);
+ SchedulerWrapper schedulerWrapper = (SchedulerWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, SchedulerWrapper.class);
+ if (schedulerWrapper == null || schedulerWrapper.getScheduler() == null) {
+ logger.error("Failed to crawl scheduler info with url = " + this.urlString);
+ } else {
+ SchedulerInfo scheduler = schedulerWrapper.getScheduler().getSchedulerInfo();
+ logger.info("Crawled " + scheduler.getQueues().getQueue().size() + " queues");
+ long currentTimestamp = System.currentTimeMillis();
+ listener.onMetric(scheduler, currentTimestamp);
+ }
+ } catch (IOException e) {
+ logger.error("Got IO exception while connecting to "+this.urlString + " : "+ e.getMessage());
+ } catch (Exception e) {
+ logger.error("Got exception while crawling queues:" + e.getMessage(), e);
+ } catch (Throwable e) {
+ logger.error("Got throwable exception while crawling queues:" + e.getMessage(), e);
+ } finally {
+ listener.flush();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
new file mode 100644
index 0000000..99d83d4
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.model.scheduler.*;
+import org.apache.eagle.hadoop.queue.model.scheduler.Queue;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+public class SchedulerInfoParseListener {
+
+ private final static Logger LOG = LoggerFactory.getLogger(SchedulerInfoParseListener.class);
+ //private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
+ //private int MAX_CACHE_COUNT = 1000;
+
+ private final List<RunningQueueAPIEntity> runningQueueAPIEntities = new ArrayList<>();
+ private final List<GenericMetricEntity> metricEntities = new ArrayList<>();
+
+ private String site;
+ private SpoutOutputCollector collector;
+
+ public SchedulerInfoParseListener(String site, SpoutOutputCollector collector) {
+ this.site = site;
+ this.collector = collector;
+ }
+
+ public void onMetric(SchedulerInfo scheduler, long currentTimestamp) throws Exception {
+ Map<String,String> tags = buildMetricTags(null, null);
+ createMetric(MetricName.HADOOP_CLUSTER_CAPACITY, tags, currentTimestamp, scheduler.getCapacity());
+ createMetric(MetricName.HADOOP_CLUSTER_USED_CAPACITY, tags, currentTimestamp, scheduler.getUsedCapacity());
+ for(Queue queue : scheduler.getQueues().getQueue()) {
+ createQueues(queue, currentTimestamp, scheduler, null);
+ }
+ }
+
+ public void flush() {
+ LOG.info("Flushing {} RunningQueue metrics in memory", metricEntities.size());
+ HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
+ List<GenericMetricEntity> metrics = new ArrayList<>(metricEntities);
+ collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId);
+
+ LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size());
+ messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
+ List<RunningQueueAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities);
+ collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId);
+
+ runningQueueAPIEntities.clear();
+ metricEntities.clear();
+ }
+
+ private Map<String, String> buildMetricTags(String queueName, String parentQueueName) {
+ Map<String,String> tags = new HashMap<>();
+ tags.put(HadoopClusterConstants.TAG_SITE, this.site);
+ if (queueName != null) {
+ tags.put(HadoopClusterConstants.TAG_QUEUE, queueName);
+ }
+ if (parentQueueName != null) {
+ tags.put(HadoopClusterConstants.TAG_PARENT_QUEUE, parentQueueName);
+ }
+ return tags;
+ }
+
+ private void createMetric(String metricName, Map<String,String> tags,long timestamp, double value) throws Exception {
+ GenericMetricEntity e = new GenericMetricEntity();
+ e.setPrefix(metricName);
+ e.setTimestamp(timestamp);
+ e.setTags(tags);
+ e.setValue(new double[]{value});
+ this.metricEntities.add(e);
+ }
+
+ private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName) throws Exception {
+ RunningQueueAPIEntity _entity = new RunningQueueAPIEntity();
+ Map<String, String> _tags = buildMetricTags(queue.getQueueName(), parentQueueName);
+ _entity.setTags(_tags);
+ _entity.setState(queue.getState());
+ _entity.setScheduler(scheduler.getType());
+ _entity.setAbsoluteCapacity(queue.getAbsoluteCapacity());
+ _entity.setAbsoluteMaxCapacity(queue.getAbsoluteMaxCapacity());
+ _entity.setAbsoluteUsedCapacity(queue.getAbsoluteUsedCapacity());
+ _entity.setMemory(queue.getResourcesUsed().getMemory());
+ _entity.setVcores(queue.getResourcesUsed().getvCores());
+ _entity.setNumActiveApplications(queue.getNumApplications());
+ _entity.setNumPendingApplications(queue.getNumPendingApplications());
+ _entity.setMaxActiveApplications(queue.getMaxActiveApplications());
+ _entity.setTimestamp(currentTimestamp);
+
+ List<UserWrapper> userList = new ArrayList<>();
+ if (queue.getUsers() != null && queue.getUsers().getUser() != null ) {
+ for (User user : queue.getUsers().getUser()) {
+ UserWrapper newUser = new UserWrapper(user);
+ userList.add(newUser);
+ }
+ }
+ _entity.setUsers(userList);
+
+ runningQueueAPIEntities.add(_entity);
+
+ createMetric(MetricName.HADOOP_QUEUE_NUMPENDING_JOBS, _tags, currentTimestamp, queue.getNumPendingApplications());
+ createMetric(MetricName.HADOOP_QUEUE_USED_CAPACITY,_tags,currentTimestamp,queue.getAbsoluteUsedCapacity());
+ if(queue.getAbsoluteCapacity() == 0) {
+ createMetric(MetricName.HADOOP_QUEUE_USED_CAPACITY_RATIO,_tags,currentTimestamp,0);
+ } else {
+ createMetric(MetricName.HADOOP_QUEUE_USED_CAPACITY_RATIO,_tags,currentTimestamp,queue.getAbsoluteUsedCapacity()/queue.getAbsoluteCapacity());
+ }
+
+ if (queue.getUsers() != null && queue.getUsers().getUser() != null ) {
+ for (User user : queue.getUsers().getUser()) {
+ Map<String, String> userTags = new HashMap<>(_tags);
+ userTags.put(HadoopClusterConstants.TAG_USER, user.getUsername());
+ createMetric(HadoopClusterConstants.MetricName.HADOOP_USER_NUMPENDING_JOBS,userTags,currentTimestamp,user.getNumPendingApplications());
+ createMetric(HadoopClusterConstants.MetricName.HADOOP_USER_USED_MEMORY,userTags,currentTimestamp,user.getResourcesUsed().getMemory());
+ createMetric(HadoopClusterConstants.MetricName.HADOOP_USER_USED_MEMORY_RATIO,userTags,currentTimestamp,((double)user.getResourcesUsed().getMemory()) / queue.getResourcesUsed().getMemory());
+ }
+ }
+
+ if (queue.getQueues() != null && queue.getQueues().getQueue() != null) {
+ for (Queue subQueue : queue.getQueues().getQueue()) {
+ createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/exceptions/HadoopQueueFetcherException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/exceptions/HadoopQueueFetcherException.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/exceptions/HadoopQueueFetcherException.java
new file mode 100644
index 0000000..0bce87d
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/exceptions/HadoopQueueFetcherException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.exceptions;
+
+public class HadoopQueueFetcherException extends Exception {
+
+ private static final long serialVersionUID = -2425311876734366496L;
+
+ /**
+ * Default constructor of FeederException
+ */
+ public HadoopQueueFetcherException() {
+ super();
+ }
+
+ /**
+ * Constructor of FeederException
+ *
+ * @param message error message
+ */
+ public HadoopQueueFetcherException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructor of FeederException
+ *
+ * @param message error message
+ * @param cause the cause of the exception
+ *
+ */
+ public HadoopQueueFetcherException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructor of FeederException
+ *
+ * @param cause the cause of the exception
+ */
+ public HadoopQueueFetcherException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
new file mode 100644
index 0000000..728c81e
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eagle.hadoop.queue.model;
+
+import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class HadoopQueueEntityRepository extends EntityRepository {
+ public HadoopQueueEntityRepository() {
+ this.registerEntity(RunningQueueAPIEntity.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
new file mode 100755
index 0000000..e0a2b61
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ *
+ */
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/*
+ * App model for Yarn Resource http://<rm http address:port>/ws/v1/cluster/apps
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class App {
+ private String id;
+ private String user;
+ private String name;
+ private String queue;
+ private String state;
+ private String finalStatus;
+ private double progress;
+ private String trackingUI;
+ private String trackingUrl;
+ private String diagnostics;
+ private long clusterId;
+ private String applicationType;
+ private String applicationTags;
+ private long startedTime;
+ private long finishedTime;
+ private long elapsedTime;
+ private String amContainerLogs;
+ private String amHostHttpAddress;
+ private int allocatedMB;
+ private int allocatedVCores;
+ private int runningContainers;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ public String getFinalStatus() {
+ return finalStatus;
+ }
+
+ public void setFinalStatus(String finalStatus) {
+ this.finalStatus = finalStatus;
+ }
+
+ public double getProgress() {
+ return progress;
+ }
+
+ public void setProgress(double progress) {
+ this.progress = progress;
+ }
+
+ public String getTrackingUI() {
+ return trackingUI;
+ }
+
+ public void setTrackingUI(String trackingUI) {
+ this.trackingUI = trackingUI;
+ }
+
+ public String getTrackingUrl() {
+ return trackingUrl;
+ }
+
+ public void setTrackingUrl(String trackingUrl) {
+ this.trackingUrl = trackingUrl;
+ }
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ public void setDiagnostics(String diagnostics) {
+ this.diagnostics = diagnostics;
+ }
+
+ public long getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(long clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public long getStartedTime() {
+ return startedTime;
+ }
+
+ public void setStartedTime(long startedTime) {
+ this.startedTime = startedTime;
+ }
+
+ public long getFinishedTime() {
+ return finishedTime;
+ }
+
+ public void setFinishedTime(long finishedTime) {
+ this.finishedTime = finishedTime;
+ }
+
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+
+ public void setElapsedTime(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ }
+
+ public String getApplicationType() {
+ return applicationType;
+ }
+
+ public void setApplicationType(String applicationType) {
+ this.applicationType = applicationType;
+ }
+
+ public String getApplicationTags() {
+ return applicationTags;
+ }
+
+ public void setApplicationTags(String applicationTags) {
+ this.applicationTags = applicationTags;
+ }
+
+ public String getAmContainerLogs() {
+ return amContainerLogs;
+ }
+
+ public void setAmContainerLogs(String amContainerLogs) {
+ this.amContainerLogs = amContainerLogs;
+ }
+
+ public String getAmHostHttpAddress() {
+ return amHostHttpAddress;
+ }
+
+ public void setAmHostHttpAddress(String amHostHttpAddress) {
+ this.amHostHttpAddress = amHostHttpAddress;
+ }
+
+ public int getAllocatedMB() {
+ return allocatedMB;
+ }
+
+ public void setAllocatedMB(int allocatedMB) {
+ this.allocatedMB = allocatedMB;
+ }
+
+ public int getAllocatedVCores() {
+ return allocatedVCores;
+ }
+
+ public void setAllocatedVCores(int allocatedVCores) {
+ this.allocatedVCores = allocatedVCores;
+ }
+
+ public int getRunningContainers() {
+ return runningContainers;
+ }
+
+ public void setRunningContainers(int runningContainers) {
+ this.runningContainers = runningContainers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
new file mode 100755
index 0000000..649f17b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ *
+ */
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Apps {
+ private List<App> app;
+
+ public List<App> getApp() {
+ return app;
+ }
+
+ public void setApp(List<App> app) {
+ this.app = app;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
new file mode 100755
index 0000000..c204638
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AppsWrapper {
+ private Apps apps;
+
+ public Apps getApps() {
+ return apps;
+ }
+
+ public void setApps(Apps apps) {
+ this.apps = apps;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetrics.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetrics.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetrics.java
new file mode 100644
index 0000000..b254ebf
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetrics.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.model.clusterMetrics;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ClusterMetrics {
+ private int appsSubmitted;
+ private int appsCompleted;
+ private int appsPending;
+ private int appsRunning;
+ private int appsFailed;
+ private int appsKilled;
+ private long reservedMB;
+ private long availableMB;
+ private long allocatedMB;
+ private int containersAllocated;
+ private int containersReserved;
+ private int containersPending;
+ private long totalMB;
+ private int totalNodes;
+ private int lostNodes;
+ private int unhealthyNodes;
+ private int decommissionedNodes;
+ private int rebootedNodes;
+ private int activeNodes;
+
+ public int getAppsSubmitted() {
+ return appsSubmitted;
+ }
+
+ public void setAppsSubmitted(int appsSubmitted) {
+ this.appsSubmitted = appsSubmitted;
+ }
+
+ public int getAppsCompleted() {
+ return appsCompleted;
+ }
+
+ public void setAppsCompleted(int appsCompleted) {
+ this.appsCompleted = appsCompleted;
+ }
+
+ public int getAppsPending() {
+ return appsPending;
+ }
+
+ public void setAppsPending(int appsPending) {
+ this.appsPending = appsPending;
+ }
+
+ public int getAppsRunning() {
+ return appsRunning;
+ }
+
+ public void setAppsRunning(int appsRunning) {
+ this.appsRunning = appsRunning;
+ }
+
+ public int getAppsFailed() {
+ return appsFailed;
+ }
+
+ public void setAppsFailed(int appsFailed) {
+ this.appsFailed = appsFailed;
+ }
+
+ public int getAppsKilled() {
+ return appsKilled;
+ }
+
+ public void setAppsKilled(int appsKilled) {
+ this.appsKilled = appsKilled;
+ }
+
+ public long getReservedMB() {
+ return reservedMB;
+ }
+
+ public void setReservedMB(long reservedMB) {
+ this.reservedMB = reservedMB;
+ }
+
+ public long getAvailableMB() {
+ return availableMB;
+ }
+
+ public void setAvailableMB(long availableMB) {
+ this.availableMB = availableMB;
+ }
+
+ public long getAllocatedMB() {
+ return allocatedMB;
+ }
+
+ public void setAllocatedMB(long allocatedMB) {
+ this.allocatedMB = allocatedMB;
+ }
+
+ public int getContainersAllocated() {
+ return containersAllocated;
+ }
+
+ public void setContainersAllocated(int containersAllocated) {
+ this.containersAllocated = containersAllocated;
+ }
+
+ public int getContainersReserved() {
+ return containersReserved;
+ }
+
+ public void setContainersReserved(int containersReserved) {
+ this.containersReserved = containersReserved;
+ }
+
+ public int getContainersPending() {
+ return containersPending;
+ }
+
+ public void setContainersPending(int containersPending) {
+ this.containersPending = containersPending;
+ }
+
+ public long getTotalMB() {
+ return totalMB;
+ }
+
+ public void setTotalMB(long totalMB) {
+ this.totalMB = totalMB;
+ }
+
+ public int getTotalNodes() {
+ return totalNodes;
+ }
+
+ public void setTotalNodes(int totalNodes) {
+ this.totalNodes = totalNodes;
+ }
+
+ public int getLostNodes() {
+ return lostNodes;
+ }
+
+ public void setLostNodes(int lostNodes) {
+ this.lostNodes = lostNodes;
+ }
+
+ public int getUnhealthyNodes() {
+ return unhealthyNodes;
+ }
+
+ public void setUnhealthyNodes(int unhealthyNodes) {
+ this.unhealthyNodes = unhealthyNodes;
+ }
+
+ public int getDecommissionedNodes() {
+ return decommissionedNodes;
+ }
+
+ public void setDecommissionedNodes(int decommissionedNodes) {
+ this.decommissionedNodes = decommissionedNodes;
+ }
+
+ public int getRebootedNodes() {
+ return rebootedNodes;
+ }
+
+ public void setRebootedNodes(int rebootedNodes) {
+ this.rebootedNodes = rebootedNodes;
+ }
+
+ public int getActiveNodes() {
+ return activeNodes;
+ }
+
+ public void setActiveNodes(int activeNodes) {
+ this.activeNodes = activeNodes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetricsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetricsWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetricsWrapper.java
new file mode 100644
index 0000000..a9d1a9a
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetricsWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.model.clusterMetrics;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ClusterMetricsWrapper {
+ public ClusterMetrics getClusterMetrics() {
+ return clusterMetrics;
+ }
+
+ public void setClusterMetrics(ClusterMetrics clusterMetrics) {
+ this.clusterMetrics = clusterMetrics;
+ }
+
+ private ClusterMetrics clusterMetrics;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/Queue.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/Queue.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/Queue.java
new file mode 100644
index 0000000..ace5879
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/Queue.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.model.scheduler;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Queue {
+ private String type;
+ private double capacity;
+ private double usedCapacity;
+ private double maxCapacity;
+ private double absoluteCapacity;
+ private double absoluteMaxCapacity;
+ private double absoluteUsedCapacity;
+
+ private ResourcesUsed resourcesUsed;
+ private String usedResources;
+ private String queueName;
+ private String state;
+ private Users users;
+
+ private int numApplications;
+ private int numPendingApplications;
+ private int numContainers;
+ private int maxApplications;
+ private int maxApplicationsPerUser;
+ private int maxActiveApplications;
+ private int maxActiveApplicationsPerUser;
+ private int userLimit;
+ private int userLimitFactor;
+ private Queues queues;
+
+ public String getUsedResources() {
+ return usedResources;
+ }
+
+ public void setUsedResources(String usedResources) {
+ this.usedResources = usedResources;
+ }
+
+ public int getMaxActiveApplicationsPerUser() {
+ return maxActiveApplicationsPerUser;
+ }
+
+ public void setMaxActiveApplicationsPerUser(int maxActiveApplicationsPerUser) {
+ this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
+ }
+
+ public int getNumPendingApplications() {
+ return numPendingApplications;
+ }
+
+ public void setNumPendingApplications(int numPendingApplications) {
+ this.numPendingApplications = numPendingApplications;
+ }
+
+ public int getNumContainers() {
+ return numContainers;
+ }
+
+ public void setNumContainers(int numContainers) {
+ this.numContainers = numContainers;
+ }
+
+ public int getMaxApplications() {
+ return maxApplications;
+ }
+
+ public void setMaxApplications(int maxApplications) {
+ this.maxApplications = maxApplications;
+ }
+
+ public int getMaxApplicationsPerUser() {
+ return maxApplicationsPerUser;
+ }
+
+ public void setMaxApplicationsPerUser(int maxApplicationsPerUser) {
+ this.maxApplicationsPerUser = maxApplicationsPerUser;
+ }
+
+ public int getMaxActiveApplications() {
+ return maxActiveApplications;
+ }
+
+ public void setMaxActiveApplications(int maxActiveApplications) {
+ this.maxActiveApplications = maxActiveApplications;
+ }
+
+ public int getUserLimit() {
+ return userLimit;
+ }
+
+ public void setUserLimit(int userLimit) {
+ this.userLimit = userLimit;
+ }
+
+ public int getUserLimitFactor() {
+ return userLimitFactor;
+ }
+
+ public void setUserLimitFactor(int userLimitFactor) {
+ this.userLimitFactor = userLimitFactor;
+ }
+
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+ public ResourcesUsed getResourcesUsed() {
+ return resourcesUsed;
+ }
+
+ public void setResourcesUsed(ResourcesUsed resourcesUsed) {
+ this.resourcesUsed = resourcesUsed;
+ }
+
+
+ public Users getUsers() { return users; }
+
+ public void setUsers(Users users) { this.users = users; }
+
+ public double getAbsoluteUsedCapacity() {
+ return absoluteUsedCapacity;
+ }
+
+ public void setAbsoluteUsedCapacity(double absoluteUsedCapacity) {
+ this.absoluteUsedCapacity = absoluteUsedCapacity;
+ }
+
+ public double getCapacity() {
+ return capacity;
+ }
+
+ public void setCapacity(double capacity) {
+ this.capacity = capacity;
+ }
+
+ public double getUsedCapacity() {
+ return usedCapacity;
+ }
+
+ public void setUsedCapacity(double usedCapacity) {
+ this.usedCapacity = usedCapacity;
+ }
+
+ public double getMaxCapacity() {
+ return maxCapacity;
+ }
+
+ public void setMaxCapacity(double maxCapacity) {
+ this.maxCapacity = maxCapacity;
+ }
+
+ public double getAbsoluteCapacity() {
+ return absoluteCapacity;
+ }
+
+ public void setAbsoluteCapacity(double absoluteCapacity) {
+ this.absoluteCapacity = absoluteCapacity;
+ }
+
+ public double getAbsoluteMaxCapacity() {
+ return absoluteMaxCapacity;
+ }
+
+ public void setAbsoluteMaxCapacity(double absoluteMaxCapacity) {
+ this.absoluteMaxCapacity = absoluteMaxCapacity;
+ }
+
+ public int getNumApplications() {
+ return numApplications;
+ }
+
+ public void setNumApplications(int numApplications) {
+ this.numApplications = numApplications;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ public Queues getQueues() {
+ return queues;
+ }
+
+ public void setQueues(Queues queues) {
+ this.queues = queues;
+ }
+}