You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/07/12 03:33:26 UTC

[2/2] incubator-eagle git commit: [EAGLE-350] Running queue metrics monitoring

[EAGLE-350] Running queue metrics monitoring

https://issues.apache.org/jira/browse/EAGLE-350

Author: Zhao, Qingwen <qi...@ebay.com>

Closes #260 from qingwen220/EAGLE-350.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9f6fea4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9f6fea4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9f6fea4a

Branch: refs/heads/develop
Commit: 9f6fea4ab65f4e7386d01b936b3a5d913ebee7e6
Parents: f3e7687
Author: Zhao, Qingwen <qi...@ebay.com>
Authored: Tue Jul 12 11:32:57 2016 +0800
Committer: Zhao, Qingwen <qi...@ebay.com>
Committed: Tue Jul 12 11:32:57 2016 +0800

----------------------------------------------------------------------
 eagle-assembly/src/main/bin/eagle-env.sh        |   2 +-
 .../src/test/resources/log4j.properties         |   2 +-
 eagle-jpm/eagle-hadoop-queue/pom.xml            |  90 ++++++++
 .../assembly/eagle-running-queue-assembly.xml   |  66 ++++++
 .../hadoop/queue/HadoopQueueRunningMain.java    |  93 ++++++++
 .../queue/common/HadoopClusterConstants.java    |  75 +++++++
 .../queue/common/HadoopYarnResourceUtils.java   |  60 +++++
 .../common/YarnClusterResourceURLBuilder.java   |  43 ++++
 .../queue/common/YarnURLSelectorImpl.java       |  34 +++
 .../queue/crawler/ClusterMetricsCrawler.java    |  65 ++++++
 .../crawler/ClusterMetricsParseListener.java    | 155 +++++++++++++
 .../queue/crawler/RunningAppParseListener.java  | 125 +++++++++++
 .../queue/crawler/RunningAppsCrawler.java       |  66 ++++++
 .../queue/crawler/SchedulerInfoCrawler.java     |  66 ++++++
 .../crawler/SchedulerInfoParseListener.java     | 148 ++++++++++++
 .../exceptions/HadoopQueueFetcherException.java |  60 +++++
 .../model/HadoopQueueEntityRepository.java      |  27 +++
 .../hadoop/queue/model/applications/App.java    | 222 ++++++++++++++++++
 .../hadoop/queue/model/applications/Apps.java   |  41 ++++
 .../queue/model/applications/AppsWrapper.java   |  36 +++
 .../model/clusterMetrics/ClusterMetrics.java    | 198 ++++++++++++++++
 .../clusterMetrics/ClusterMetricsWrapper.java   |  37 +++
 .../hadoop/queue/model/scheduler/Queue.java     | 224 +++++++++++++++++++
 .../hadoop/queue/model/scheduler/Queues.java    |  38 ++++
 .../queue/model/scheduler/ResourcesUsed.java    |  48 ++++
 .../model/scheduler/RunningQueueAPIEntity.java  | 158 +++++++++++++
 .../hadoop/queue/model/scheduler/Scheduler.java |  36 +++
 .../queue/model/scheduler/SchedulerInfo.java    |  83 +++++++
 .../queue/model/scheduler/SchedulerWrapper.java |  36 +++
 .../hadoop/queue/model/scheduler/User.java      |  63 ++++++
 .../queue/model/scheduler/UserWrapper.java      |  76 +++++++
 .../hadoop/queue/model/scheduler/Users.java     |  38 ++++
 .../queue/storm/HadoopQueueMessageId.java       |  57 +++++
 .../storm/HadoopQueueMetricPersistBolt.java     | 108 +++++++++
 .../storm/HadoopQueueRunningExtractor.java      |  95 ++++++++
 .../queue/storm/HadoopQueueRunningSpout.java    |  73 ++++++
 .../src/main/resources/application.conf         |  36 +++
 .../src/main/resources/log4j.properties         |  34 +++
 .../hadoop/queue/TestClusterMetricsCrawler.java |  39 ++++
 .../queue/TestHadoopYarnResourceUtils.java      |  54 +++++
 .../hadoop/queue/TestRunningAppsCrawler.java    |  39 ++++
 .../hadoop/queue/TestSchedulerInfoCrawler.java  |  39 ++++
 .../src/test/resources/application-bak.conf     |  36 +++
 .../src/test/resources/application.conf         |  36 +++
 .../src/test/resources/log4j.properties         |  34 +++
 .../resourceFetch/ha/AbstractURLSelector.java   | 100 +++++++++
 eagle-jpm/pom.xml                               |   3 +-
 eagle-topology-assembly/pom.xml                 |   5 +
 pom.xml                                         |   1 +
 49 files changed, 3297 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-assembly/src/main/bin/eagle-env.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-env.sh b/eagle-assembly/src/main/bin/eagle-env.sh
index 66d72ae..c6de7e0 100755
--- a/eagle-assembly/src/main/bin/eagle-env.sh
+++ b/eagle-assembly/src/main/bin/eagle-env.sh
@@ -52,4 +52,4 @@ done
 
 # EAGLE_TABLE_LIST
 # TODO: Automatically create hbase table when initializing
