You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/02/16 07:41:30 UTC

eagle git commit: [EAGLE-889] Add a restful API for Hadoop running queue to query top N users or jobs

Repository: eagle
Updated Branches:
  refs/heads/master 4ed406abc -> 077e27b23


[EAGLE-889] Add a restful API for Hadoop running queue to query top N users or jobs

https://issues.apache.org/jira/browse/EAGLE-889

Sample 1: query the top N users/jobs of memory usage under queue1 on site1 at time 1487156721356
http://localhost:9090/rest/queue/memory?top=10&queue=queue1&currentTime=1487156721356&site=site1

Sample 2: query the current queue hierarchy
http://localhost:9090/rest/entities?query=QueueMappingService[site=%22sandbox%22]{*}&pageSize=10000

Author: Zhao, Qingwen <qi...@apache.org>

Closes #805 from qingwen220/EAGLE-889.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/077e27b2
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/077e27b2
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/077e27b2

Branch: refs/heads/master
Commit: 077e27b236c4b31454a45bb66a7372bfcd01e13a
Parents: 4ed406a
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Thu Feb 16 15:41:20 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Feb 16 15:41:20 2017 +0800

----------------------------------------------------------------------
 .../queue/common/HadoopClusterConstants.java    |   1 +
 .../crawler/SchedulerInfoParseListener.java     |  25 ++-
 .../model/HadoopQueueEntityRepository.java      |   2 +
 .../scheduler/QueueStructureAPIEntity.java      |  69 +++++++++
 .../storm/HadoopQueueMetricPersistBolt.java     |  15 +-
 ...doop.queue.HadoopQueueRunningAppProvider.xml |   2 +-
 .../jpm/mr/running/parser/MRJobParser.java      |   2 +
 eagle-jpm/eagle-jpm-service/pom.xml             |   5 +
 .../eagle/service/jpm/RunningQueueResource.java | 154 +++++++++++++++++++
 .../eagle/service/jpm/RunningQueueResponse.java |  50 ++++++
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml |   2 +-
 ....eagle.topology.TopologyCheckAppProvider.xml |   2 +-
 12 files changed, 316 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
index 9a08f05..1d64f87 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -80,6 +80,7 @@ public class HadoopClusterConstants {
     }
 
     public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService";
+    public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService";
 
     // tag constants
     public static final String TAG_PARENT_QUEUE = "parentQueue";

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
index b0452c9..67cc5c9 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java
@@ -22,7 +22,9 @@ import org.apache.eagle.dataproc.impl.storm.ValuesArray;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName;
 import org.apache.eagle.hadoop.queue.model.scheduler.*;
+import org.apache.eagle.hadoop.queue.model.scheduler.Queue;
 import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 
 import backtype.storm.spout.SpoutOutputCollector;
@@ -41,7 +43,7 @@ public class SchedulerInfoParseListener {
     //private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
     //private int MAX_CACHE_COUNT = 1000;
 
-    private final List<RunningQueueAPIEntity> runningQueueAPIEntities = new ArrayList<>();
+    private final List<TaggedLogAPIEntity> runningQueueAPIEntities = new ArrayList<>();
     private final List<GenericMetricEntity> metricEntities = new ArrayList<>();
 
     private String site;
@@ -56,6 +58,7 @@ public class SchedulerInfoParseListener {
         Map<String, String> tags = buildMetricTags(null, null);
         createMetric(MetricName.HADOOP_CLUSTER_CAPACITY, tags, currentTimestamp, scheduler.getCapacity());
         createMetric(MetricName.HADOOP_CLUSTER_USED_CAPACITY, tags, currentTimestamp, scheduler.getUsedCapacity());
+
         for (Queue queue : scheduler.getQueues().getQueue()) {
             createQueues(queue, currentTimestamp, scheduler, null);
         }
@@ -69,7 +72,7 @@ public class SchedulerInfoParseListener {
 
         LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size());
         messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis());
-        List<RunningQueueAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities);
+        List<TaggedLogAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities);
         collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId);
 
         runningQueueAPIEntities.clear();
@@ -97,7 +100,7 @@ public class SchedulerInfoParseListener {
         this.metricEntities.add(e);
     }
 
