You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/02/16 07:41:30 UTC
eagle git commit: [EAGLE-889] Add a restful API for Hadoop running
queue to query top N users or jobs
Repository: eagle
Updated Branches:
refs/heads/master 4ed406abc -> 077e27b23
[EAGLE-889] Add a restful API for Hadoop running queue to query top N users or jobs
https://issues.apache.org/jira/browse/EAGLE-889
Sample 1: query the top N users/jobs of memory usage under queue1 on site1 at time 1487156721356
http://localhost:9090/rest/queue/memory?top=10&queue=queue1&currentTime=1487156721356&site=site1
Sample 2: query the current queue hierarchy
http://localhost:9090/rest/entities?query=QueueMappingService[@site=%22sandbox%22]{*}&pageSize=10000
Author: Zhao, Qingwen <qi...@apache.org>
Closes #805 from qingwen220/EAGLE-889.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/077e27b2
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/077e27b2
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/077e27b2
Branch: refs/heads/master
Commit: 077e27b236c4b31454a45bb66a7372bfcd01e13a
Parents: 4ed406a
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Thu Feb 16 15:41:20 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Feb 16 15:41:20 2017 +0800
----------------------------------------------------------------------
.../queue/common/HadoopClusterConstants.java | 1 +
.../crawler/SchedulerInfoParseListener.java | 25 ++-
.../model/HadoopQueueEntityRepository.java | 2 +
.../scheduler/QueueStructureAPIEntity.java | 69 +++++++++
.../storm/HadoopQueueMetricPersistBolt.java | 15 +-
...doop.queue.HadoopQueueRunningAppProvider.xml | 2 +-
.../jpm/mr/running/parser/MRJobParser.java | 2 +
eagle-jpm/eagle-jpm-service/pom.xml | 5 +
.../eagle/service/jpm/RunningQueueResource.java | 154 +++++++++++++++++++
.../eagle/service/jpm/RunningQueueResponse.java | 50 ++++++
...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 2 +-
....eagle.topology.TopologyCheckAppProvider.xml | 2 +-
12 files changed, 316 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
index 9a08f05..1d64f87 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -80,6 +80,7 @@ public class HadoopClusterConstants {
}
public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService";
+ public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService";
// tag constants
public static final String TAG_PARENT_QUEUE = "parentQueue";
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
index b0452c9..67cc5c9 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -22,7 +22,9 @@ import org.apache.eagle.dataproc.impl.storm.ValuesArray;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
import org.apache.eagle.hadoop.queue.model.scheduler.*;
+import org.apache.eagle.hadoop.queue.model.scheduler.Queue;
import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericMetricEntity;
import backtype.storm.spout.SpoutOutputCollector;
@@ -41,7 +43,7 @@ public class SchedulerInfoParseListener {
//private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
//private int MAX_CACHE_COUNT = 1000;
- private final List<RunningQueueAPIEntity> runningQueueAPIEntities = new ArrayList<>();
+ private final List<TaggedLogAPIEntity> runningQueueAPIEntities = new ArrayList<>();
private final List<GenericMetricEntity> metricEntities = new ArrayList<>();
private String site;
@@ -56,6 +58,7 @@ public class SchedulerInfoParseListener {
Map<String, String> tags = buildMetricTags(null, null);
createMetric(MetricName.HADOOP_CLUSTER_CAPACITY, tags, currentTimestamp, scheduler.getCapacity());
createMetric(MetricName.HADOOP_CLUSTER_USED_CAPACITY, tags, currentTimestamp, scheduler.getUsedCapacity());
+
for (Queue queue : scheduler.getQueues().getQueue()) {
createQueues(queue, currentTimestamp, scheduler, null);
}
@@ -69,7 +72,7 @@ public class SchedulerInfoParseListener {
LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size());
messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
- List<RunningQueueAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities);
+ List<TaggedLogAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities);
collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId);
runningQueueAPIEntities.clear();
@@ -97,7 +100,7 @@ public class SchedulerInfoParseListener {
this.metricEntities.add(e);
}
- private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName) throws Exception {
+ private List<String> createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName) throws Exception {
RunningQueueAPIEntity _entity = new RunningQueueAPIEntity();
Map<String, String> _tags = buildMetricTags(queue.getQueueName(), parentQueueName);
_entity.setTags(_tags);
@@ -123,7 +126,6 @@ public class SchedulerInfoParseListener {
UserWrappers users = new UserWrappers();
users.setUsers(userList);
_entity.setUsers(users);
-
runningQueueAPIEntities.add(_entity);
createMetric(MetricName.HADOOP_QUEUE_NUMPENDING_JOBS, _tags, currentTimestamp, queue.getNumPendingApplications());
@@ -145,11 +147,24 @@ public class SchedulerInfoParseListener {
}
}
+ List<String> subQueues = new ArrayList<>();
+ List<String> allSubQueues = new ArrayList<>();
if (queue.getQueues() != null && queue.getQueues().getQueue() != null) {
for (Queue subQueue : queue.getQueues().getQueue()) {
- createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName());
+ subQueues.add(subQueue.getQueueName());
+ allSubQueues.add(subQueue.getQueueName());
+ List<String> queues = createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName());
+ allSubQueues.addAll(queues);
}
}
+ QueueStructureAPIEntity queueStructureAPIEntity = new QueueStructureAPIEntity();
+ queueStructureAPIEntity.setTags(_tags);
+ queueStructureAPIEntity.setSubQueues(subQueues);
+ queueStructureAPIEntity.setAllSubQueues(allSubQueues);
+ queueStructureAPIEntity.setLastUpdateTime(currentTimestamp);
+ runningQueueAPIEntities.add(queueStructureAPIEntity);
+
+ return allSubQueues;
}
private UserWrapper wrapUser(User user) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
index f598779..40d6e53 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
@@ -17,11 +17,13 @@
*/
package org.apache.eagle.hadoop.queue.model;
+import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity;
import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
import org.apache.eagle.log.entity.repo.EntityRepository;
public class HadoopQueueEntityRepository extends EntityRepository {
public HadoopQueueEntityRepository() {
this.registerEntity(RunningQueueAPIEntity.class);
+ this.registerEntity(QueueStructureAPIEntity.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
new file mode 100644
index 0000000..72f67bc
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.scheduler;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+import java.util.List;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("queue_map")
+@ColumnFamily("f")
+@Prefix("queueMap")
+@Service(HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME)
+@TimeSeries(false)
+@Partition( {"site"})
+public class QueueStructureAPIEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private List<String> subQueues;
+ @Column("b")
+ private List<String> allSubQueues;
+ @Column("c")
+ private long lastUpdateTime;
+
+ public List<String> getSubQueues() {
+ return subQueues;
+ }
+
+ public void setSubQueues(List<String> subQueues) {
+ this.subQueues = subQueues;
+ valueChanged("subQueues");
+ }
+
+ public List<String> getAllSubQueues() {
+ return allSubQueues;
+ }
+
+ public void setAllSubQueues(List<String> allSubQueues) {
+ this.allSubQueues = allSubQueues;
+ valueChanged("allSubQueues");
+ }
+
+ public long getLastUpdateTime() {
+ return lastUpdateTime;
+ }
+
+ public void setLastUpdateTime(long lastUpdateTime) {
+ this.lastUpdateTime = lastUpdateTime;
+ valueChanged("lastUpdateTime");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index 1bafc13..9eb7008 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -30,6 +30,7 @@ import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo;
import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
@@ -72,10 +73,14 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
List<GenericMetricEntity> metrics = (List<GenericMetricEntity>) data;
writeMetrics(metrics);
} else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
- List<RunningQueueAPIEntity> entities = (List<RunningQueueAPIEntity>) data;
- for (RunningQueueAPIEntity queue : entities) {
- if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
- collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), parseLeafQueueInfo(queue)));
+ List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) data;
+ for (TaggedLogAPIEntity entity : entities) {
+ if (entity instanceof RunningQueueAPIEntity) {
+ RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity;
+ if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
+ collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE),
+ parseLeafQueueInfo(queue)));
+ }
}
}
writeEntities(entities);
@@ -99,7 +104,7 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
}
}
- private void writeEntities(List<RunningQueueAPIEntity> entities) {
+ private void writeEntities(List<TaggedLogAPIEntity> entities) {
try {
GenericServiceAPIResponseEntity response = client.create(entities);
if (!response.isSuccess()) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
index 4cf745c..5fb041d 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
@@ -57,7 +57,7 @@
<property>
<name>dataSinkConfig.topic</name>
<displayName>dataSinkConfig.topic</displayName>
- <value>hadoop_leaf_queue</value>
+ <value>yarn_queue</value>
<description>topic for kafka data sink</description>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 6b33d31..525ffc2 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -135,6 +135,8 @@ public class MRJobParser implements Runnable {
JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(mrJobId);
jobExecutionAPIEntity.setInternalState(Constants.AppState.FINISHED.toString());
jobExecutionAPIEntity.setCurrentState(Constants.AppState.RUNNING.toString());
+ // set an estimated job finished time because it's hard to get the specific one
+ jobExecutionAPIEntity.setEndTime(System.currentTimeMillis());
mrJobConfigs.remove(mrJobId);
if (mrJobConfigs.size() == 0) {
this.parserStatus = ParserStatus.APP_FINISHED;
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/pom.xml b/eagle-jpm/eagle-jpm-service/pom.xml
index d6807bd..197740e 100644
--- a/eagle-jpm/eagle-jpm-service/pom.xml
+++ b/eagle-jpm/eagle-jpm-service/pom.xml
@@ -43,5 +43,10 @@
<artifactId>eagle-jpm-entity</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-hadoop-queue</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
new file mode 100644
index 0000000..2632423
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.utils.Tuple2;
+import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity;
+import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.generic.GenericEntityServiceResource;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.*;
+
+import static org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME;
+import static org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME;
+import static org.apache.eagle.jpm.util.Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME;
+import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
+import static org.apache.eagle.jpm.util.MRJobTagName.JOB_QUEUE;
+import static org.apache.eagle.jpm.util.MRJobTagName.USER;
+
+@Path("queue")
+public class RunningQueueResource {
+
+ @GET
+ @Path("memory")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RunningQueueResponse getTopByQueue(@QueryParam("site") String site,
+ @QueryParam("queue") String queue,
+ @QueryParam("currentTime") long currentTime,
+ @QueryParam("top") int top) {
+ RunningQueueResponse result = new RunningQueueResponse();
+ try {
+ if (site == null || queue == null || currentTime == 0L || top == 0) {
+ throw new Exception("Invalid query parameters: site == null || queue == null || currentTime == 0L || top == 0");
+ }
+ Tuple2<String, String> queryTimeRange = getQueryTimeRange(currentTime);
+ Set<String> queueSet = getSubQueueSet(site, queue);
+ List<JobExecutionAPIEntity> runningJobs = getRunningJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1());
+ List<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> jobs = getJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1());
+ Set<String> jobIds = new HashSet<>();
+ jobs.forEach(job -> jobIds.add(job.getTags().get(JOB_ID.toString())));
+
+ Map<String, Long> userUsage = new HashMap<>();
+ Map<String, Long> jobUsage = new HashMap<>();
+ for (JobExecutionAPIEntity job : runningJobs) {
+ String jobId = job.getTags().get(JOB_ID.toString());
+ String jobQueue = job.getTags().get(JOB_QUEUE.toString());
+ String user = job.getTags().get(USER.toString());
+
+ if (jobIds.contains(jobId) && queueSet.contains(jobQueue)) {
+ if (userUsage.containsKey(user)) {
+ userUsage.put(user, userUsage.get(user) + job.getAllocatedMB());
+ } else {
+ userUsage.put(user, job.getAllocatedMB());
+ }
+ jobUsage.put(jobId, job.getAllocatedMB());
+ }
+ }
+ result.setJobs(getTopRecords(top, jobUsage));
+ result.setUsers(getTopRecords(top, userUsage));
+ } catch (Exception e) {
+ result.setErrMessage(e.getMessage());
+ }
+ return result;
+ }
+
+ private List<JobExecutionAPIEntity> getRunningJobs(String site, long currentTime, String startTime, String endTime) throws Exception {
+ GenericEntityServiceResource resource = new GenericEntityServiceResource();
+ String query = String.format("%s[@site=\"%s\" and @startTime<=%s and (@internalState=\"RUNNING\" or @endTime>%s)]{@jobId, @user, @queue, @allocatedMB}",
+ JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime);
+ GenericServiceAPIResponseEntity<JobExecutionAPIEntity> runningJobResponse = resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
+
+ if (!runningJobResponse.isSuccess() || runningJobResponse.getObj() == null) {
+ throw new IOException(runningJobResponse.getException());
+ }
+
+ return runningJobResponse.getObj();
+ }
+
+ private List<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> getJobs(String site,
+ long currentTime,
+ String startTime,
+ String endTime) throws Exception {
+ GenericEntityServiceResource resource = new GenericEntityServiceResource();
+ String query = String.format("%s[@site=\"%s\" and @startTime<=%s and @endTime>%s]{@jobId}", JPA_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime);
+
+ GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> response =
+ resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
+
+ if (!response.isSuccess() || response.getObj() == null) {
+ throw new IOException(response.getException());
+ }
+
+ return response.getObj();
+ }
+
+ private Set<String> getSubQueueSet(String site, String parentQueue) throws IOException {
+ GenericEntityServiceResource resource = new GenericEntityServiceResource();
+
+ String query = String.format("%s[@site=\"%s\" and @queue=\"%s\"]{*}", QUEUE_MAPPING_SERVICE_NAME, site, parentQueue);
+ GenericServiceAPIResponseEntity<QueueStructureAPIEntity> responseEntity = resource.search(query, null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
+
+ if (!responseEntity.isSuccess() || responseEntity.getObj() == null) {
+ throw new IOException(responseEntity.getException());
+ }
+
+ Set<String> subQueues = new HashSet<>();
+ subQueues.add(parentQueue);
+ subQueues.addAll(responseEntity.getObj().get(0).getAllSubQueues());
+
+ return subQueues;
+ }
+
+ private Tuple2<String, String> getQueryTimeRange(long currentTime) throws ParseException {
+ String startTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime - DateTimeUtil.ONEHOUR * 12);
+ String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime + DateTimeUtil.ONEMINUTE);
+ return new Tuple2<>(startTime, endTime);
+ }
+
+ private Map<String, Long> getTopRecords(int top, Map<String, Long> map) {
+ Map<String, Long> newMap = new LinkedHashMap<>();
+
+ List<Map.Entry<String,Long>> list = new ArrayList<>(map.entrySet());
+ Collections.sort(list, (o1, o2) -> o1.getValue() < o2.getValue() ? 1 : -1);
+ for (Map.Entry<String, Long> entry : list) {
+ if (newMap.size() < top) {
+ newMap.put(entry.getKey(), entry.getValue());
+ } else {
+ break;
+ }
+ }
+ return newMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
new file mode 100644
index 0000000..1281b66
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import java.util.Map;
+
+public class RunningQueueResponse {
+ private String errMessage;
+ private Map<String, Long> jobs;
+ private Map<String, Long> users;
+
+ public String getErrMessage() {
+ return errMessage;
+ }
+
+ public void setErrMessage(String errMessage) {
+ this.errMessage = errMessage;
+ }
+
+ public Map<String, Long> getJobs() {
+ return jobs;
+ }
+
+ public void setJobs(Map<String, Long> jobs) {
+ this.jobs = jobs;
+ }
+
+ public Map<String, Long> getUsers() {
+ return users;
+ }
+
+ public void setUsers(Map<String, Long> users) {
+ this.users = users;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 49694a5..1997257 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -115,7 +115,7 @@
<property>
<name>dataSinkConfig.topic</name>
<displayName>Kafka Topic for Parsed Data Sink</displayName>
- <value>hdfs_audit_log_enriched</value>
+ <value>hdfs_audit_event</value>
<description>topic for kafka data sink</description>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index 87d3202..1142e1b 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -133,7 +133,7 @@
<property>
<name>dataSinkConfig.topic</name>
<displayName>Topic For Kafka Data Sink</displayName>
- <value>topology_health_check</value>
+ <value>topology_check</value>
<description>topic For kafka data sink</description>
</property>
<property>