-export EAGLE_TABLE_LIST='alertdef ipzone streamMetadata alertdetail fileSensitivity eaglehdfs_alert streamdef eagle_metric alertExecutor alertStream alertStreamSchema hiveResourceSensitivity hbaseResourceSensitivity mlmodel userprofile hfdsusercommandpattern appCommand appDefinition serviceAudit aggregatedef alertNotifications eagleSiteDesc eagleSiteApplication eagleApplicationDesc eagleFeatureDesc eagle_metadata'
\ No newline at end of file
+export EAGLE_TABLE_LIST='alertdef ipzone streamMetadata alertdetail fileSensitivity eaglehdfs_alert streamdef eagle_metric alertExecutor alertStream alertStreamSchema hiveResourceSensitivity hbaseResourceSensitivity mlmodel userprofile hfdsusercommandpattern appCommand appDefinition serviceAudit aggregatedef alertNotifications eagleSiteDesc eagleSiteApplication eagleApplicationDesc eagleFeatureDesc eagle_metadata running_queue'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties b/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties
index d59ded6..fb13ad5 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/test/resources/log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=DEBUG, stdout
 
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/pom.xml b/eagle-jpm/eagle-hadoop-queue/pom.xml
new file mode 100644
index 0000000..6566bf5
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-parent</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-hadoop-queue</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-util</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-stream-process-api</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.wso2.orbit.com.lmax</groupId>
+                    <artifactId>disruptor</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/assembly/eagle-running-queue-assembly.xml</descriptor>
+                    <finalName>eagle-running-queue-${project.version}</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/assembly/eagle-running-queue-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/assembly/eagle-running-queue-assembly.xml b/eagle-jpm/eagle-hadoop-queue/src/assembly/eagle-running-queue-assembly.xml
new file mode 100644
index 0000000..244c5f0
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/assembly/eagle-running-queue-assembly.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>assembly</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>false</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <unpackOptions>
+                <excludes>
+                    <exclude>**/application.conf</exclude>
+                    <exclude>**/defaults.yaml</exclude>
+                    <exclude>**/*storm.yaml</exclude>
+                    <exclude>**/*storm.yaml.1</exclude>
+                    <exclude>**/log4j.properties</exclude>
+                </excludes>
+            </unpackOptions>
+            <excludes>
+                <exclude>org.apache.storm:storm-core</exclude>
+                <exclude>org.slf4j:slf4j-api</exclude>
+                <exclude>org.slf4j:log4j-over-slf4j</exclude>
+                <exclude>org.slf4j:slf4j-log4j12</exclude>
+                <exclude>log4j:log4j</exclude>
+                <exclude>asm:asm</exclude>
+                <exclude>org.apache.log4j.wso2:log4j</exclude>
+                <exclude>log4j:apache-log4j-extras</exclude>
+                <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.outputDirectory}/</directory>
+            <outputDirectory>/</outputDirectory>
+            <excludes>
+                <exclude>**/application.conf</exclude>
+                <exclude>**/log4j.properties</exclude>
+                <exclude>**/*.yaml</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
new file mode 100644
index 0000000..a285a6b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HadoopQueueRunningMain {
+
+    public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
+    public final static String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
+    public final static String TOPOLOGY_NAME = "topology.name";
+    public final static String LOCAL_MODE = "topology.localMode";
+
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningMain.class);
+
+    public static void main(String [] args) {
+        //System.setProperty("config.resource", "/application.conf");
+        //Config config = ConfigFactory.load();
+        Config config = null;
+        try {
+            LOG.info("Loading from configuration file");
+            config = new ConfigOptionParser().load(args);
+        } catch (Exception e) {
+            LOG.error("failed to load config");
+        }
+        IRichSpout spout = new HadoopQueueRunningSpout(config);
+        HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(config);
+        TopologyBuilder builder = new TopologyBuilder();
+
+        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+        int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
+        int numOfSpoutTasks = 1;
+
+        String spoutName = "runningQueueSpout";
+        String boltName = "parserBolt";
+
+        builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
+        builder.setBolt(boltName, bolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping(spoutName);
+
+        StormTopology topology = builder.createTopology();
+
+        backtype.storm.Config stormConf = new backtype.storm.Config();
+        stormConf.setNumWorkers(numOfTotalWorkers);
+        stormConf.put(stormConf.TOPOLOGY_DEBUG, true);
+
+        String topoName = config.getString(TOPOLOGY_NAME);
+        Boolean local = config.getBoolean(LOCAL_MODE);
+        try {
+            if (!local) {
+                StormSubmitter.submitTopology(topoName, stormConf, topology);
+            } else {
+                //local mode
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology(topoName, stormConf, topology);
+            }
+        } catch (InvalidTopologyException e) {
+            e.printStackTrace();
+        } catch (AlreadyAliveException e) {
+            e.printStackTrace();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
new file mode 100644
index 0000000..324489d
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.hadoop.queue.common;
+
+/** Constants of the hadoop queue module: enums, metric name strings, tag names and field names. */
+public class HadoopClusterConstants {
+
+    /** Aggregate function names. */
+    public enum AggregateFunc { MAX, AVG }
+
+    /** Data type names. */
+    public enum DataType { METRIC, ENTITY }
+
+    /** Data source names. */
+    public enum DataSource { CLUSTER_METRIC, RUNNING_APPS, SCHEDULER }
+
+    /** Metric name strings, grouped by the source they are derived from. */
+    public static class MetricName {
+
+        // Running-app metrics; the %s placeholder is presumably filled in by callers (not visible here)
+        public static final String HADOOP_APPS_ALLOCATED_MB = "hadoop.%s.allocatedmb";
+        public static final String HADOOP_APPS_ALLOCATED_VCORES = "hadoop.%s.allocatedvcores";
+        public static final String HADOOP_APPS_RUNNING_CONTAINERS = "hadoop.%s.runningcontainers";
+
+        // Cluster metrics
+        public static final String HADOOP_CLUSTER_NUMPENDING_JOBS = "hadoop.cluster.numpendingjobs";
+        public static final String HADOOP_CLUSTER_ALLOCATED_MEMORY = "hadoop.cluster.allocatedmemory";
+        public static final String HADOOP_CLUSTER_TOTAL_MEMORY = "hadoop.cluster.totalmemory";
+        public static final String HADOOP_CLUSTER_AVAILABLE_MEMORY = "hadoop.cluster.availablememory";
+        public static final String HADOOP_CLUSTER_RESERVED_MEMORY = "hadoop.cluster.reservedmemory";
+
+        // Scheduler info metrics: cluster names
+        public static final String HADOOP_CLUSTER_CAPACITY = "hadoop.cluster.capacity";
+        public static final String HADOOP_CLUSTER_USED_CAPACITY = "hadoop.cluster.usedcapacity";
+
+        // Scheduler info metrics: queue names
+        public static final String HADOOP_QUEUE_NUMPENDING_JOBS = "hadoop.queue.numpendingjobs";
+        public static final String HADOOP_QUEUE_USED_CAPACITY = "hadoop.queue.usedcapacity";
+        public static final String HADOOP_QUEUE_USED_CAPACITY_RATIO = "hadoop.queue.usedcapacityratio";
+
+        // Scheduler info metrics: user names
+        public static final String HADOOP_USER_NUMPENDING_JOBS = "hadoop.user.numpendingjobs";
+        public static final String HADOOP_USER_USED_MEMORY = "hadoop.user.usedmemory";
+        public static final String HADOOP_USER_USED_MEMORY_RATIO = "hadoop.user.usedmemoryratio";
+    }
+
+    /** Service name string for the running queue service. */
+    public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService";
+
+    // Tag names
+    public static final String TAG_PARENT_QUEUE = "parentQueue";
+    public static final String TAG_QUEUE = "queue";
+    public static final String TAG_USER = "user";
+    public static final String TAG_SITE = "site";
+    public static final String TAG_CLUSTER = "cluster";
+
+    // Field names
+    public static final String FIELD_DATATYPE = "dataType";
+    public static final String FIELD_DATA = "data";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
new file mode 100644
index 0000000..f2c4b1f
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopYarnResourceUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.common;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.InputStream;
+
+/** Helpers for reading a remote JSON resource into an object and for reading optional config values. */
+public class HadoopYarnResourceUtils {
+
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    /**
+     * Opens {@code urlString} via InputStreamUtils with the GZIP compression type and maps it to {@code clazz}.
+     * A failure while opening or mapping is rethrown as IllegalArgumentException carrying the cause.
+     */
+    public static Object getObjectFromStreamWithGzip(String urlString, Class<?> clazz) throws Exception {
+        InputStream stream = null;
+        try {
+            stream = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP);
+            return OBJ_MAPPER.readValue(stream, clazz);
+        } catch (Exception cause) {
+            throw new IllegalArgumentException(String.format("Fetch resource %s failed", urlString), cause);
+        } finally {
+            if (stream != null) {
+                stream.close();
+            }
+        }
+    }
+
+    /** Returns the string at {@code key} when {@code eagleConf} has that path, otherwise {@code defaultValue}. */
+    public static String getConfigValue(Config eagleConf, String key, String defaultValue) {
+        return eagleConf.hasPath(key) ? eagleConf.getString(key) : defaultValue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
new file mode 100644
index 0000000..7fd275b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.common;
+
+/** Builds YARN REST URLs; the base URL is used verbatim, no separator is inserted before the API path. */
+public class YarnClusterResourceURLBuilder {
+    private static final String CLUSTER_SCHEDULER_API_URL = "ws/v1/cluster/scheduler";
+    private static final String CLUSTER_METRICS_API_URL = "ws/v1/cluster/metrics";
+    private static final String CLUSTER_APPS_API_URL = "ws/v1/cluster/apps";
+    private static final String ANONYMOUS_PARAMETER = "anonymous=true";
+    /** Returns {@code urlBase} followed by the scheduler path and the anonymous query parameter. */
+    public static String buildSchedulerInfoURL(String urlBase) {
+        return String.format("%s%s?%s", urlBase, CLUSTER_SCHEDULER_API_URL, ANONYMOUS_PARAMETER);
+    }
+    /** Returns {@code urlBase} followed by the metrics path and the anonymous query parameter. */
+    public static String buildClusterMetricsURL(String urlBase) {
+        return String.format("%s%s?%s", urlBase, CLUSTER_METRICS_API_URL, ANONYMOUS_PARAMETER);
+    }
+    /** Returns {@code urlBase} followed by the apps path filtered to {@code state=RUNNING}. */
+    public static String buildRunningAppsURL(String urlBase) {
+        return String.format("%s%s?state=RUNNING&%s", urlBase, CLUSTER_APPS_API_URL, ANONYMOUS_PARAMETER);
+    }
+    /** Returns {@code urlBase} followed by the apps path filtered to {@code state=FINISHED}. */
+    public static String buildFinishedAppsURL(String urlBase) {
+        return String.format("%s%s?state=FINISHED&%s", urlBase, CLUSTER_APPS_API_URL, ANONYMOUS_PARAMETER);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
new file mode 100644
index 0000000..05e3be9
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.common;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.ha.AbstractURLSelector;
+
+/** URL selector whose test URL is the running-apps URL; presumably probed by the base class (not visible here). */
+public class YarnURLSelectorImpl extends AbstractURLSelector {
+    public YarnURLSelectorImpl(String[] urls, Constants.CompressionType compressionType) {
+        super(urls, compressionType);
+    }
+    @Override
+    protected String buildTestURL(String urlToCheck) {
+        final String runningAppsUrl = YarnClusterResourceURLBuilder.buildRunningAppsURL(urlToCheck);
+        return runningAppsUrl;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
new file mode 100644
index 0000000..5cf44dc
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsCrawler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
+import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
+import org.apache.eagle.hadoop.queue.model.clusterMetrics.ClusterMetrics;
+import org.apache.eagle.hadoop.queue.model.clusterMetrics.ClusterMetricsWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Runnable that fetches the cluster metrics document from the URL built from the
+ * given base URL and forwards the result to a {@link ClusterMetricsParseListener}.
+ * The listener is flushed at the end of every run, whether or not the crawl succeeded.
+ */
+public class ClusterMetricsCrawler implements Runnable {
+
+	private final static Logger logger = LoggerFactory.getLogger(ClusterMetricsCrawler.class);
+	private final ClusterMetricsParseListener listener;
+	private final String urlString;
+
+	/**
+	 * @param site      site name passed on to the parse listener
+	 * @param urlBase   base URL from which the cluster metrics URL is built
+	 * @param collector Storm collector passed on to the parse listener
+	 */
+	public ClusterMetricsCrawler(String site, String urlBase, SpoutOutputCollector collector) {
+		listener = new ClusterMetricsParseListener(site, collector);
+		urlString = YarnClusterResourceURLBuilder.buildClusterMetricsURL(urlBase);
+	}
+
+	/**
+	 * Crawls once. Failures are logged and never propagated, so a scheduled
+	 * execution of this task is not aborted by a single bad crawl.
+	 */
+	@Override
+	public void run() {
+		try {
+			logger.info("Start to crawl cluster metrics from " + this.urlString);
+			ClusterMetricsWrapper metricsWrapper = (ClusterMetricsWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, ClusterMetricsWrapper.class);
+			// Check the wrapper as well as its payload (as the sibling crawlers do) so a
+			// null wrapper is reported as a failed crawl rather than a NullPointerException.
+			ClusterMetrics metrics = metricsWrapper == null ? null : metricsWrapper.getClusterMetrics();
+			if (metrics == null) {
+				logger.error("Failed to crawl cluster metrics");
+			} else {
+				long currentTimestamp = System.currentTimeMillis();
+				listener.onMetric(metrics, currentTimestamp);
+			}
+		} catch (IOException e) {
+			logger.error(e.getMessage());
+			// Stack trace at debug level; the previous trace() call sat behind an
+			// isDebugEnabled() guard and therefore never printed at debug level.
+			logger.debug(e.getMessage(), e);
+		} catch (Exception e) {
+			logger.error(e.getMessage(), e);
+		} finally {
+			// Always flush so the listener emits (and resets) even after a failed crawl.
+			listener.flush();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
new file mode 100644
index 0000000..b8a5b45
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/**
+ *
+ */
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.hadoop.queue.model.clusterMetrics.ClusterMetrics;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.AggregateFunc;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Aggregates cluster metrics into buckets of {@code AGGREGATE_INTERVAL} milliseconds
+ * and emits them through the Storm collector on {@link #flush()}.
+ * <p>
+ * Buckets stay cached across flushes so later samples can be aggregated into them;
+ * after each flush, buckets older than {@code HOLD_TIME_WINDOW} (relative to the newest
+ * timestamp seen since the previous flush) are evicted.
+ */
+public class ClusterMetricsParseListener {
+
+	private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
+	private final static long HOLD_TIME_WINDOW = 2 * DateTimeUtil.ONEMINUTE;
+
+	private final String site;
+	private final SpoutOutputCollector collector;
+
+	/** Largest raw timestamp seen since the last flush; the eviction cutoff is derived from it. */
+	private long maxTimestamp;
+	/** Aggregated entity per (metric name, bucket start). */
+	private final Map<MetricKey, GenericMetricEntity> clusterMetricEntities = new HashMap<>();
+	/** Number of samples already folded into each entity; needed for the running average. */
+	private final Map<MetricKey, Integer> clusterMetricCounts = new HashMap<>();
+
+	/**
+	 * @param site      value of the site tag attached to every metric
+	 * @param collector Storm collector the aggregated metrics are emitted to
+	 */
+	public ClusterMetricsParseListener(String site, SpoutOutputCollector collector) {
+		this.site = site;
+		this.collector = collector;
+	}
+
+	/**
+	 * Folds one sample into the bucket that contains {@code timestamp}.
+	 */
+	private void createMetric(String metricName, long timestamp, double value, HadoopClusterConstants.AggregateFunc aggFunc) {
+		maxTimestamp = Math.max(maxTimestamp, timestamp);
+		// Round down to the start of the aggregation bucket.
+		long bucketStart = timestamp / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL;
+		MetricKey key = new MetricKey(metricName, bucketStart);
+		GenericMetricEntity entity = clusterMetricEntities.get(key);
+		if (entity == null) {
+			entity = new GenericMetricEntity();
+			entity.setTags(buildMetricTags());
+			entity.setTimestamp(bucketStart);
+			entity.setPrefix(metricName);
+			entity.setValue(new double[]{0.0});
+			clusterMetricEntities.put(key, entity);
+		}
+		int samplesSoFar = clusterMetricCounts.getOrDefault(key, 0);
+		updateEntityAggValue(entity, aggFunc, value, samplesSoFar);
+		clusterMetricCounts.put(key, samplesSoFar + 1);
+	}
+
+	/**
+	 * Records one snapshot of cluster metrics.
+	 *
+	 * @param metrics          crawled cluster metrics
+	 * @param currentTimestamp timestamp of the snapshot in milliseconds
+	 */
+	public void onMetric(ClusterMetrics metrics, long currentTimestamp) {
+		createMetric(MetricName.HADOOP_CLUSTER_NUMPENDING_JOBS, currentTimestamp, metrics.getAppsPending(), AggregateFunc.MAX);
+		createMetric(MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY, currentTimestamp, metrics.getAllocatedMB(), AggregateFunc.AVG);
+		createMetric(MetricName.HADOOP_CLUSTER_TOTAL_MEMORY, currentTimestamp, metrics.getTotalMB(), AggregateFunc.MAX);
+		createMetric(MetricName.HADOOP_CLUSTER_AVAILABLE_MEMORY, currentTimestamp, metrics.getAvailableMB(), AggregateFunc.AVG);
+		createMetric(MetricName.HADOOP_CLUSTER_RESERVED_MEMORY, currentTimestamp, metrics.getReservedMB(), AggregateFunc.AVG);
+	}
+
+	/**
+	 * Emits a copy of every cached entity, then evicts expired buckets.
+	 */
+	public void flush() {
+		HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.CLUSTER_METRIC, System.currentTimeMillis());
+		List<GenericMetricEntity> metrics = new ArrayList<>(clusterMetricEntities.values());
+		this.collector.emit(new ValuesArray(DataType.METRIC.name(), metrics), messageId);
+		reset();
+	}
+
+	private void reset() {
+		// Evict BEFORE zeroing maxTimestamp: the cutoff is computed from it, and with
+		// maxTimestamp already at 0 the cutoff is negative, so nothing was ever removed.
+		clearOldCache();
+		maxTimestamp = 0;
+	}
+
+	/**
+	 * Drops every bucket that started more than {@code HOLD_TIME_WINDOW} before the newest
+	 * timestamp seen, from both the entity cache and the sample counters.
+	 */
+	private void clearOldCache() {
+		final long cutoff = maxTimestamp - HOLD_TIME_WINDOW;
+		clusterMetricEntities.keySet().removeIf(key -> key.createTime < cutoff);
+		clusterMetricCounts.keySet().removeIf(key -> key.createTime < cutoff);
+	}
+
+	private Map<String, String> buildMetricTags() {
+		Map<String, String> tags = new HashMap<>();
+		tags.put(HadoopClusterConstants.TAG_SITE, site);
+		return tags;
+	}
+
+	/**
+	 * Folds {@code value} into the entity's single stored value.
+	 *
+	 * @param count number of samples already folded into the entity
+	 */
+	private void updateEntityAggValue(GenericMetricEntity entity,
+			HadoopClusterConstants.AggregateFunc aggFunc,
+			double value,
+			double count) {
+		double lastValue = entity.getValue()[0];
+		switch (aggFunc) {
+			case MAX:
+				entity.setValue(new double[]{Math.max(lastValue, value)});
+				break;
+			case AVG:
+				// Running mean; the result is truncated to a whole number, as before.
+				long avgValue = (long) ((lastValue * count + value) / (count + 1));
+				entity.setValue(new double[]{avgValue});
+				break;
+			default:
+				// Any other aggregate function leaves the entity untouched.
+				break;
+		}
+	}
+
+	/**
+	 * Cache key: metric name plus bucket start time. Static because it needs no
+	 * reference to the enclosing listener.
+	 */
+	private static final class MetricKey {
+		private final String metricName;
+		private final long createTime;
+
+		MetricKey(String metricName, long timestamp) {
+			this.metricName = metricName;
+			this.createTime = timestamp;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj) {
+				return true;
+			}
+			if (!(obj instanceof MetricKey)) {
+				return false;
+			}
+			MetricKey other = (MetricKey) obj;
+			return createTime == other.createTime && Objects.equals(metricName, other.metricName);
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder().append(metricName).append(createTime).toHashCode();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
new file mode 100755
index 0000000..be06e12
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/**
+ *
+ */
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.hadoop.queue.model.applications.App;
+import org.apache.eagle.hadoop.queue.model.applications.Apps;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.*;
+
+/**
+ * Sums per-application resource figures of running applications into metrics at
+ * each {@code AggLevel} (cluster, queue, user) and emits them on {@link #flush()}.
+ */
+public class RunningAppParseListener {
+
+	private final static Logger logger = LoggerFactory.getLogger(RunningAppParseListener.class);
+	private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
+
+	/**
+	 * Maps a metric-name pattern (passed to {@link String#format} together with the
+	 * aggregation level name) to the name of the no-argument {@link App} getter that
+	 * supplies the value.
+	 */
+	public static HashMap<String, String> metrics = new HashMap<>();
+
+	static {
+		// Plain static initializer: double-brace initialization would create an
+		// anonymous HashMap subclass just to populate three entries.
+		metrics.put(MetricName.HADOOP_APPS_ALLOCATED_MB, "getAllocatedMB");
+		metrics.put(MetricName.HADOOP_APPS_ALLOCATED_VCORES, "getAllocatedVCores");
+		metrics.put(MetricName.HADOOP_APPS_RUNNING_CONTAINERS, "getRunningContainers");
+	}
+
+	private final String site;
+	private final SpoutOutputCollector collector;
+	/** Accumulated entity per (metric name, tags, timestamp) key since the last flush. */
+	private final Map<String, GenericMetricEntity> appMetricEntities = new HashMap<>();
+
+	/**
+	 * @param site      value of the site tag attached to every metric
+	 * @param collector Storm collector the metrics are emitted to
+	 */
+	public RunningAppParseListener(String site, SpoutOutputCollector collector) {
+		this.site = site;
+		this.collector = collector;
+	}
+
+	/**
+	 * Emits a copy of all accumulated entities and clears the accumulator.
+	 */
+	public void flush() {
+		logger.info("start sending app metrics, size: " + appMetricEntities.size());
+		HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.RUNNING_APPS, System.currentTimeMillis());
+		// Named "entities" so the local no longer shadows the static "metrics" map.
+		List<GenericMetricEntity> entities = new ArrayList<>(appMetricEntities.values());
+		collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), entities), messageId);
+		appMetricEntities.clear();
+	}
+
+	/**
+	 * Builds the tag set for one aggregation level: always the site tag, plus the
+	 * one tag from {@code tags} whose key matches the level (none for the cluster level).
+	 */
+	private Map<String, String> buildMetricTags(AggLevel level, Map<String, String> tags) {
+		Map<String, String> newTags = new HashMap<>();
+		newTags.put(HadoopClusterConstants.TAG_SITE, site);
+		for (Map.Entry<String, String> entry : tags.entrySet()) {
+			if (level.level.equalsIgnoreCase(entry.getKey())) {
+				newTags.put(entry.getKey(), entry.getValue());
+			}
+		}
+		return newTags;
+	}
+
+	/**
+	 * Adds {@code value} to the entity identified by metric name, tags and timestamp,
+	 * creating the entity with value 0 on first use.
+	 */
+	private void createMetric(String metricName, Map<String, String> tags, long timestamp, int value) {
+		String key = metricName + tags.toString() + " " + timestamp;
+		GenericMetricEntity entity = appMetricEntities.get(key);
+		if (entity == null) {
+			entity = new GenericMetricEntity();
+			entity.setTags(tags);
+			entity.setTimestamp(timestamp);
+			entity.setPrefix(metricName);
+			entity.setValue(new double[]{0.0});
+			appMetricEntities.put(key, entity);
+		}
+		double lastValue = entity.getValue()[0];
+		entity.setValue(new double[]{lastValue + value});
+	}
+
+	/**
+	 * Accumulates the figures of every application in {@code apps}.
+	 *
+	 * @param apps      crawled running applications
+	 * @param timestamp crawl time in milliseconds; rounded down to the aggregation interval
+	 * @throws Exception if a getter named in {@link #metrics} cannot be resolved or invoked
+	 */
+	public void onMetric(Apps apps, long timestamp) throws Exception {
+		if (apps.getApp() == null) {
+			// No application list in the response: nothing to aggregate.
+			return;
+		}
+		timestamp = timestamp / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL;
+		for (App app : apps.getApp()) {
+			Map<String, String> tags = new HashMap<>();
+			tags.put(HadoopClusterConstants.TAG_USER, app.getUser());
+			tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue());
+			for (AggLevel level : AggLevel.values()) {
+				Map<String, String> newTags = buildMetricTags(level, tags);
+				for (Map.Entry<String, String> entry : metrics.entrySet()) {
+					// The getter is looked up by name and invoked reflectively.
+					Method method = App.class.getMethod(entry.getValue());
+					Integer value = (Integer) method.invoke(app);
+					String metricName = String.format(entry.getKey(), level.name);
+					createMetric(metricName, newTags, timestamp, value);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Aggregation levels. {@code name} is substituted into the metric-name pattern;
+	 * {@code level} is the tag key kept for that level (empty for the cluster level).
+	 */
+	private enum AggLevel {
+		CLUSTER(HadoopClusterConstants.TAG_CLUSTER, ""),
+		QUEUE(HadoopClusterConstants.TAG_QUEUE, HadoopClusterConstants.TAG_QUEUE),
+		USER(HadoopClusterConstants.TAG_USER, HadoopClusterConstants.TAG_USER);
+
+		private final String name;
+		private final String level;
+
+		AggLevel(String name, String level) {
+			this.name = name;
+			this.level = level;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
new file mode 100755
index 0000000..55ba41b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
+import org.apache.eagle.hadoop.queue.model.applications.Apps;
+import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
+import org.apache.eagle.hadoop.queue.model.applications.AppsWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Runnable that fetches the running-applications document from the URL built from
+ * the given base URL and forwards the result to a {@link RunningAppParseListener}.
+ * The listener is flushed at the end of every run, whether or not the crawl succeeded.
+ */
+public class RunningAppsCrawler implements Runnable {
+
+	private final static Logger logger = LoggerFactory.getLogger(RunningAppsCrawler.class);
+
+	private final RunningAppParseListener listener;
+	private final String urlString;
+
+	/**
+	 * @param site      site name passed on to the parse listener
+	 * @param baseUrl   base URL from which the running-applications URL is built
+	 * @param collector Storm collector passed on to the parse listener
+	 */
+	public RunningAppsCrawler(String site, String baseUrl, SpoutOutputCollector collector) {
+		this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl);
+		listener = new RunningAppParseListener(site, collector);
+	}
+
+	/**
+	 * Crawls once. Failures are logged and never propagated.
+	 */
+	@Override
+	public void run() {
+		try {
+			logger.info("Start to crawl app metrics from " + this.urlString);
+			AppsWrapper appsWrapper = (AppsWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, AppsWrapper.class);
+			if (appsWrapper == null || appsWrapper.getApps() == null) {
+				logger.error("Failed to crawl running applications with api = " + urlString);
+			} else {
+				long currentTimestamp = System.currentTimeMillis();
+				listener.onMetric(appsWrapper.getApps(), currentTimestamp);
+			}
+		} catch (IOException e) {
+			logger.error(e.getMessage());
+			// Stack trace at debug level; the previous trace() call sat behind an
+			// isDebugEnabled() guard and therefore never printed at debug level.
+			logger.debug(e.getMessage(), e);
+		} catch (Exception e) {
+			logger.error(e.getMessage(), e);
+		} finally {
+			// Always flush so the listener emits (and clears) even after a failed crawl.
+			listener.flush();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
new file mode 100755
index 0000000..f25fa20
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoCrawler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
+import org.apache.eagle.hadoop.queue.model.scheduler.*;
+import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Runnable that fetches the scheduler document from the URL built from the given
+ * base URL and forwards its scheduler info to a {@link SchedulerInfoParseListener}.
+ * The listener is flushed at the end of every run, whether or not the crawl succeeded.
+ */
+public class SchedulerInfoCrawler implements Runnable {
+
+	private final static Logger logger = LoggerFactory.getLogger(SchedulerInfoCrawler.class);
+
+	private SchedulerInfoParseListener listener;
+	private String urlString;
+
+	/**
+	 * @param site      site name passed on to the parse listener
+	 * @param baseUrl   base URL from which the scheduler info URL is built
+	 * @param collector Storm collector passed on to the parse listener
+	 */
+	public SchedulerInfoCrawler(String site, String baseUrl, SpoutOutputCollector collector) {
+		this.listener = new SchedulerInfoParseListener(site, collector);
+		this.urlString = YarnClusterResourceURLBuilder.buildSchedulerInfoURL(baseUrl);
+	}
+
+	/**
+	 * Crawls once. Every failure, including any {@link Throwable}, is logged and
+	 * never propagated.
+	 */
+	@Override
+	public void run() {
+		try {
+			logger.info("Start to crawl cluster scheduler queues from " + this.urlString);
+			SchedulerWrapper wrapper = (SchedulerWrapper) HadoopYarnResourceUtils.getObjectFromStreamWithGzip(urlString, SchedulerWrapper.class);
+			boolean nothingCrawled = wrapper == null || wrapper.getScheduler() == null;
+			if (nothingCrawled) {
+				logger.error("Failed to crawl scheduler info with url = " + this.urlString);
+				// The finally block below still flushes the listener.
+				return;
+			}
+			SchedulerInfo info = wrapper.getScheduler().getSchedulerInfo();
+			logger.info("Crawled " + info.getQueues().getQueue().size() + " queues");
+			listener.onMetric(info, System.currentTimeMillis());
+		} catch (IOException ioFailure) {
+			logger.error("Got IO exception while connecting to "+this.urlString + " : "+ ioFailure.getMessage());
+		} catch (Exception failure) {
+			logger.error("Got exception while crawling queues:" + failure.getMessage(), failure);
+		} catch (Throwable failure) {
+			logger.error("Got throwable exception while crawling queues:" + failure.getMessage(), failure);
+		} finally {
+			listener.flush();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
new file mode 100644
index 0000000..99d83d4
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.model.scheduler.*;
+import org.apache.eagle.hadoop.queue.model.scheduler.Queue;
+import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+/**
+ * Turns one scheduler snapshot into queue entities and metrics (cluster, queue and
+ * per-user) and emits both lists through the Storm collector on {@link #flush()}.
+ */
+public class SchedulerInfoParseListener {
+
+    private final static Logger LOG = LoggerFactory.getLogger(SchedulerInfoParseListener.class);
+
+    private final List<RunningQueueAPIEntity> runningQueueAPIEntities = new ArrayList<>();
+    private final List<GenericMetricEntity> metricEntities = new ArrayList<>();
+
+    private final String site;
+    private final SpoutOutputCollector collector;
+
+    /**
+     * @param site      value of the site tag attached to every entity and metric
+     * @param collector Storm collector the results are emitted to
+     */
+    public SchedulerInfoParseListener(String site, SpoutOutputCollector collector) {
+        this.site = site;
+        this.collector = collector;
+    }
+
+    /**
+     * Records cluster capacity metrics and walks the queue tree of the snapshot.
+     *
+     * @param scheduler        crawled scheduler info
+     * @param currentTimestamp timestamp stamped on every entity and metric
+     * @throws Exception kept for backward compatibility with existing callers
+     */
+    public void onMetric(SchedulerInfo scheduler, long currentTimestamp) throws Exception {
+        Map<String,String> tags = buildMetricTags(null, null);
+        createMetric(MetricName.HADOOP_CLUSTER_CAPACITY, tags, currentTimestamp, scheduler.getCapacity());
+        createMetric(MetricName.HADOOP_CLUSTER_USED_CAPACITY, tags, currentTimestamp, scheduler.getUsedCapacity());
+        // Same null guard createQueues applies to sub-queues: a scheduler without a
+        // queue list still yields the two cluster metrics above.
+        if (scheduler.getQueues() != null && scheduler.getQueues().getQueue() != null) {
+            for (Queue queue : scheduler.getQueues().getQueue()) {
+                createQueues(queue, currentTimestamp, scheduler, null);
+            }
+        }
+    }
+
+    /**
+     * Emits copies of the collected metrics and queue entities as two separate
+     * tuples, then clears both lists.
+     */
+    public void flush() {
+        LOG.info("Flushing {} RunningQueue metrics in memory", metricEntities.size());
+        HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
+        List<GenericMetricEntity> metrics = new ArrayList<>(metricEntities);
+        collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId);
+
+        LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size());
+        messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
+        List<RunningQueueAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities);
+        collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId);
+
+        runningQueueAPIEntities.clear();
+        metricEntities.clear();
+    }
+
+    /**
+     * Builds a tag map holding the site and, when non-null, the queue and parent queue names.
+     */
+    private Map<String, String> buildMetricTags(String queueName, String parentQueueName) {
+        Map<String,String> tags = new HashMap<>();
+        tags.put(HadoopClusterConstants.TAG_SITE, this.site);
+        if (queueName != null) {
+            tags.put(HadoopClusterConstants.TAG_QUEUE, queueName);
+        }
+        if (parentQueueName != null) {
+            tags.put(HadoopClusterConstants.TAG_PARENT_QUEUE, parentQueueName);
+        }
+        return tags;
+    }
+
+    /** Appends one single-valued metric entity to the pending metric list. */
+    private void createMetric(String metricName, Map<String,String> tags, long timestamp, double value) {
+        GenericMetricEntity metric = new GenericMetricEntity();
+        metric.setPrefix(metricName);
+        metric.setTimestamp(timestamp);
+        metric.setTags(tags);
+        metric.setValue(new double[]{value});
+        this.metricEntities.add(metric);
+    }
+
+    /** Tells whether the queue carries a non-null user list. */
+    private static boolean hasUsers(Queue queue) {
+        return queue.getUsers() != null && queue.getUsers().getUser() != null;
+    }
+
+    /**
+     * Records the entity and metrics for {@code queue}, then recurses into its sub-queues,
+     * passing this queue's name as their parent.
+     */
+    private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName) {
+        RunningQueueAPIEntity entity = new RunningQueueAPIEntity();
+        Map<String, String> queueTags = buildMetricTags(queue.getQueueName(), parentQueueName);
+        entity.setTags(queueTags);
+        entity.setState(queue.getState());
+        entity.setScheduler(scheduler.getType());
+        entity.setAbsoluteCapacity(queue.getAbsoluteCapacity());
+        entity.setAbsoluteMaxCapacity(queue.getAbsoluteMaxCapacity());
+        entity.setAbsoluteUsedCapacity(queue.getAbsoluteUsedCapacity());
+        entity.setMemory(queue.getResourcesUsed().getMemory());
+        entity.setVcores(queue.getResourcesUsed().getvCores());
+        entity.setNumActiveApplications(queue.getNumApplications());
+        entity.setNumPendingApplications(queue.getNumPendingApplications());
+        entity.setMaxActiveApplications(queue.getMaxActiveApplications());
+        entity.setTimestamp(currentTimestamp);
+
+        List<UserWrapper> userList = new ArrayList<>();
+        if (hasUsers(queue)) {
+            for (User user : queue.getUsers().getUser()) {
+                userList.add(new UserWrapper(user));
+            }
+        }
+        entity.setUsers(userList);
+
+        runningQueueAPIEntities.add(entity);
+
+        createMetric(MetricName.HADOOP_QUEUE_NUMPENDING_JOBS, queueTags, currentTimestamp, queue.getNumPendingApplications());
+        createMetric(MetricName.HADOOP_QUEUE_USED_CAPACITY, queueTags, currentTimestamp, queue.getAbsoluteUsedCapacity());
+        if (queue.getAbsoluteCapacity() == 0) {
+            createMetric(MetricName.HADOOP_QUEUE_USED_CAPACITY_RATIO, queueTags, currentTimestamp, 0);
+        } else {
+            createMetric(MetricName.HADOOP_QUEUE_USED_CAPACITY_RATIO, queueTags, currentTimestamp, queue.getAbsoluteUsedCapacity() / queue.getAbsoluteCapacity());
+        }
+
+        if (hasUsers(queue)) {
+            for (User user : queue.getUsers().getUser()) {
+                Map<String, String> userTags = new HashMap<>(queueTags);
+                userTags.put(HadoopClusterConstants.TAG_USER, user.getUsername());
+                double userMemory = user.getResourcesUsed().getMemory();
+                double queueMemory = queue.getResourcesUsed().getMemory();
+                // A queue that uses no memory would make the division yield NaN/Infinity;
+                // report a ratio of 0 instead, like the capacity ratio above.
+                double memoryRatio = queueMemory == 0 ? 0 : userMemory / queueMemory;
+                createMetric(HadoopClusterConstants.MetricName.HADOOP_USER_NUMPENDING_JOBS, userTags, currentTimestamp, user.getNumPendingApplications());
+                createMetric(HadoopClusterConstants.MetricName.HADOOP_USER_USED_MEMORY, userTags, currentTimestamp, userMemory);
+                createMetric(HadoopClusterConstants.MetricName.HADOOP_USER_USED_MEMORY_RATIO, userTags, currentTimestamp, memoryRatio);
+            }
+        }
+
+        if (queue.getQueues() != null && queue.getQueues().getQueue() != null) {
+            for (Queue subQueue : queue.getQueues().getQueue()) {
+                createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/exceptions/HadoopQueueFetcherException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/exceptions/HadoopQueueFetcherException.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/exceptions/HadoopQueueFetcherException.java
new file mode 100644
index 0000000..0bce87d
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/exceptions/HadoopQueueFetcherException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.exceptions;
+
+public class HadoopQueueFetcherException extends Exception { // checked exception; presumably signals a failed fetch of hadoop queue data - confirm against callers
+
+	private static final long serialVersionUID = -2425311876734366496L;
+
+	/**
+	 * Creates a HadoopQueueFetcherException with no detail message and no cause.
+	 */
+    public HadoopQueueFetcherException() {
+        super();
+    }
+
+    /**
+     * Creates a HadoopQueueFetcherException carrying the given detail message.
+     *
+     * @param message error message
+     */
+    public HadoopQueueFetcherException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates a HadoopQueueFetcherException carrying a detail message and the underlying cause.
+     *
+     * @param message error message
+     * @param cause the cause of the exception
+     *
+     */
+    public HadoopQueueFetcherException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates a HadoopQueueFetcherException that wraps the given cause.
+     *
+     * @param cause the cause of the exception
+     */
+    public HadoopQueueFetcherException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
new file mode 100644
index 0000000..728c81e
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.hadoop.queue.model;
+
+import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class HadoopQueueEntityRepository extends EntityRepository { // entity repository of the hadoop queue model package
+	public HadoopQueueEntityRepository() {
+		this.registerEntity(RunningQueueAPIEntity.class); // the only entity class this repository registers
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
new file mode 100755
index 0000000..e0a2b61
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/**
+ * 
+ */
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/*
+ *	JSON-mapped bean for one application entry of the Yarn resource http://<rm http address:port>/ws/v1/cluster/apps; plain accessors only, no logic.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when this bean is serialized
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties with no matching bean property are ignored on deserialization
+public class App {
+	private String id; // identity and ownership: id, user, name, queue
+	private String user;
+	private String name;
+	private String queue;
+	private String state; // lifecycle: state, finalStatus, progress
+	private String finalStatus;
+	private double progress; // NOTE(review): scale (0-1 vs 0-100) is not established in this file - confirm
+	private String trackingUI;
+	private String trackingUrl;
+	private String diagnostics;
+	private long clusterId;
+	private String applicationType;
+	private String applicationTags;
+	private long startedTime; // NOTE(review): the three *Time fields are presumably milliseconds as reported by YARN - confirm
+	private long finishedTime;
+	private long elapsedTime;
+	private String amContainerLogs;
+	private String amHostHttpAddress;
+	private int allocatedMB; // current resource usage: allocatedMB, allocatedVCores, runningContainers
+	private int allocatedVCores;
+	private int runningContainers;
+
+	public String getId() {
+		return id;
+	}
+	
+	public void setId(String id) {
+		this.id = id;
+	}
+	
+	public String getUser() {
+		return user;
+	}
+
+	public void setUser(String user) {
+		this.user = user;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	public String getQueue() {
+		return queue;
+	}
+
+	public void setQueue(String queue) {
+		this.queue = queue;
+	}
+
+	public String getState() {
+		return state;
+	}
+
+	public void setState(String state) {
+		this.state = state;
+	}
+
+	public String getFinalStatus() {
+		return finalStatus;
+	}
+
+	public void setFinalStatus(String finalStatus) {
+		this.finalStatus = finalStatus;
+	}
+
+	public double getProgress() {
+		return progress;
+	}
+
+	public void setProgress(double progress) {
+		this.progress = progress;
+	}
+
+	public String getTrackingUI() {
+		return trackingUI;
+	}
+
+	public void setTrackingUI(String trackingUI) {
+		this.trackingUI = trackingUI;
+	}
+
+	public String getTrackingUrl() {
+		return trackingUrl;
+	}
+
+	public void setTrackingUrl(String trackingUrl) {
+		this.trackingUrl = trackingUrl;
+	}
+
+	public String getDiagnostics() {
+		return diagnostics;
+	}
+
+	public void setDiagnostics(String diagnostics) {
+		this.diagnostics = diagnostics;
+	}
+
+	public long getClusterId() {
+		return clusterId;
+	}
+
+	public void setClusterId(long clusterId) {
+		this.clusterId = clusterId;
+	}
+
+	public long getStartedTime() {
+		return startedTime;
+	}
+
+	public void setStartedTime(long startedTime) {
+		this.startedTime = startedTime;
+	}
+
+	public long getFinishedTime() {
+		return finishedTime;
+	}
+
+	public void setFinishedTime(long finishedTime) {
+		this.finishedTime = finishedTime;
+	}
+
+	public long getElapsedTime() {
+		return elapsedTime;
+	}
+
+	public void setElapsedTime(long elapsedTime) {
+		this.elapsedTime = elapsedTime;
+	}
+	
+	public String getApplicationType() {
+		return applicationType;
+	}
+
+	public void setApplicationType(String applicationType) {
+		this.applicationType = applicationType;
+	}
+
+	public String getApplicationTags() {
+		return applicationTags;
+	}
+
+	public void setApplicationTags(String applicationTags) {
+		this.applicationTags = applicationTags;
+	}
+
+	public String getAmContainerLogs() {
+		return amContainerLogs;
+	}
+
+	public void setAmContainerLogs(String amContainerLogs) {
+		this.amContainerLogs = amContainerLogs;
+	}
+
+	public String getAmHostHttpAddress() {
+		return amHostHttpAddress;
+	}
+
+	public void setAmHostHttpAddress(String amHostHttpAddress) {
+		this.amHostHttpAddress = amHostHttpAddress;
+	}
+
+	public int getAllocatedMB() {
+		return allocatedMB;
+	}
+
+	public void setAllocatedMB(int allocatedMB) {
+		this.allocatedMB = allocatedMB;
+	}
+
+	public int getAllocatedVCores() {
+		return allocatedVCores;
+	}
+
+	public void setAllocatedVCores(int allocatedVCores) {
+		this.allocatedVCores = allocatedVCores;
+	}
+	
+	public int getRunningContainers() {
+		return runningContainers;
+	}
+
+	public void setRunningContainers(int runningContainers) {
+		this.runningContainers = runningContainers;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
new file mode 100755
index 0000000..649f17b
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/**
+ * 
+ */
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when this bean is serialized
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties with no matching bean property are ignored on deserialization
+public class Apps { // holder bean for a list of App entries
+	private List<App> app; // singular name is deliberate: getApp/setApp expose it as bean property "app"
+
+	public List<App> getApp() {
+		return app; // returns the stored list itself (no copy); null until setApp is called
+	}
+
+	public void setApp(List<App> app) {
+		this.app = app;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
new file mode 100755
index 0000000..c204638
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when this bean is serialized
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties with no matching bean property are ignored on deserialization
+public class AppsWrapper { // outermost bean: wraps a single Apps object under bean property "apps"
+	private Apps apps;
+
+	public Apps getApps() {
+		return apps; // null until setApps is called
+	}
+
+	public void setApps(Apps apps) {
+		this.apps = apps;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetrics.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetrics.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetrics.java
new file mode 100644
index 0000000..b254ebf
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetrics.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.model.clusterMetrics;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when this bean is serialized
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties with no matching bean property are ignored on deserialization
+public class ClusterMetrics { // bean of cluster-wide counters; plain accessors only, no logic
+	private int appsSubmitted; // application counters: appsSubmitted .. appsKilled
+	private int appsCompleted;
+	private int appsPending;
+	private int appsRunning;
+	private int appsFailed;
+	private int appsKilled;
+	private long reservedMB; // memory figures (the *MB fields) are long, all other counters are int
+	private long availableMB;
+	private long allocatedMB;
+	private int containersAllocated; // container counters: allocated, reserved, pending
+	private int containersReserved;
+	private int containersPending;
+	private long totalMB;
+	private int totalNodes; // node counters: totalNodes .. activeNodes
+	private int lostNodes;
+	private int unhealthyNodes;
+	private int decommissionedNodes;
+	private int rebootedNodes;
+	private int activeNodes;
+
+	public int getAppsSubmitted() {
+		return appsSubmitted;
+	}
+
+	public void setAppsSubmitted(int appsSubmitted) {
+		this.appsSubmitted = appsSubmitted;
+	}
+
+	public int getAppsCompleted() {
+		return appsCompleted;
+	}
+
+	public void setAppsCompleted(int appsCompleted) {
+		this.appsCompleted = appsCompleted;
+	}
+
+	public int getAppsPending() {
+		return appsPending;
+	}
+
+	public void setAppsPending(int appsPending) {
+		this.appsPending = appsPending;
+	}
+
+	public int getAppsRunning() {
+		return appsRunning;
+	}
+
+	public void setAppsRunning(int appsRunning) {
+		this.appsRunning = appsRunning;
+	}
+
+	public int getAppsFailed() {
+		return appsFailed;
+	}
+
+	public void setAppsFailed(int appsFailed) {
+		this.appsFailed = appsFailed;
+	}
+
+	public int getAppsKilled() {
+		return appsKilled;
+	}
+
+	public void setAppsKilled(int appsKilled) {
+		this.appsKilled = appsKilled;
+	}
+
+	public long getReservedMB() {
+		return reservedMB;
+	}
+
+	public void setReservedMB(long reservedMB) {
+		this.reservedMB = reservedMB;
+	}
+
+	public long getAvailableMB() {
+		return availableMB;
+	}
+
+	public void setAvailableMB(long availableMB) {
+		this.availableMB = availableMB;
+	}
+
+	public long getAllocatedMB() {
+		return allocatedMB;
+	}
+
+	public void setAllocatedMB(long allocatedMB) {
+		this.allocatedMB = allocatedMB;
+	}
+
+	public int getContainersAllocated() {
+		return containersAllocated;
+	}
+
+	public void setContainersAllocated(int containersAllocated) {
+		this.containersAllocated = containersAllocated;
+	}
+
+	public int getContainersReserved() {
+		return containersReserved;
+	}
+
+	public void setContainersReserved(int containersReserved) {
+		this.containersReserved = containersReserved;
+	}
+
+	public int getContainersPending() {
+		return containersPending;
+	}
+
+	public void setContainersPending(int containersPending) {
+		this.containersPending = containersPending;
+	}
+
+	public long getTotalMB() {
+		return totalMB;
+	}
+
+	public void setTotalMB(long totalMB) {
+		this.totalMB = totalMB;
+	}
+
+	public int getTotalNodes() {
+		return totalNodes;
+	}
+
+	public void setTotalNodes(int totalNodes) {
+		this.totalNodes = totalNodes;
+	}
+
+	public int getLostNodes() {
+		return lostNodes;
+	}
+
+	public void setLostNodes(int lostNodes) {
+		this.lostNodes = lostNodes;
+	}
+
+	public int getUnhealthyNodes() {
+		return unhealthyNodes;
+	}
+
+	public void setUnhealthyNodes(int unhealthyNodes) {
+		this.unhealthyNodes = unhealthyNodes;
+	}
+
+	public int getDecommissionedNodes() {
+		return decommissionedNodes;
+	}
+
+	public void setDecommissionedNodes(int decommissionedNodes) {
+		this.decommissionedNodes = decommissionedNodes;
+	}
+
+	public int getRebootedNodes() {
+		return rebootedNodes;
+	}
+
+	public void setRebootedNodes(int rebootedNodes) {
+		this.rebootedNodes = rebootedNodes;
+	}
+
+	public int getActiveNodes() {
+		return activeNodes;
+	}
+
+	public void setActiveNodes(int activeNodes) {
+		this.activeNodes = activeNodes;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetricsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetricsWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetricsWrapper.java
new file mode 100644
index 0000000..a9d1a9a
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/clusterMetrics/ClusterMetricsWrapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.model.clusterMetrics;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when this bean is serialized
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties with no matching bean property are ignored on deserialization
+public class ClusterMetricsWrapper { // outermost bean: wraps a single ClusterMetrics object under bean property "clusterMetrics"
+	public ClusterMetrics getClusterMetrics() {
+		return clusterMetrics; // null until setClusterMetrics is called
+	}
+
+	public void setClusterMetrics(ClusterMetrics clusterMetrics) {
+		this.clusterMetrics = clusterMetrics;
+	}
+
+	private ClusterMetrics clusterMetrics; // backing field; declared after its accessors, unlike the sibling wrapper beans
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f6fea4a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/Queue.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/Queue.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/Queue.java
new file mode 100644
index 0000000..ace5879
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/Queue.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.queue.model.scheduler;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) // null-valued properties are left out when this bean is serialized
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON properties with no matching bean property are ignored on deserialization
+public class Queue { // bean for one scheduler queue; may nest child queues through the queues property
+	private String type;
+	private double capacity; // capacity figures: the six double fields below
+	private double usedCapacity;
+	private double maxCapacity;
+	private double absoluteCapacity;
+	private double absoluteMaxCapacity;
+	private double absoluteUsedCapacity;
+
+	private ResourcesUsed resourcesUsed; // project type declared elsewhere; null until set
+	private String usedResources;
+	private String queueName;
+	private String state;
+	private Users users; // project type declared elsewhere; null until set
+
+	private int numApplications; // application/container counters and limits: the int fields below
+	private int numPendingApplications;
+	private int numContainers;
+	private int maxApplications;
+	private int maxApplicationsPerUser;
+	private int maxActiveApplications;
+	private int maxActiveApplicationsPerUser;
+	private int userLimit;
+	private int userLimitFactor; // NOTE(review): the YARN scheduler REST API documents userLimitFactor as a float; an int here would drop the fraction - confirm
+	private Queues queues; // child queues holder (project type declared elsewhere); null until set
+
+	public String getUsedResources() {
+		return usedResources;
+	}
+
+	public void setUsedResources(String usedResources) {
+		this.usedResources = usedResources;
+	}
+
+	public int getMaxActiveApplicationsPerUser() {
+		return maxActiveApplicationsPerUser;
+	}
+
+	public void setMaxActiveApplicationsPerUser(int maxActiveApplicationsPerUser) {
+		this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
+	}
+
+	public int getNumPendingApplications() {
+		return numPendingApplications;
+	}
+
+	public void setNumPendingApplications(int numPendingApplications) {
+		this.numPendingApplications = numPendingApplications;
+	}
+
+	public int getNumContainers() {
+		return numContainers;
+	}
+
+	public void setNumContainers(int numContainers) {
+		this.numContainers = numContainers;
+	}
+
+	public int getMaxApplications() {
+		return maxApplications;
+	}
+
+	public void setMaxApplications(int maxApplications) {
+		this.maxApplications = maxApplications;
+	}
+
+	public int getMaxApplicationsPerUser() {
+		return maxApplicationsPerUser;
+	}
+
+	public void setMaxApplicationsPerUser(int maxApplicationsPerUser) {
+		this.maxApplicationsPerUser = maxApplicationsPerUser;
+	}
+
+	public int getMaxActiveApplications() {
+		return maxActiveApplications;
+	}
+
+	public void setMaxActiveApplications(int maxActiveApplications) {
+		this.maxActiveApplications = maxActiveApplications;
+	}
+
+	public int getUserLimit() {
+		return userLimit;
+	}
+
+	public void setUserLimit(int userLimit) {
+		this.userLimit = userLimit;
+	}
+
+	public int getUserLimitFactor() {
+		return userLimitFactor;
+	}
+
+	public void setUserLimitFactor(int userLimitFactor) {
+		this.userLimitFactor = userLimitFactor;
+	}
+
+
+	public String getType() {
+		return type;
+	}
+
+	public void setType(String type) {
+		this.type = type;
+	}
+	public ResourcesUsed getResourcesUsed() {
+		return resourcesUsed;
+	}
+
+	public void setResourcesUsed(ResourcesUsed resourcesUsed) {
+		this.resourcesUsed = resourcesUsed;
+	}
+
+
+	public Users getUsers() { return users; }
+
+	public void setUsers(Users users) { this.users = users; }
+
+	public double getAbsoluteUsedCapacity() {
+		return absoluteUsedCapacity;
+	}
+
+	public void setAbsoluteUsedCapacity(double absoluteUsedCapacity) {
+		this.absoluteUsedCapacity = absoluteUsedCapacity;
+	}
+
+	public double getCapacity() {
+		return capacity;
+	}
+
+	public void setCapacity(double capacity) {
+		this.capacity = capacity;
+	}
+
+	public double getUsedCapacity() {
+		return usedCapacity;
+	}
+
+	public void setUsedCapacity(double usedCapacity) {
+		this.usedCapacity = usedCapacity;
+	}
+
+	public double getMaxCapacity() {
+		return maxCapacity;
+	}
+
+	public void setMaxCapacity(double maxCapacity) {
+		this.maxCapacity = maxCapacity;
+	}
+
+	public double getAbsoluteCapacity() {
+		return absoluteCapacity;
+	}
+
+	public void setAbsoluteCapacity(double absoluteCapacity) {
+		this.absoluteCapacity = absoluteCapacity;
+	}
+
+	public double getAbsoluteMaxCapacity() {
+		return absoluteMaxCapacity;
+	}
+
+	public void setAbsoluteMaxCapacity(double absoluteMaxCapacity) {
+		this.absoluteMaxCapacity = absoluteMaxCapacity;
+	}
+
+	public int getNumApplications() {
+		return numApplications;
+	}
+
+	public void setNumApplications(int numApplications) {
+		this.numApplications = numApplications;
+	}
+
+	public String getQueueName() {
+		return queueName;
+	}
+
+	public void setQueueName(String queueName) {
+		this.queueName = queueName;
+	}
+
+	public String getState() {
+		return state;
+	}
+
+	public void setState(String state) {
+		this.state = state;
+	}
+
+	public Queues getQueues() {
+		return queues;
+	}
+
+	public void setQueues(Queues queues) {
+		this.queues = queues;
+	}	
+}