-    private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName) throws Exception {
+    private List<String> createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName) throws Exception {
         RunningQueueAPIEntity _entity = new RunningQueueAPIEntity();
         Map<String, String> _tags = buildMetricTags(queue.getQueueName(), parentQueueName);
         _entity.setTags(_tags);
@@ -123,7 +126,6 @@ public class SchedulerInfoParseListener {
         UserWrappers users = new UserWrappers();
         users.setUsers(userList);
         _entity.setUsers(users);
-
         runningQueueAPIEntities.add(_entity);
 
         createMetric(MetricName.HADOOP_QUEUE_NUMPENDING_JOBS, _tags, currentTimestamp, queue.getNumPendingApplications());
@@ -145,11 +147,24 @@ public class SchedulerInfoParseListener {
             }
         }
 
+        List<String> subQueues = new ArrayList<>();
+        List<String> allSubQueues = new ArrayList<>();
         if (queue.getQueues() != null && queue.getQueues().getQueue() != null) {
             for (Queue subQueue : queue.getQueues().getQueue()) {
-                createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName());
+                subQueues.add(subQueue.getQueueName());
+                allSubQueues.add(subQueue.getQueueName());
+                List<String> queues = createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName());
+                allSubQueues.addAll(queues);
             }
         }
+        QueueStructureAPIEntity queueStructureAPIEntity = new QueueStructureAPIEntity();
+        queueStructureAPIEntity.setTags(_tags);
+        queueStructureAPIEntity.setSubQueues(subQueues);
+        queueStructureAPIEntity.setAllSubQueues(allSubQueues);
+        queueStructureAPIEntity.setLastUpdateTime(currentTimestamp);
+        runningQueueAPIEntities.add(queueStructureAPIEntity);
+
+        return allSubQueues;
     }
 
     private UserWrapper wrapUser(User user) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
index f598779..40d6e53 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
@@ -17,11 +17,13 @@
  */
 package org.apache.eagle.hadoop.queue.model;
 
+import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity;
 import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
 import org.apache.eagle.log.entity.repo.EntityRepository;
 
 public class HadoopQueueEntityRepository extends EntityRepository {
     public HadoopQueueEntityRepository() {
         this.registerEntity(RunningQueueAPIEntity.class);
+        this.registerEntity(QueueStructureAPIEntity.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
new file mode 100644
index 0000000..72f67bc
--- /dev/null
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java
@@ -0,0 +1,69 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.queue.model.scheduler;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+import java.util.List;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("queue_map")
+@ColumnFamily("f")
+@Prefix("queueMap")
+@Service(HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME)
+@TimeSeries(false)
+@Partition( {"site"})
+public class QueueStructureAPIEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private List<String> subQueues;
+    @Column("b")
+    private List<String> allSubQueues;
+    @Column("c")
+    private long lastUpdateTime;
+
+    public List<String> getSubQueues() {
+        return subQueues;
+    }
+
+    public void setSubQueues(List<String> subQueues) {
+        this.subQueues = subQueues;
+        valueChanged("subQueues");
+    }
+
+    public List<String> getAllSubQueues() {
+        return allSubQueues;
+    }
+
+    public void setAllSubQueues(List<String> allSubQueues) {
+        this.allSubQueues = allSubQueues;
+        valueChanged("allSubQueues");
+    }
+
+    public long getLastUpdateTime() {
+        return lastUpdateTime;
+    }
+
+    public void setLastUpdateTime(long lastUpdateTime) {
+        this.lastUpdateTime = lastUpdateTime;
+        valueChanged("lastUpdateTime");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index 1bafc13..9eb7008 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -30,6 +30,7 @@ import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo;
 import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
 import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
@@ -72,10 +73,14 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
             List<GenericMetricEntity> metrics = (List<GenericMetricEntity>) data;
             writeMetrics(metrics);
         } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) {
-            List<RunningQueueAPIEntity> entities = (List<RunningQueueAPIEntity>) data;
-            for (RunningQueueAPIEntity queue : entities) {
-                if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
-                    collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), parseLeafQueueInfo(queue)));
+            List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) data;
+            for (TaggedLogAPIEntity entity : entities) {
+                if (entity instanceof RunningQueueAPIEntity) {
+                    RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity;
+                    if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
+                        collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE),
+                                parseLeafQueueInfo(queue)));
+                    }
                 }
             }
             writeEntities(entities);
