Posted to commits@eagle.apache.org by qi...@apache.org on 2017/02/21 06:50:44 UTC

eagle git commit: [EAGLE-915] Fetch accepted MR jobs to assist queue analysis

Repository: eagle
Updated Branches:
  refs/heads/master a27289fd3 -> 4a5c4a43c


[EAGLE-915] Fetch accepted MR jobs to assist queue analysis

https://issues.apache.org/jira/browse/EAGLE-915

Author: Zhao, Qingwen <qi...@apache.org>

Closes #826 from qingwen220/EAGLE-915.
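
For context, the change points the running-apps crawler at the ResourceManager apps REST endpoint with states=ACCEPTED,RUNNING (see YarnClusterResourceURLBuilder below). The following is a minimal standalone sketch of that request, not part of this commit: the RM address is a placeholder, the /ws/v1/cluster/apps path is the standard YARN RM REST path, and the crawler's extra ANONYMOUS_PARAMETER is omitted.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class AcceptedAppsFetchSketch {
    public static void main(String[] args) throws Exception {
        // Same kind of query the updated RunningAppsCrawler builds:
        // all applications currently in ACCEPTED or RUNNING state.
        String rmBase = "http://rm-host:8088";  // placeholder ResourceManager address
        String url = rmBase + "/ws/v1/cluster/apps?states=ACCEPTED,RUNNING";
        // (the real crawler also appends ANONYMOUS_PARAMETER; omitted here)

        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestProperty("Accept", "application/json");
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            // The JSON payload maps onto the Apps/App model classes touched by this commit.
            System.out.println(body);
        } finally {
            conn.disconnect();
        }
    }
}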


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/4a5c4a43
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/4a5c4a43
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/4a5c4a43

Branch: refs/heads/master
Commit: 4a5c4a43c1c38bea07867306028e5f3a4b856552
Parents: a27289f
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Tue Feb 21 14:50:33 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Tue Feb 21 14:50:33 2017 +0800

----------------------------------------------------------------------
 .../hadoop/queue/HadoopQueueRunningApp.java     |  29 +++--
 .../queue/common/HadoopClusterConstants.java    |  24 +---
 .../common/YarnClusterResourceURLBuilder.java   |  14 ++-
 .../crawler/ClusterMetricsParseListener.java    |   6 +-
 .../queue/crawler/RunningAppParseListener.java  |  66 ++++++++---
 .../queue/crawler/RunningAppsCrawler.java       |   5 +-
 .../crawler/SchedulerInfoParseListener.java     |   8 +-
 .../model/HadoopQueueEntityRepository.java      |   2 +
 .../hadoop/queue/model/applications/App.java    |  19 ++++
 .../queue/model/applications/AppStreamInfo.java |  53 +++++++++
 .../model/applications/YarnAppAPIEntity.java    | 111 +++++++++++++++++++
 .../queue/model/scheduler/QueueStreamInfo.java  |  79 +++++++++++++
 .../storm/HadoopQueueMetricPersistBolt.java     | 110 +++++++-----------
 .../queue/storm/HadoopQueueRunningSpout.java    |   4 +-
 ...doop.queue.HadoopQueueRunningAppProvider.xml |  63 ++++++++++-
 .../resourcefetch/ha/AbstractURLSelector.java   |   2 +-
 16 files changed, 472 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
index 68ca8c7..4708baa 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
@@ -17,37 +17,52 @@
 package org.apache.eagle.hadoop.queue;
 
 import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.app.messaging.StormStreamSink;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class HadoopQueueRunningApp extends StormApplication {
     public StormTopology execute(Config config, StormEnvironment environment) {
         HadoopQueueRunningAppConfig appConfig = new HadoopQueueRunningAppConfig(config);
 
+        String spoutName = "runningQueueSpout";
+        String persistBoltName = "persistBolt";
+
         IRichSpout spout = new HadoopQueueRunningSpout(appConfig);
-        HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig);
-        TopologyBuilder builder = new TopologyBuilder();
+        Map<HadoopClusterConstants.DataSource, String> streamMaps = new HashMap<>();
+
+        String acceptedAppStreamId = persistBoltName + "-to-" + DataSource.RUNNING_APPS.toString();
+        String schedulerStreamId = persistBoltName + "-to-" + DataSource.SCHEDULER.toString();
+        streamMaps.put(DataSource.RUNNING_APPS, acceptedAppStreamId);
+        streamMaps.put(DataSource.SCHEDULER, schedulerStreamId);
 
         int numOfPersistTasks = appConfig.topology.numPersistTasks;
         int numOfSinkTasks = appConfig.topology.numSinkTasks;
         int numOfSpoutTasks = 1;
 
-        String spoutName = "runningQueueSpout";
-        String persistBoltName = "persistBolt";
+        HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig, streamMaps);
+        TopologyBuilder builder = new TopologyBuilder();
 
         builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
         builder.setBolt(persistBoltName, bolt, numOfPersistTasks).setNumTasks(numOfPersistTasks).shuffleGrouping(spoutName);
 
-        StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_LEAF_QUEUE_STREAM", config);
+        StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_QUEUE_STREAM", config);
         builder.setBolt("queueKafkaSink", queueSinkBolt, numOfSinkTasks)
-                .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName);
+                .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, schedulerStreamId);
+
+        StormStreamSink appSinkBolt = environment.getStreamSink("ACCEPTED_APP_STREAM", config);
+        builder.setBolt("appKafkaSink", appSinkBolt, numOfSinkTasks)
+                .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, acceptedAppStreamId);
 
         return builder.createTopology();
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
index 1d64f87..159da21 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -31,6 +31,10 @@ public class HadoopClusterConstants {
         CLUSTER_METRIC, RUNNING_APPS, SCHEDULER
     }
 
+    public enum AppState {
+        RUNNING, ACCEPTED
+    }
+
     public static class MetricName {
 
         // Metrics from running apps
@@ -61,26 +65,9 @@ public class HadoopClusterConstants {
 
     }
 
-    public static class LeafQueueInfo {
-        public static final String TIMESTAMP = "timestamp";
-        public static final String QUEUE_SITE = "site";
-        public static final String QUEUE_NAME = "queue";
-        public static final String QUEUE_STATE = "state";
-        public static final String QUEUE_SCHEDULER = "scheduler";
-        public static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity";
-        public static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity";
-        public static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity";
-        public static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity";
-        public static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity";
-        public static final String QUEUE_USED_MEMORY = "memory";
-        public static final String QUEUE_USED_VCORES = "vcores";
-        public static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications";
-        public static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications";
-        public static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications";
-    }
-
     public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService";
     public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService";
+    public static final String ACCEPTED_APP_SERVICE_NAME = "AcceptedAppService";
 
     // tag constants
     public static final String TAG_PARENT_QUEUE = "parentQueue";
@@ -90,6 +77,7 @@ public class HadoopClusterConstants {
     public static final String TAG_CLUSTER = "cluster";
 
     // field constants
+    public static final String FIELD_DATASOURCE = "dataSource";
     public static final String FIELD_DATATYPE = "dataType";
     public static final String FIELD_DATA = "data";
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
index 0ee4318..7ec24df 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
@@ -36,7 +36,19 @@ public class YarnClusterResourceURLBuilder {
     }
 
     public static String buildRunningAppsURL(String urlBase) {
-        return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?state=RUNNING" + "&" + ANONYMOUS_PARAMETER);
+        return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?state=RUNNING&" + ANONYMOUS_PARAMETER);
+    }
+
+    public static String buildAcceptedAndRunningAppsURL(String urlBase) {
+        return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?states=ACCEPTED,RUNNING&" + ANONYMOUS_PARAMETER);
+    }
+
+    public static String buildAcceptedAppsURL(String urlBase) {
+        return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?states=ACCEPTED&" + ANONYMOUS_PARAMETER);
+    }
+
+    public static String buildAcceptedAppTrackingURL(String urlBase, String appId) {
+        return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "/" + appId);
     }
 
     public static String buildFinishedAppsURL(String urlBase) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
index d3219ef..57dd454 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
@@ -70,9 +70,7 @@ public class ClusterMetricsParseListener {
             entity.setValue(new double[] {0.0});
             clusterMetricEntities.put(key, entity);
         }
-        if (clusterMetricCounts.get(key) == null) {
-            clusterMetricCounts.put(key, 0);
-        }
+        clusterMetricCounts.putIfAbsent(key, 0);
         updateEntityAggValue(entity, aggFunc, value, clusterMetricCounts.get(key));
         clusterMetricCounts.put(key, clusterMetricCounts.get(key) + 1);
     }
@@ -89,7 +87,7 @@ public class ClusterMetricsParseListener {
     public void flush() {
         HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.CLUSTER_METRIC, System.currentTimeMillis());
         List<GenericMetricEntity> metrics = new ArrayList<>(clusterMetricEntities.values());
-        this.collector.emit(new ValuesArray(DataType.METRIC.name(), metrics), messageId);
+        this.collector.emit(new ValuesArray(DataSource.CLUSTER_METRIC, DataType.METRIC, metrics), messageId);
         reset();
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
index 364a1a7..ff54ca3 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
@@ -25,8 +25,13 @@ import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType;
+import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
 import org.apache.eagle.hadoop.queue.model.applications.App;
+import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo;
 import org.apache.eagle.hadoop.queue.model.applications.Apps;
+import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import backtype.storm.spout.SpoutOutputCollector;
@@ -54,19 +59,29 @@ public class RunningAppParseListener {
     };
 
     private String site;
+    private String rmUrl;
     private SpoutOutputCollector collector;
     private Map<String, GenericMetricEntity> appMetricEntities = new HashMap<>();
+    private List<YarnAppAPIEntity> acceptedApps = new ArrayList<>();
 
-    public RunningAppParseListener(String site, SpoutOutputCollector collector) {
+    public RunningAppParseListener(String site, SpoutOutputCollector collector, String rmUrl) {
         this.site = site;
+        this.rmUrl = rmUrl;
         this.collector = collector;
     }
 
     public void flush() {
-        logger.info("start sending app metrics, size: " + appMetricEntities.size());
-        HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.RUNNING_APPS, System.currentTimeMillis());
+        logger.info("crawled {} running app metrics", appMetricEntities.size());
+        HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.RUNNING_APPS, System.currentTimeMillis());
         List<GenericMetricEntity> metrics = new ArrayList<>(appMetricEntities.values());
-        collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId);
+        collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.METRIC, metrics), messageId);
+
+        logger.info("crawled {} accepted apps", acceptedApps.size());
+        messageId = new HadoopQueueMessageId(DataType.ENTITY, DataSource.RUNNING_APPS, System.currentTimeMillis());
+        List<YarnAppAPIEntity> entities = new ArrayList<>(acceptedApps);
+        collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.ENTITY, entities), messageId);
+
+        acceptedApps.clear();
         appMetricEntities.clear();
     }
 
@@ -97,21 +112,44 @@ public class RunningAppParseListener {
     public void onMetric(Apps apps, long timestamp) throws Exception {
         timestamp = timestamp / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL;
         for (App app : apps.getApp()) {
-            Map<String, String> tags = new HashMap<>();
-            tags.put(HadoopClusterConstants.TAG_USER, app.getUser());
-            tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue());
-            for (AggLevel level : AggLevel.values()) {
-                Map<String, String> newTags = buildMetricTags(level, tags);
-                for (java.util.Map.Entry<String, String> entry : metrics.entrySet()) {
-                    Method method = App.class.getMethod(entry.getValue());
-                    Integer value = (Integer) method.invoke(app);
-                    String metricName = String.format(entry.getKey(), level.name);
-                    createMetric(metricName, newTags, timestamp, value);
+            if (app.getState().equalsIgnoreCase(HadoopClusterConstants.AppState.ACCEPTED.toString())) {
+                YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity();
+                appAPIEntity.setTags(buildAppTags(app));
+                appAPIEntity.setTrackingUrl(YarnClusterResourceURLBuilder.buildAcceptedAppTrackingURL(rmUrl, app.getId()));
+                appAPIEntity.setAppName(app.getName());
+                appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage());
+                appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage());
+                appAPIEntity.setElapsedTime(app.getElapsedTime());
+                appAPIEntity.setStartedTime(app.getStartedTime());
+                appAPIEntity.setState(app.getState());
+                appAPIEntity.setTimestamp(app.getStartedTime());
+                acceptedApps.add(appAPIEntity);
+            } else {
+                Map<String, String> tags = new HashMap<>();
+                tags.put(HadoopClusterConstants.TAG_USER, app.getUser());
+                tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue());
+                for (AggLevel level : AggLevel.values()) {
+                    Map<String, String> newTags = buildMetricTags(level, tags);
+                    for (java.util.Map.Entry<String, String> entry : metrics.entrySet()) {
+                        Method method = App.class.getMethod(entry.getValue());
+                        Integer value = (Integer) method.invoke(app);
+                        String metricName = String.format(entry.getKey(), level.name);
+                        createMetric(metricName, newTags, timestamp, value);
+                    }
                 }
             }
         }
     }
 
+    private Map<String, String> buildAppTags(App app) {
+        Map<String, String> tags = new HashMap<>();
+        tags.put(AppStreamInfo.SITE, this.site);
+        tags.put(AppStreamInfo.ID, app.getId());
+        tags.put(AppStreamInfo.QUEUE, app.getQueue());
+        tags.put(AppStreamInfo.USER, app.getUser());
+        return tags;
+    }
+
     private enum AggLevel {
         CLUSTER(HadoopClusterConstants.TAG_CLUSTER, ""),
         QUEUE(HadoopClusterConstants.TAG_QUEUE, HadoopClusterConstants.TAG_QUEUE),

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
index 3ffd371..39eec80 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
@@ -36,9 +36,10 @@ public class RunningAppsCrawler implements Runnable {
     private String urlString;
 
     public RunningAppsCrawler(String site, String baseUrl, SpoutOutputCollector collector) {
-        this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl);
+        this.urlString = YarnClusterResourceURLBuilder.buildAcceptedAndRunningAppsURL(baseUrl);
+        //this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl);
         //this.urlString = YarnClusterResourceURLBuilder.buildFinishedAppsURL(baseUrl);
-        listener = new RunningAppParseListener(site, collector);
+        listener = new RunningAppParseListener(site, collector, baseUrl);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
index 67cc5c9..165bdb1 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -21,6 +21,8 @@ package org.apache.eagle.hadoop.queue.crawler;
 import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType;
 import org.apache.eagle.hadoop.queue.model.scheduler.*;
 import org.apache.eagle.hadoop.queue.model.scheduler.Queue;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
@@ -68,12 +70,12 @@ public class SchedulerInfoParseListener {
         LOG.info("Flushing {} RunningQueue metrics in memory", metricEntities.size());
         HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
         List<GenericMetricEntity> metrics = new ArrayList<>(metricEntities);
-        collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId);
+        collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.METRIC, metrics), messageId);
 
         LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size());
-        messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
+        messageId = new HadoopQueueMessageId(DataType.ENTITY, DataSource.SCHEDULER, System.currentTimeMillis());
         List<TaggedLogAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities);
-        collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId);
+        collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.ENTITY, entities), messageId);
 
         runningQueueAPIEntities.clear();
         metricEntities.clear();

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
index 40d6e53..800bd03 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
@@ -17,6 +17,7 @@
  */
 package org.apache.eagle.hadoop.queue.model;
 
+import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity;
 import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity;
 import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
 import org.apache.eagle.log.entity.repo.EntityRepository;
@@ -25,5 +26,6 @@ public class HadoopQueueEntityRepository extends EntityRepository {
     public HadoopQueueEntityRepository() {
         this.registerEntity(RunningQueueAPIEntity.class);
         this.registerEntity(QueueStructureAPIEntity.class);
+        this.registerEntity(YarnAppAPIEntity.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
index b1cbb42..393ede3 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
@@ -51,6 +51,9 @@ public class App {
     private int allocatedMB;
     private int allocatedVCores;
     private int runningContainers;
+    // for HDP 2.7
+    private double queueUsagePercentage;
+    private double clusterUsagePercentage;
 
     public String getId() {
         return id;
@@ -219,4 +222,20 @@ public class App {
     public void setRunningContainers(int runningContainers) {
         this.runningContainers = runningContainers;
     }
+
+    public double getQueueUsagePercentage() {
+        return queueUsagePercentage;
+    }
+
+    public void setQueueUsagePercentage(double queueUsagePercentage) {
+        this.queueUsagePercentage = queueUsagePercentage;
+    }
+
+    public double getClusterUsagePercentage() {
+        return clusterUsagePercentage;
+    }
+
+    public void setClusterUsagePercentage(double clusterUsagePercentage) {
+        this.clusterUsagePercentage = clusterUsagePercentage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
new file mode 100644
index 0000000..7e72023
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
@@ -0,0 +1,53 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AppStreamInfo {
+    public static final String SITE = "site";
+    public static final String ID = "id";
+    public static final String USER = "user";
+    public static final String QUEUE = "queue";
+    private static final String NAME = "appName";
+    private static final String STATE = "state";
+    private static final String STARTEDTIME = "startTime";
+    private static final String ELAPSEDTIME = "elapsedTime";
+    private static final String QUEUE_USAGE_PERCENTAGE = "queueUsagePercentage";
+    private static final String CLUSTER_USAGE_PERCENTAGE = "clusterUsagePercentage";
+    private static final String TRACKING_URL = "trackingUrl";
+
+    public static Map<String, Object> convertAppToStream(YarnAppAPIEntity appAPIEntity) {
+        Map<String, Object> queueStreamInfo = new HashMap<>();
+        queueStreamInfo.put(SITE, appAPIEntity.getTags().get(SITE));
+        queueStreamInfo.put(ID, appAPIEntity.getTags().get(ID));
+        queueStreamInfo.put(USER, appAPIEntity.getTags().get(USER));
+        queueStreamInfo.put(QUEUE, appAPIEntity.getTags().get(QUEUE));
+        queueStreamInfo.put(NAME, appAPIEntity.getAppName());
+        queueStreamInfo.put(STATE, appAPIEntity.getState());
+        queueStreamInfo.put(ELAPSEDTIME, appAPIEntity.getElapsedTime());
+        queueStreamInfo.put(STARTEDTIME, appAPIEntity.getStartedTime());
+        queueStreamInfo.put(QUEUE_USAGE_PERCENTAGE, appAPIEntity.getQueueUsagePercentage());
+        queueStreamInfo.put(CLUSTER_USAGE_PERCENTAGE, appAPIEntity.getClusterUsagePercentage());
+        queueStreamInfo.put(TRACKING_URL, appAPIEntity.getTrackingUrl());
+
+        return queueStreamInfo;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
new file mode 100644
index 0000000..7b36523
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
@@ -0,0 +1,111 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("yarn_app")
+@ColumnFamily("f")
+@Prefix("accepted")
+@Service(HadoopClusterConstants.ACCEPTED_APP_SERVICE_NAME)
+@TimeSeries(true)
+@Partition( {"site"})
+@Tags({"site","id","user","queue"})
+public class YarnAppAPIEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String appName;
+    @Column("b")
+    private String state;
+    @Column("c")
+    private long startedTime;
+    @Column("d")
+    private long elapsedTime;
+    @Column("e")
+    private String trackingUrl;
+    @Column("f")
+    private double queueUsagePercentage;
+    @Column("g")
+    private double clusterUsagePercentage;
+
+    public String getAppName() {
+        return appName;
+    }
+
+    public void setAppName(String appName) {
+        this.appName = appName;
+        valueChanged("appName");
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+        valueChanged("state");
+    }
+
+    public long getStartedTime() {
+        return startedTime;
+    }
+
+    public void setStartedTime(long startedTime) {
+        this.startedTime = startedTime;
+        valueChanged("startedTime");
+    }
+
+    public long getElapsedTime() {
+        return elapsedTime;
+    }
+
+    public void setElapsedTime(long elapsedTime) {
+        this.elapsedTime = elapsedTime;
+        valueChanged("elapsedTime");
+    }
+
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+        valueChanged("trackingUrl");
+    }
+
+    public double getQueueUsagePercentage() {
+        return queueUsagePercentage;
+    }
+
+    public void setQueueUsagePercentage(double queueUsagePercentage) {
+        this.queueUsagePercentage = queueUsagePercentage;
+        valueChanged("queueUsagePercentage");
+    }
+
+    public double getClusterUsagePercentage() {
+        return clusterUsagePercentage;
+    }
+
+    public void setClusterUsagePercentage(double clusterUsagePercentage) {
+        this.clusterUsagePercentage = clusterUsagePercentage;
+        valueChanged("clusterUsagePercentage");
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java
new file mode 100644
index 0000000..af06b27
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java
@@ -0,0 +1,79 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.scheduler;
+
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class QueueStreamInfo {
+    private static final String TIMESTAMP = "timestamp";
+    private static final String QUEUE_SITE = "site";
+    public static final String QUEUE_NAME = "queue";
+    private static final String QUEUE_STATE = "state";
+    private static final String QUEUE_SCHEDULER = "scheduler";
+    private static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity";
+    private static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity";
+    private static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity";
+    private static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity";
+    private static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity";
+    private static final String QUEUE_USED_MEMORY = "memory";
+    private static final String QUEUE_USED_VCORES = "vcores";
+    private static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications";
+    private static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications";
+    private static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications";
+
+
+    public static Map<String, Object> convertEntityToStream(RunningQueueAPIEntity queueAPIEntity) {
+        Map<String, Object> queueInfoMap = new HashMap<>();
+        queueInfoMap.put(QueueStreamInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE));
+        queueInfoMap.put(QueueStreamInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE));
+        queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_STATE, queueAPIEntity.getState());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores());
+        queueInfoMap.put(QueueStreamInfo.TIMESTAMP, queueAPIEntity.getTimestamp());
+
+        double maxUserUsedCapacity = 0;
+        double userUsedCapacity;
+        for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) {
+            userUsedCapacity = calculateUserUsedCapacity(
+                    queueAPIEntity.getAbsoluteUsedCapacity(),
+                    queueAPIEntity.getMemory(),
+                    user.getMemory());
+            if (userUsedCapacity > maxUserUsedCapacity) {
+                maxUserUsedCapacity = userUsedCapacity;
+            }
+
+        }
+        queueInfoMap.put(QueueStreamInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity);
+        queueInfoMap.put(QueueStreamInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity());
+        return queueInfoMap;
+    }
+
+    private static double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) {
+        return userUsedMem * absoluteUsedCapacity / queueUsedMem;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index 9eb7008..43a62b7 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -27,11 +27,13 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
-import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType;
+import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo;
+import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity;
+import org.apache.eagle.hadoop.queue.model.scheduler.QueueStreamInfo;
 import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
-import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -39,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -47,18 +48,25 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class);
 
+    private Map<HadoopClusterConstants.DataSource, String> streamMap;
     private HadoopQueueRunningAppConfig config;
     private IEagleServiceClient client;
     private OutputCollector collector;
 
-    public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config) {
+    public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config,
+                                        Map<HadoopClusterConstants.DataSource, String> streamMap) {
         this.config = config;
+        this.streamMap = streamMap;
     }
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         HadoopQueueRunningAppConfig.EagleProps.EagleService eagleService = config.eagleProps.eagleService;
-        this.client = new EagleServiceClientImpl(eagleService.host, eagleService.port, eagleService.username, eagleService.password);
+        this.client = new EagleServiceClientImpl(
+                eagleService.host,
+                eagleService.port,
+                eagleService.username,
+                eagleService.password);
         this.collector = collector;
     }
 
@@ -67,30 +75,44 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
         if (input == null) {
             return;
         }
-        String dataType = input.getStringByField(HadoopClusterConstants.FIELD_DATATYPE);
+        DataSource dataSource = (DataSource) input.getValueByField(HadoopClusterConstants.FIELD_DATASOURCE);
+        DataType dataType = (DataType) input.getValueByField(HadoopClusterConstants.FIELD_DATATYPE);
         Object data = input.getValueByField(HadoopClusterConstants.FIELD_DATA);
-        if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.METRIC.toString())) {
-            List<GenericMetricEntity> metrics = (List<GenericMetricEntity>) data;
-            writeMetrics(metrics);
-        } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
-            List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) data;
+
+        List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) data;
+        if (dataType.equals(DataType.METRIC)) {
+            writeEntities(entities, dataType, dataSource);
+        } else {
             for (TaggedLogAPIEntity entity : entities) {
                 if (entity instanceof RunningQueueAPIEntity) {
                     RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity;
                     if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
-                        collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE),
-                                parseLeafQueueInfo(queue)));
+                        String queueName = queue.getTags().get(HadoopClusterConstants.TAG_QUEUE);
+                        collector.emit(streamMap.get(dataSource),
+                                new Values(queueName, QueueStreamInfo.convertEntityToStream(queue)));
                     }
+                } else if (entity instanceof YarnAppAPIEntity) {
+                    YarnAppAPIEntity appAPIEntity = (YarnAppAPIEntity) entity;
+                    collector.emit(streamMap.get(dataSource),
+                            new Values(appAPIEntity.getAppName(), AppStreamInfo.convertAppToStream(appAPIEntity)));
                 }
             }
-            writeEntities(entities);
+            if (!dataSource.equals(DataSource.RUNNING_APPS)) {
+                writeEntities(entities, dataType, dataSource);
+            }
         }
         this.collector.ack(input);
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(HadoopClusterConstants.LeafQueueInfo.QUEUE_NAME, "message"));
+        if (streamMap != null) {
+            for (String stormStreamId : streamMap.values()) {
+                declarer.declareStream(stormStreamId, new Fields("f1", "message"));
+            }
+        } else {
+            declarer.declare(new Fields("f1", "message"));
+        }
     }
 
     @Override
@@ -104,67 +126,17 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
         }
     }
 
-    private void writeEntities(List<TaggedLogAPIEntity> entities) {
+    private void writeEntities(List<TaggedLogAPIEntity> entities, DataType dataType, DataSource dataSource) {
         try {
             GenericServiceAPIResponseEntity response = client.create(entities);
             if (!response.isSuccess()) {
                 LOG.error("Got exception from eagle service: " + response.getException());
             } else {
-                LOG.info("Successfully wrote " + entities.size() + " RunningQueueAPIEntity entities");
+                LOG.info("Successfully wrote {} items of {} for {}", entities.size(), dataType, dataSource);
             }
         } catch (Exception e) {
-            LOG.error("cannot create running queue entities successfully", e);
+            LOG.error("cannot create {} entities", entities.size(), e);
         }
         entities.clear();
     }
-
-    private void writeMetrics(List<GenericMetricEntity> entities) {
-        try {
-            GenericServiceAPIResponseEntity response = client.create(entities);
-            if (response.isSuccess()) {
-                LOG.info("Successfully wrote " + entities.size() + " GenericMetricEntity entities");
-            } else {
-                LOG.error(response.getException());
-            }
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    private Map<String, Object> parseLeafQueueInfo(RunningQueueAPIEntity queueAPIEntity) {
-        Map<String, Object> queueInfoMap = new HashMap<>();
-        queueInfoMap.put(LeafQueueInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE));
-        queueInfoMap.put(LeafQueueInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE));
-        queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity());
-        queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity());
-        queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity());
-        queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications());
-        queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications());
-        queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications());
-        queueInfoMap.put(LeafQueueInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler());
-        queueInfoMap.put(LeafQueueInfo.QUEUE_STATE, queueAPIEntity.getState());
-        queueInfoMap.put(LeafQueueInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory());
-        queueInfoMap.put(LeafQueueInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores());
-        queueInfoMap.put(LeafQueueInfo.TIMESTAMP, queueAPIEntity.getTimestamp());
-
-        double maxUserUsedCapacity = 0;
-        double userUsedCapacity;
-        for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) {
-            userUsedCapacity = calculateUserUsedCapacity(
-                    queueAPIEntity.getAbsoluteUsedCapacity(),
-                    queueAPIEntity.getMemory(),
-                    user.getMemory());
-            if (userUsedCapacity > maxUserUsedCapacity) {
-                maxUserUsedCapacity = userUsedCapacity;
-            }
-
-        }
-        queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity);
-        queueInfoMap.put(LeafQueueInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity());
-        return queueInfoMap;
-    }
-
-    private double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) {
-        return userUsedMem * absoluteUsedCapacity / queueUsedMem;
-    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
index 530be9a..681f25e 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
@@ -51,7 +51,9 @@ public class HadoopQueueRunningSpout extends BaseRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(HadoopClusterConstants.FIELD_DATATYPE, HadoopClusterConstants.FIELD_DATA));
+        declarer.declare(new Fields(HadoopClusterConstants.FIELD_DATASOURCE,
+                HadoopClusterConstants.FIELD_DATATYPE,
+                HadoopClusterConstants.FIELD_DATA));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
index 5fb041d..da22836 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
@@ -55,12 +55,18 @@
 
         <!-- sink to kafka -->
         <property>
-            <name>dataSinkConfig.topic</name>
-            <displayName>dataSinkConfig.topic</displayName>
+            <name>dataSinkConfig.HADOOP_QUEUE_STREAM.topic</name>
+            <displayName>Destination(Kafka Topic) Of Queue Stream Data</displayName>
             <value>yarn_queue</value>
             <description>topic for kafka data sink</description>
         </property>
         <property>
+            <name>dataSinkConfig.ACCEPTED_APP_STREAM.topic</name>
+            <displayName>Destination(Kafka Topic) Of App Stream Data</displayName>
+            <value>yarn_accepted_app</value>
+            <description>topic for kafka data sink</description>
+        </property>
+        <property>
             <name>dataSinkConfig.brokerList</name>
             <displayName>dataSinkConfig.brokerList</displayName>
             <value>localhost:6667</value>
@@ -106,7 +112,7 @@
     </configuration>
     <streams>
         <stream>
-            <streamId>HADOOP_LEAF_QUEUE_STREAM</streamId>
+            <streamId>HADOOP_QUEUE_STREAM</streamId>
             <description>Hadoop Leaf Queue Info Stream</description>
             <validate>true</validate>
             <columns>
@@ -172,6 +178,57 @@
                 </column>
             </columns>
         </stream>
+        <stream>
+            <streamId>ACCEPTED_APP_STREAM</streamId>
+            <description>Accepted App Info Stream</description>
+            <validate>true</validate>
+            <columns>
+                <column>
+                    <name>id</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>site</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>appName</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>queue</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>state</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>user</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>trackingUrl</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>elapsedTime</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>startedTime</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>queueUsagePercentage</name>
+                    <type>double</type>
+                </column>
+                <column>
+                    <name>clusterUsagePercentage</name>
+                    <type>double</type>
+                </column>
+            </columns>
+        </stream>
     </streams>
   <docs>
     <install>

http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
index d25d05b..2a99d26 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
@@ -91,7 +91,7 @@ public abstract class AbstractURLSelector implements HAURLSelector {
                             LOG.info("Successfully switch to new url : " + selectedUrl);
                             return;
                         }
-                        LOG.info("try url " + urlToCheck + "fail for " + (time + 1) + " times, sleep 5 seconds before try again. ");
+                        LOG.info("try url " + urlToCheck + " failed for " + (time + 1) + " times, sleep 5 seconds before try again. ");
                         try {
                             Thread.sleep(5 * 1000);
                         } catch (InterruptedException ex) {