You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/02/21 06:50:44 UTC
eagle git commit: [EAGLE-915] Fetch accepted MR jobs to assist queue analysis
Repository: eagle
Updated Branches:
refs/heads/master a27289fd3 -> 4a5c4a43c
[EAGLE-915] Fetch accepted MR jobs to assist queue analysis
https://issues.apache.org/jira/browse/EAGLE-915
Author: Zhao, Qingwen <qi...@apache.org>
Closes #826 from qingwen220/EAGLE-915.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/4a5c4a43
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/4a5c4a43
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/4a5c4a43
Branch: refs/heads/master
Commit: 4a5c4a43c1c38bea07867306028e5f3a4b856552
Parents: a27289f
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Tue Feb 21 14:50:33 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Tue Feb 21 14:50:33 2017 +0800
----------------------------------------------------------------------
.../hadoop/queue/HadoopQueueRunningApp.java | 29 +++--
.../queue/common/HadoopClusterConstants.java | 24 +---
.../common/YarnClusterResourceURLBuilder.java | 14 ++-
.../crawler/ClusterMetricsParseListener.java | 6 +-
.../queue/crawler/RunningAppParseListener.java | 66 ++++++++---
.../queue/crawler/RunningAppsCrawler.java | 5 +-
.../crawler/SchedulerInfoParseListener.java | 8 +-
.../model/HadoopQueueEntityRepository.java | 2 +
.../hadoop/queue/model/applications/App.java | 19 ++++
.../queue/model/applications/AppStreamInfo.java | 53 +++++++++
.../model/applications/YarnAppAPIEntity.java | 111 +++++++++++++++++++
.../queue/model/scheduler/QueueStreamInfo.java | 79 +++++++++++++
.../storm/HadoopQueueMetricPersistBolt.java | 110 +++++++-----------
.../queue/storm/HadoopQueueRunningSpout.java | 4 +-
...doop.queue.HadoopQueueRunningAppProvider.xml | 63 ++++++++++-
.../resourcefetch/ha/AbstractURLSelector.java | 2 +-
16 files changed, 472 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
index 68ca8c7..4708baa 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
@@ -17,37 +17,52 @@
package org.apache.eagle.hadoop.queue;
import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
import org.apache.eagle.app.messaging.StormStreamSink;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt;
import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout;
+import java.util.HashMap;
+import java.util.Map;
+
public class HadoopQueueRunningApp extends StormApplication {
public StormTopology execute(Config config, StormEnvironment environment) {
HadoopQueueRunningAppConfig appConfig = new HadoopQueueRunningAppConfig(config);
+ String spoutName = "runningQueueSpout";
+ String persistBoltName = "persistBolt";
+
IRichSpout spout = new HadoopQueueRunningSpout(appConfig);
- HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig);
- TopologyBuilder builder = new TopologyBuilder();
+ Map<HadoopClusterConstants.DataSource, String> streamMaps = new HashMap<>();
+
+ String acceptedAppStreamId = persistBoltName + "-to-" + DataSource.RUNNING_APPS.toString();
+ String schedulerStreamId = persistBoltName + "-to-" + DataSource.SCHEDULER.toString();
+ streamMaps.put(DataSource.RUNNING_APPS, acceptedAppStreamId);
+ streamMaps.put(DataSource.SCHEDULER, schedulerStreamId);
int numOfPersistTasks = appConfig.topology.numPersistTasks;
int numOfSinkTasks = appConfig.topology.numSinkTasks;
int numOfSpoutTasks = 1;
- String spoutName = "runningQueueSpout";
- String persistBoltName = "persistBolt";
+ HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig, streamMaps);
+ TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
builder.setBolt(persistBoltName, bolt, numOfPersistTasks).setNumTasks(numOfPersistTasks).shuffleGrouping(spoutName);
- StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_LEAF_QUEUE_STREAM", config);
+ StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_QUEUE_STREAM", config);
builder.setBolt("queueKafkaSink", queueSinkBolt, numOfSinkTasks)
- .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName);
+ .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, schedulerStreamId);
+
+ StormStreamSink appSinkBolt = environment.getStreamSink("ACCEPTED_APP_STREAM", config);
+ builder.setBolt("appKafkaSink", appSinkBolt, numOfSinkTasks)
+ .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, acceptedAppStreamId);
return builder.createTopology();
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
index 1d64f87..159da21 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -31,6 +31,10 @@ public class HadoopClusterConstants {
CLUSTER_METRIC, RUNNING_APPS, SCHEDULER
}
+ public enum AppState {
+ RUNNING, ACCEPTED
+ }
+
public static class MetricName {
// Metrics from running apps
@@ -61,26 +65,9 @@ public class HadoopClusterConstants {
}
- public static class LeafQueueInfo {
- public static final String TIMESTAMP = "timestamp";
- public static final String QUEUE_SITE = "site";
- public static final String QUEUE_NAME = "queue";
- public static final String QUEUE_STATE = "state";
- public static final String QUEUE_SCHEDULER = "scheduler";
- public static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity";
- public static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity";
- public static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity";
- public static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity";
- public static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity";
- public static final String QUEUE_USED_MEMORY = "memory";
- public static final String QUEUE_USED_VCORES = "vcores";
- public static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications";
- public static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications";
- public static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications";
- }
-
public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService";
public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService";
+ public static final String ACCEPTED_APP_SERVICE_NAME = "AcceptedAppService";
// tag constants
public static final String TAG_PARENT_QUEUE = "parentQueue";
@@ -90,6 +77,7 @@ public class HadoopClusterConstants {
public static final String TAG_CLUSTER = "cluster";
// field constants
+ public static final String FIELD_DATASOURCE = "dataSource";
public static final String FIELD_DATATYPE = "dataType";
public static final String FIELD_DATA = "data";
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
index 0ee4318..7ec24df 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java
@@ -36,7 +36,19 @@ public class YarnClusterResourceURLBuilder {
}
public static String buildRunningAppsURL(String urlBase) {
- return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?state=RUNNING" + "&" + ANONYMOUS_PARAMETER);
+ return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?state=RUNNING&" + ANONYMOUS_PARAMETER);
+ }
+
+ public static String buildAcceptedAndRunningAppsURL(String urlBase) {
+ return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?states=ACCEPTED,RUNNING&" + ANONYMOUS_PARAMETER);
+ }
+
+ public static String buildAcceptedAppsURL(String urlBase) {
+ return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?states=ACCEPTED&" + ANONYMOUS_PARAMETER);
+ }
+
+ public static String buildAcceptedAppTrackingURL(String urlBase, String appId) {
+ return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "/" + appId);
}
public static String buildFinishedAppsURL(String urlBase) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
index d3219ef..57dd454 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java
@@ -70,9 +70,7 @@ public class ClusterMetricsParseListener {
entity.setValue(new double[] {0.0});
clusterMetricEntities.put(key, entity);
}
- if (clusterMetricCounts.get(key) == null) {
- clusterMetricCounts.put(key, 0);
- }
+ clusterMetricCounts.putIfAbsent(key, 0);
updateEntityAggValue(entity, aggFunc, value, clusterMetricCounts.get(key));
clusterMetricCounts.put(key, clusterMetricCounts.get(key) + 1);
}
@@ -89,7 +87,7 @@ public class ClusterMetricsParseListener {
public void flush() {
HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.CLUSTER_METRIC, System.currentTimeMillis());
List<GenericMetricEntity> metrics = new ArrayList<>(clusterMetricEntities.values());
- this.collector.emit(new ValuesArray(DataType.METRIC.name(), metrics), messageId);
+ this.collector.emit(new ValuesArray(DataSource.CLUSTER_METRIC, DataType.METRIC, metrics), messageId);
reset();
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
index 364a1a7..ff54ca3 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
@@ -25,8 +25,13 @@ import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.dataproc.impl.storm.ValuesArray;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType;
+import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
import org.apache.eagle.hadoop.queue.model.applications.App;
+import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo;
import org.apache.eagle.hadoop.queue.model.applications.Apps;
+import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity;
import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
import org.apache.eagle.log.entity.GenericMetricEntity;
import backtype.storm.spout.SpoutOutputCollector;
@@ -54,19 +59,29 @@ public class RunningAppParseListener {
};
private String site;
+ private String rmUrl;
private SpoutOutputCollector collector;
private Map<String, GenericMetricEntity> appMetricEntities = new HashMap<>();
+ private List<YarnAppAPIEntity> acceptedApps = new ArrayList<>();
- public RunningAppParseListener(String site, SpoutOutputCollector collector) {
+ public RunningAppParseListener(String site, SpoutOutputCollector collector, String rmUrl) {
this.site = site;
+ this.rmUrl = rmUrl;
this.collector = collector;
}
public void flush() {
- logger.info("start sending app metrics, size: " + appMetricEntities.size());
- HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.RUNNING_APPS, System.currentTimeMillis());
+ logger.info("crawled {} running app metrics", appMetricEntities.size());
+ HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.RUNNING_APPS, System.currentTimeMillis());
List<GenericMetricEntity> metrics = new ArrayList<>(appMetricEntities.values());
- collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId);
+ collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.METRIC, metrics), messageId);
+
+ logger.info("crawled {} accepted apps", acceptedApps.size());
+ messageId = new HadoopQueueMessageId(DataType.ENTITY, DataSource.RUNNING_APPS, System.currentTimeMillis());
+ List<YarnAppAPIEntity> entities = new ArrayList<>(acceptedApps);
+ collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.ENTITY, entities), messageId);
+
+ acceptedApps.clear();
appMetricEntities.clear();
}
@@ -97,21 +112,44 @@ public class RunningAppParseListener {
public void onMetric(Apps apps, long timestamp) throws Exception {
timestamp = timestamp / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL;
for (App app : apps.getApp()) {
- Map<String, String> tags = new HashMap<>();
- tags.put(HadoopClusterConstants.TAG_USER, app.getUser());
- tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue());
- for (AggLevel level : AggLevel.values()) {
- Map<String, String> newTags = buildMetricTags(level, tags);
- for (java.util.Map.Entry<String, String> entry : metrics.entrySet()) {
- Method method = App.class.getMethod(entry.getValue());
- Integer value = (Integer) method.invoke(app);
- String metricName = String.format(entry.getKey(), level.name);
- createMetric(metricName, newTags, timestamp, value);
+ if (app.getState().equalsIgnoreCase(HadoopClusterConstants.AppState.ACCEPTED.toString())) {
+ YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity();
+ appAPIEntity.setTags(buildAppTags(app));
+ appAPIEntity.setTrackingUrl(YarnClusterResourceURLBuilder.buildAcceptedAppTrackingURL(rmUrl, app.getId()));
+ appAPIEntity.setAppName(app.getName());
+ appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage());
+ appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage());
+ appAPIEntity.setElapsedTime(app.getElapsedTime());
+ appAPIEntity.setStartedTime(app.getStartedTime());
+ appAPIEntity.setState(app.getState());
+ appAPIEntity.setTimestamp(app.getStartedTime());
+ acceptedApps.add(appAPIEntity);
+ } else {
+ Map<String, String> tags = new HashMap<>();
+ tags.put(HadoopClusterConstants.TAG_USER, app.getUser());
+ tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue());
+ for (AggLevel level : AggLevel.values()) {
+ Map<String, String> newTags = buildMetricTags(level, tags);
+ for (java.util.Map.Entry<String, String> entry : metrics.entrySet()) {
+ Method method = App.class.getMethod(entry.getValue());
+ Integer value = (Integer) method.invoke(app);
+ String metricName = String.format(entry.getKey(), level.name);
+ createMetric(metricName, newTags, timestamp, value);
+ }
}
}
}
}
+ private Map<String, String> buildAppTags(App app) {
+ Map<String, String> tags = new HashMap<>();
+ tags.put(AppStreamInfo.SITE, this.site);
+ tags.put(AppStreamInfo.ID, app.getId());
+ tags.put(AppStreamInfo.QUEUE, app.getQueue());
+ tags.put(AppStreamInfo.USER, app.getUser());
+ return tags;
+ }
+
private enum AggLevel {
CLUSTER(HadoopClusterConstants.TAG_CLUSTER, ""),
QUEUE(HadoopClusterConstants.TAG_QUEUE, HadoopClusterConstants.TAG_QUEUE),
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
index 3ffd371..39eec80 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
@@ -36,9 +36,10 @@ public class RunningAppsCrawler implements Runnable {
private String urlString;
public RunningAppsCrawler(String site, String baseUrl, SpoutOutputCollector collector) {
- this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl);
+ this.urlString = YarnClusterResourceURLBuilder.buildAcceptedAndRunningAppsURL(baseUrl);
+ //this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl);
//this.urlString = YarnClusterResourceURLBuilder.buildFinishedAppsURL(baseUrl);
- listener = new RunningAppParseListener(site, collector);
+ listener = new RunningAppParseListener(site, collector, baseUrl);
}
@Override
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
index 67cc5c9..165bdb1 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -21,6 +21,8 @@ package org.apache.eagle.hadoop.queue.crawler;
import org.apache.eagle.dataproc.impl.storm.ValuesArray;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType;
import org.apache.eagle.hadoop.queue.model.scheduler.*;
import org.apache.eagle.hadoop.queue.model.scheduler.Queue;
import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
@@ -68,12 +70,12 @@ public class SchedulerInfoParseListener {
LOG.info("Flushing {} RunningQueue metrics in memory", metricEntities.size());
HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
List<GenericMetricEntity> metrics = new ArrayList<>(metricEntities);
- collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId);
+ collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.METRIC, metrics), messageId);
LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size());
- messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
+ messageId = new HadoopQueueMessageId(DataType.ENTITY, DataSource.SCHEDULER, System.currentTimeMillis());
List<TaggedLogAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities);
- collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId);
+ collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.ENTITY, entities), messageId);
runningQueueAPIEntities.clear();
metricEntities.clear();
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
index 40d6e53..800bd03 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
@@ -17,6 +17,7 @@
*/
package org.apache.eagle.hadoop.queue.model;
+import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity;
import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity;
import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
import org.apache.eagle.log.entity.repo.EntityRepository;
@@ -25,5 +26,6 @@ public class HadoopQueueEntityRepository extends EntityRepository {
public HadoopQueueEntityRepository() {
this.registerEntity(RunningQueueAPIEntity.class);
this.registerEntity(QueueStructureAPIEntity.class);
+ this.registerEntity(YarnAppAPIEntity.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
index b1cbb42..393ede3 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
@@ -51,6 +51,9 @@ public class App {
private int allocatedMB;
private int allocatedVCores;
private int runningContainers;
+ // for HDP 2.7
+ private double queueUsagePercentage;
+ private double clusterUsagePercentage;
public String getId() {
return id;
@@ -219,4 +222,20 @@ public class App {
public void setRunningContainers(int runningContainers) {
this.runningContainers = runningContainers;
}
+
+ public double getQueueUsagePercentage() {
+ return queueUsagePercentage;
+ }
+
+ public void setQueueUsagePercentage(double queueUsagePercentage) {
+ this.queueUsagePercentage = queueUsagePercentage;
+ }
+
+ public double getClusterUsagePercentage() {
+ return clusterUsagePercentage;
+ }
+
+ public void setClusterUsagePercentage(double clusterUsagePercentage) {
+ this.clusterUsagePercentage = clusterUsagePercentage;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
new file mode 100644
index 0000000..7e72023
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AppStreamInfo {
+ public static final String SITE = "site";
+ public static final String ID = "id";
+ public static final String USER = "user";
+ public static final String QUEUE = "queue";
+ private static final String NAME = "appName";
+ private static final String STATE = "state";
+ private static final String STARTEDTIME = "startTime";
+ private static final String ELAPSEDTIME = "elapsedTime";
+ private static final String QUEUE_USAGE_PERCENTAGE = "queueUsagePercentage";
+ private static final String CLUSTER_USAGE_PERCENTAGE = "clusterUsagePercentage";
+ private static final String TRACKING_URL = "trackingUrl";
+
+ public static Map<String, Object> convertAppToStream(YarnAppAPIEntity appAPIEntity) {
+ Map<String, Object> queueStreamInfo = new HashMap<>();
+ queueStreamInfo.put(SITE, appAPIEntity.getTags().get(SITE));
+ queueStreamInfo.put(ID, appAPIEntity.getTags().get(ID));
+ queueStreamInfo.put(USER, appAPIEntity.getTags().get(USER));
+ queueStreamInfo.put(QUEUE, appAPIEntity.getTags().get(QUEUE));
+ queueStreamInfo.put(NAME, appAPIEntity.getAppName());
+ queueStreamInfo.put(STATE, appAPIEntity.getState());
+ queueStreamInfo.put(ELAPSEDTIME, appAPIEntity.getElapsedTime());
+ queueStreamInfo.put(STARTEDTIME, appAPIEntity.getStartedTime());
+ queueStreamInfo.put(QUEUE_USAGE_PERCENTAGE, appAPIEntity.getQueueUsagePercentage());
+ queueStreamInfo.put(CLUSTER_USAGE_PERCENTAGE, appAPIEntity.getClusterUsagePercentage());
+ queueStreamInfo.put(TRACKING_URL, appAPIEntity.getTrackingUrl());
+
+ return queueStreamInfo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
new file mode 100644
index 0000000..7b36523
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.applications;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("yarn_app")
+@ColumnFamily("f")
+@Prefix("accepted")
+@Service(HadoopClusterConstants.ACCEPTED_APP_SERVICE_NAME)
+@TimeSeries(true)
+@Partition( {"site"})
+@Tags({"site","id","user","queue"})
+public class YarnAppAPIEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private String appName;
+ @Column("b")
+ private String state;
+ @Column("c")
+ private long startedTime;
+ @Column("d")
+ private long elapsedTime;
+ @Column("e")
+ private String trackingUrl;
+ @Column("f")
+ private double queueUsagePercentage;
+ @Column("g")
+ private double clusterUsagePercentage;
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public void setAppName(String appName) {
+ this.appName = appName;
+ valueChanged("appName");
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ valueChanged("state");
+ }
+
+ public long getStartedTime() {
+ return startedTime;
+ }
+
+ public void setStartedTime(long startedTime) {
+ this.startedTime = startedTime;
+ valueChanged("startedTime");
+ }
+
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+
+ public void setElapsedTime(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ valueChanged("elapsedTime");
+ }
+
+ public String getTrackingUrl() {
+ return trackingUrl;
+ }
+
+ public void setTrackingUrl(String trackingUrl) {
+ this.trackingUrl = trackingUrl;
+ valueChanged("trackingUrl");
+ }
+
+ public double getQueueUsagePercentage() {
+ return queueUsagePercentage;
+ }
+
+ public void setQueueUsagePercentage(double queueUsagePercentage) {
+ this.queueUsagePercentage = queueUsagePercentage;
+ valueChanged("queueUsagePercentage");
+ }
+
+ public double getClusterUsagePercentage() {
+ return clusterUsagePercentage;
+ }
+
+ public void setClusterUsagePercentage(double clusterUsagePercentage) {
+ this.clusterUsagePercentage = clusterUsagePercentage;
+ valueChanged("clusterUsagePercentage");
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java
new file mode 100644
index 0000000..af06b27
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.scheduler;
+
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Flattens a {@link RunningQueueAPIEntity} into the key/value map that is emitted on the
+ * queue stream. The map keys are the constants declared below.
+ */
+public class QueueStreamInfo {
+    private static final String TIMESTAMP = "timestamp";
+    private static final String QUEUE_SITE = "site";
+    public static final String QUEUE_NAME = "queue";
+    private static final String QUEUE_STATE = "state";
+    private static final String QUEUE_SCHEDULER = "scheduler";
+    private static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity";
+    private static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity";
+    private static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity";
+    private static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity";
+    private static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity";
+    private static final String QUEUE_USED_MEMORY = "memory";
+    private static final String QUEUE_USED_VCORES = "vcores";
+    private static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications";
+    private static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications";
+    private static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications";
+
+
+    /**
+     * Builds the stream record for one queue.
+     *
+     * <p>Besides copying the entity's own values, two derived values are added:
+     * {@code maxUserUsedCapacity}, the largest per-user share computed by
+     * {@link #calculateUserUsedCapacity}, and {@code userLimitCapacity}, the product of
+     * the user limit factor and the absolute capacity. When the entity carries no user
+     * list, or the queue uses no memory, {@code maxUserUsedCapacity} is 0.
+     *
+     * @param queueAPIEntity the queue entity to convert; its tags supply site and queue name
+     * @return a new mutable map holding one entry per key constant of this class
+     */
+    public static Map<String, Object> convertEntityToStream(RunningQueueAPIEntity queueAPIEntity) {
+        Map<String, Object> queueInfoMap = new HashMap<>();
+        queueInfoMap.put(QueueStreamInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE));
+        queueInfoMap.put(QueueStreamInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE));
+        queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_STATE, queueAPIEntity.getState());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory());
+        queueInfoMap.put(QueueStreamInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores());
+        queueInfoMap.put(QueueStreamInfo.TIMESTAMP, queueAPIEntity.getTimestamp());
+
+        double maxUserUsedCapacity = 0;
+        // The persist bolt checks getUsers() for null before calling this method, so a null
+        // user list is a real possibility; guard here too because the method is public.
+        if (queueAPIEntity.getUsers() != null && queueAPIEntity.getUsers().getUsers() != null) {
+            for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) {
+                double userUsedCapacity = calculateUserUsedCapacity(
+                    queueAPIEntity.getAbsoluteUsedCapacity(),
+                    queueAPIEntity.getMemory(),
+                    user.getMemory());
+                if (userUsedCapacity > maxUserUsedCapacity) {
+                    maxUserUsedCapacity = userUsedCapacity;
+                }
+            }
+        }
+        queueInfoMap.put(QueueStreamInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity);
+        queueInfoMap.put(QueueStreamInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity());
+        return queueInfoMap;
+    }
+
+    /**
+     * Scales the queue's absolute used capacity by the user's share of the queue's used memory.
+     *
+     * @param absoluteUsedCapacity absolute used capacity of the queue
+     * @param queueUsedMem memory used by the whole queue
+     * @param userUsedMem memory used by one user of that queue
+     * @return {@code userUsedMem * absoluteUsedCapacity / queueUsedMem}, or 0 when the queue
+     *         uses no memory
+     */
+    private static double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) {
+        // Floating-point division by zero yields NaN or Infinity rather than throwing,
+        // which would leak into the emitted stream record.
+        if (queueUsedMem == 0) {
+            return 0;
+        }
+        return userUsedMem * absoluteUsedCapacity / queueUsedMem;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index 9eb7008..43a62b7 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -27,11 +27,13 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
-import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType;
+import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo;
+import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity;
+import org.apache.eagle.hadoop.queue.model.scheduler.QueueStreamInfo;
import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
-import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -39,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,18 +48,25 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class);
+ private Map<HadoopClusterConstants.DataSource, String> streamMap;
private HadoopQueueRunningAppConfig config;
private IEagleServiceClient client;
private OutputCollector collector;
- public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config) {
+ public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config,
+ Map<HadoopClusterConstants.DataSource, String> streamMap) {
this.config = config;
+ this.streamMap = streamMap;
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
HadoopQueueRunningAppConfig.EagleProps.EagleService eagleService = config.eagleProps.eagleService;
- this.client = new EagleServiceClientImpl(eagleService.host, eagleService.port, eagleService.username, eagleService.password);
+ this.client = new EagleServiceClientImpl(
+ eagleService.host,
+ eagleService.port,
+ eagleService.username,
+ eagleService.password);
this.collector = collector;
}
@@ -67,30 +75,44 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
if (input == null) {
return;
}
- String dataType = input.getStringByField(HadoopClusterConstants.FIELD_DATATYPE);
+ DataSource dataSource = (DataSource) input.getValueByField(HadoopClusterConstants.FIELD_DATASOURCE);
+ DataType dataType = (DataType) input.getValueByField(HadoopClusterConstants.FIELD_DATATYPE);
Object data = input.getValueByField(HadoopClusterConstants.FIELD_DATA);
- if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.METRIC.toString())) {
- List<GenericMetricEntity> metrics = (List<GenericMetricEntity>) data;
- writeMetrics(metrics);
- } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
- List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) data;
+
+ List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) data;
+ if (dataType.equals(DataType.METRIC)) {
+ writeEntities(entities, dataType, dataSource);
+ } else {
for (TaggedLogAPIEntity entity : entities) {
if (entity instanceof RunningQueueAPIEntity) {
RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity;
if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
- collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE),
- parseLeafQueueInfo(queue)));
+ String queueName = queue.getTags().get(HadoopClusterConstants.TAG_QUEUE);
+ collector.emit(streamMap.get(dataSource),
+ new Values(queueName, QueueStreamInfo.convertEntityToStream(queue)));
}
+ } else if (entity instanceof YarnAppAPIEntity) {
+ YarnAppAPIEntity appAPIEntity = (YarnAppAPIEntity) entity;
+ collector.emit(streamMap.get(dataSource),
+ new Values(appAPIEntity.getAppName(), AppStreamInfo.convertAppToStream(appAPIEntity)));
}
}
- writeEntities(entities);
+ if (!dataSource.equals(DataSource.RUNNING_APPS)) {
+ writeEntities(entities, dataType, dataSource);
+ }
}
this.collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(HadoopClusterConstants.LeafQueueInfo.QUEUE_NAME, "message"));
+ if (streamMap != null) {
+ for (String stormStreamId : streamMap.values()) {
+ declarer.declareStream(stormStreamId, new Fields("f1", "message"));
+ }
+ } else {
+ declarer.declare(new Fields("f1", "message"));
+ }
}
@Override
@@ -104,67 +126,17 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
}
}
- private void writeEntities(List<TaggedLogAPIEntity> entities) {
+ private void writeEntities(List<TaggedLogAPIEntity> entities, DataType dataType, DataSource dataSource) {
try {
GenericServiceAPIResponseEntity response = client.create(entities);
if (!response.isSuccess()) {
LOG.error("Got exception from eagle service: " + response.getException());
} else {
- LOG.info("Successfully wrote " + entities.size() + " RunningQueueAPIEntity entities");
+ LOG.info("Successfully wrote {} items of {} for {}", entities.size(), dataType, dataSource);
}
} catch (Exception e) {
- LOG.error("cannot create running queue entities successfully", e);
+ LOG.error("cannot create {} entities", entities.size(), e);
}
entities.clear();
}
-
- private void writeMetrics(List<GenericMetricEntity> entities) {
- try {
- GenericServiceAPIResponseEntity response = client.create(entities);
- if (response.isSuccess()) {
- LOG.info("Successfully wrote " + entities.size() + " GenericMetricEntity entities");
- } else {
- LOG.error(response.getException());
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- private Map<String, Object> parseLeafQueueInfo(RunningQueueAPIEntity queueAPIEntity) {
- Map<String, Object> queueInfoMap = new HashMap<>();
- queueInfoMap.put(LeafQueueInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE));
- queueInfoMap.put(LeafQueueInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE));
- queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity());
- queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity());
- queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity());
- queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications());
- queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications());
- queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications());
- queueInfoMap.put(LeafQueueInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler());
- queueInfoMap.put(LeafQueueInfo.QUEUE_STATE, queueAPIEntity.getState());
- queueInfoMap.put(LeafQueueInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory());
- queueInfoMap.put(LeafQueueInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores());
- queueInfoMap.put(LeafQueueInfo.TIMESTAMP, queueAPIEntity.getTimestamp());
-
- double maxUserUsedCapacity = 0;
- double userUsedCapacity;
- for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) {
- userUsedCapacity = calculateUserUsedCapacity(
- queueAPIEntity.getAbsoluteUsedCapacity(),
- queueAPIEntity.getMemory(),
- user.getMemory());
- if (userUsedCapacity > maxUserUsedCapacity) {
- maxUserUsedCapacity = userUsedCapacity;
- }
-
- }
- queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity);
- queueInfoMap.put(LeafQueueInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity());
- return queueInfoMap;
- }
-
- private double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) {
- return userUsedMem * absoluteUsedCapacity / queueUsedMem;
- }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
index 530be9a..681f25e 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
@@ -51,7 +51,9 @@ public class HadoopQueueRunningSpout extends BaseRichSpout {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(HadoopClusterConstants.FIELD_DATATYPE, HadoopClusterConstants.FIELD_DATA));
+ declarer.declare(new Fields(HadoopClusterConstants.FIELD_DATASOURCE,
+ HadoopClusterConstants.FIELD_DATATYPE,
+ HadoopClusterConstants.FIELD_DATA));
}
@Override
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
index 5fb041d..da22836 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
@@ -55,12 +55,18 @@
<!-- sink to kafka -->
<property>
- <name>dataSinkConfig.topic</name>
- <displayName>dataSinkConfig.topic</displayName>
+ <name>dataSinkConfig.HADOOP_QUEUE_STREAM.topic</name>
+ <displayName>Destination(Kafka Topic) Of Queue Stream Data</displayName>
<value>yarn_queue</value>
<description>topic for kafka data sink</description>
</property>
<property>
+ <name>dataSinkConfig.ACCEPTED_APP_STREAM.topic</name>
+ <displayName>Destination(Kafka Topic) Of App Stream Data</displayName>
+ <value>yarn_accepted_app</value>
+ <description>topic for kafka data sink</description>
+ </property>
+ <property>
<name>dataSinkConfig.brokerList</name>
<displayName>dataSinkConfig.brokerList</displayName>
<value>localhost:6667</value>
@@ -106,7 +112,7 @@
</configuration>
<streams>
<stream>
- <streamId>HADOOP_LEAF_QUEUE_STREAM</streamId>
+ <streamId>HADOOP_QUEUE_STREAM</streamId>
<description>Hadoop Leaf Queue Info Stream</description>
<validate>true</validate>
<columns>
@@ -172,6 +178,57 @@
</column>
</columns>
</stream>
+ <stream>
+ <streamId>ACCEPTED_APP_STREAM</streamId>
+ <description>Accepted App Info Stream</description>
+ <validate>true</validate>
+ <columns>
+ <column>
+ <name>id</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>site</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>appName</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>queue</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>state</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>user</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>trackingUrl</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>elapsedTime</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>startedTime</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>queueUsagePercentage</name>
+ <type>double</type>
+ </column>
+ <column>
+ <name>clusterUsagePercentage</name>
+ <type>double</type>
+ </column>
+ </columns>
+ </stream>
</streams>
<docs>
<install>
http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
index d25d05b..2a99d26 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
@@ -91,7 +91,7 @@ public abstract class AbstractURLSelector implements HAURLSelector {
LOG.info("Successfully switch to new url : " + selectedUrl);
return;
}
- LOG.info("try url " + urlToCheck + "fail for " + (time + 1) + " times, sleep 5 seconds before try again. ");
+ LOG.info("try url " + urlToCheck + " failed for " + (time + 1) + " times, sleep 5 seconds before try again. ");
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException ex) {