@@ -99,7 +104,7 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
         }
     }
 
-    private void writeEntities(List<RunningQueueAPIEntity> entities) {
+    private void writeEntities(List<TaggedLogAPIEntity> entities) {
         try {
             GenericServiceAPIResponseEntity response = client.create(entities);
             if (!response.isSuccess()) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
index 4cf745c..5fb041d 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
@@ -57,7 +57,7 @@
         <property>
             <name>dataSinkConfig.topic</name>
             <displayName>dataSinkConfig.topic</displayName>
-            <value>hadoop_leaf_queue</value>
+            <value>yarn_queue</value>
             <description>topic for kafka data sink</description>
         </property>
         <property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 6b33d31..525ffc2 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -135,6 +135,8 @@ public class MRJobParser implements Runnable {
         JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(mrJobId);
         jobExecutionAPIEntity.setInternalState(Constants.AppState.FINISHED.toString());
         jobExecutionAPIEntity.setCurrentState(Constants.AppState.RUNNING.toString());
+        // set an estimated job finished time because it's hard the get the specific one
+        jobExecutionAPIEntity.setEndTime(System.currentTimeMillis());
         mrJobConfigs.remove(mrJobId);
         if (mrJobConfigs.size() == 0) {
             this.parserStatus = ParserStatus.APP_FINISHED;

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/pom.xml b/eagle-jpm/eagle-jpm-service/pom.xml
index d6807bd..197740e 100644
--- a/eagle-jpm/eagle-jpm-service/pom.xml
+++ b/eagle-jpm/eagle-jpm-service/pom.xml
@@ -43,5 +43,10 @@
             <artifactId>eagle-jpm-entity</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-hadoop-queue</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
new file mode 100644
index 0000000..2632423
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java
@@ -0,0 +1,154 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.utils.Tuple2;
+import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity;
+import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.generic.GenericEntityServiceResource;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.*;
+
+import static org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME;
+import static org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME;
+import static org.apache.eagle.jpm.util.Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME;
+import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
+import static org.apache.eagle.jpm.util.MRJobTagName.JOB_QUEUE;
+import static org.apache.eagle.jpm.util.MRJobTagName.USER;
+
+@Path("queue")
+public class RunningQueueResource {
+
+    @GET
+    @Path("memory")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RunningQueueResponse getTopByQueue(@QueryParam("site") String site,
+                                              @QueryParam("queue") String queue,
+                                              @QueryParam("currentTime") long currentTime,
+                                              @QueryParam("top") int top) {
+        RunningQueueResponse result = new RunningQueueResponse();
+        try {
+            if (site == null || queue == null || currentTime == 0L || top == 0) {
+                throw new Exception("Invalid query parameters: site == null || queue == null || currentTime == 0L || top == 0");
+            }
+            Tuple2<String, String> queryTimeRange = getQueryTimeRange(currentTime);
+            Set<String> queueSet = getSubQueueSet(site, queue);
+            List<JobExecutionAPIEntity> runningJobs = getRunningJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1());
+            List<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> jobs = getJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1());
+            Set<String> jobIds = new HashSet<>();
+            jobs.forEach(job -> jobIds.add(job.getTags().get(JOB_ID.toString())));
+
+            Map<String, Long> userUsage = new HashMap<>();
+            Map<String, Long> jobUsage = new HashMap<>();
+            for (JobExecutionAPIEntity job : runningJobs) {
+                String jobId = job.getTags().get(JOB_ID.toString());
+                String jobQueue = job.getTags().get(JOB_QUEUE.toString());
+                String user = job.getTags().get(USER.toString());
+
+                if (jobIds.contains(jobId) && queueSet.contains(jobQueue)) {
+                    if (userUsage.containsKey(user)) {
+                        userUsage.put(user, userUsage.get(user) + job.getAllocatedMB());
+                    } else {
+                        userUsage.put(user, job.getAllocatedMB());
+                    }
+                    jobUsage.put(jobId, job.getAllocatedMB());
+                }
+            }
+            result.setJobs(getTopRecords(top, jobUsage));
+            result.setUsers(getTopRecords(top, userUsage));
+        } catch (Exception e) {
+            result.setErrMessage(e.getMessage());
+        }
+        return result;
+    }
+
+    private List<JobExecutionAPIEntity> getRunningJobs(String site, long currentTime, String startTime, String endTime) throws Exception {
+        GenericEntityServiceResource resource = new GenericEntityServiceResource();
+        String query = String.format("%s[@site=\"%s\" and @startTime<=%s and (@internalState=\"RUNNING\" or @endTime>%s)]{@jobId, @user, @queue, @allocatedMB}",
+                JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime);
+        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> runningJobResponse = resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
+
+        if (!runningJobResponse.isSuccess() || runningJobResponse.getObj() == null) {
+            throw new IOException(runningJobResponse.getException());
+        }
+
+        return runningJobResponse.getObj();
+    }
+
+    private List<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> getJobs(String site,
+                                                                                      long currentTime,
+                                                                                      String startTime,
+                                                                                      String endTime) throws Exception {
+        GenericEntityServiceResource resource = new GenericEntityServiceResource();
+        String query = String.format("%s[@site=\"%s\" and @startTime<=%s and @endTime>%s]{@jobId}", JPA_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime);
+
+        GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity> response =
+                resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
+
+        if (!response.isSuccess() || response.getObj() == null) {
+            throw new IOException(response.getException());
+        }
+
+        return response.getObj();
+    }
+
+    private Set<String> getSubQueueSet(String site, String parentQueue) throws IOException {
+        GenericEntityServiceResource resource = new GenericEntityServiceResource();
+
+        String query = String.format("%s[@site=\"%s\" and @queue=\"%s\"]{*}", QUEUE_MAPPING_SERVICE_NAME, site, parentQueue);
+        GenericServiceAPIResponseEntity<QueueStructureAPIEntity> responseEntity = resource.search(query, null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
+
+        if (!responseEntity.isSuccess() || responseEntity.getObj() == null) {
+            throw new IOException(responseEntity.getException());
+        }
+
+        Set<String> subQueues = new HashSet<>();
+        subQueues.add(parentQueue);
+        subQueues.addAll(responseEntity.getObj().get(0).getAllSubQueues());
+
+        return subQueues;
+    }
+
+    private Tuple2<String, String> getQueryTimeRange(long currentTime) throws ParseException {
+        String startTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime - DateTimeUtil.ONEHOUR * 12);
+        String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime + DateTimeUtil.ONEMINUTE);
+        return new Tuple2<>(startTime, endTime);
+    }
+
+    private Map<String, Long> getTopRecords(int top, Map<String, Long> map) {
+        Map<String, Long> newMap = new LinkedHashMap<>();
+
+        List<Map.Entry<String,Long>> list = new ArrayList<>(map.entrySet());
+        Collections.sort(list, (o1, o2) -> o1.getValue() < o2.getValue() ? 1 : -1);
+        for (Map.Entry<String, Long> entry : list) {
+            if (newMap.size() < top) {
+                newMap.put(entry.getKey(), entry.getValue());
+            } else {
+                break;
+            }
+        }
+        return newMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
new file mode 100644
index 0000000..1281b66
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import java.util.Map;
+
+public class RunningQueueResponse {
+    private String errMessage;
+    private Map<String, Long> jobs;
+    private Map<String, Long> users;
+
+    public String getErrMessage() {
+        return errMessage;
+    }
+
+    public void setErrMessage(String errMessage) {
+        this.errMessage = errMessage;
+    }
+
+    public Map<String, Long> getJobs() {
+        return jobs;
+    }
+
+    public void setJobs(Map<String, Long> jobs) {
+        this.jobs = jobs;
+    }
+
+    public Map<String, Long> getUsers() {
+        return users;
+    }
+
+    public void setUsers(Map<String, Long> users) {
+        this.users = users;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 49694a5..1997257 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -115,7 +115,7 @@
         <property>
             <name>dataSinkConfig.topic</name>
             <displayName>Kafka Topic for Parsed Data Sink</displayName>
-            <value>hdfs_audit_log_enriched</value>
+            <value>hdfs_audit_event</value>
             <description>topic for kafka data sink</description>
         </property>
         <property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/077e27b2/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index 87d3202..1142e1b 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -133,7 +133,7 @@
         <property>
             <name>dataSinkConfig.topic</name>
             <displayName>Topic For Kafka Data Sink</displayName>
-            <value>topology_health_check</value>
+            <value>topology_check</value>
             <description>topic For kafka data sink</description>
         </property>
         <property>