Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/09 05:25:31 UTC

[3/8] incubator-eagle git commit: [EAGLE-422] eagle support for mr & spark running job monitoring

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
new file mode 100644
index 0000000..bb76213
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.parser;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.spark.crawl.EventType;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.*;
+import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.eagle.jpm.util.SparkJobTagName;
+import org.apache.eagle.jpm.util.Utils;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.model.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+import java.util.function.Function;
+
+public class SparkApplicationParser implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkApplicationParser.class);
+
+    public enum ParserStatus {
+        RUNNING,
+        FINISHED,
+        APP_FINISHED
+    }
+
+    private AppInfo app;
+    private static final int MAX_RETRY_TIMES = 2;
+    private SparkAppEntityCreationHandler sparkAppEntityCreationHandler;
+    //<sparkAppId, SparkAppEntity>
+    private Map<String, SparkAppEntity> sparkAppEntityMap;
+    private Map<String, JobConfig> sparkJobConfigs;
+    private Map<Integer, Pair<Integer, Pair<Long, Long>>> stagesTime;
+    private Set<Integer> completeStages;
+    private Configuration hdfsConf;
+    private SparkRunningConfigManager.EndpointConfig endpointConfig;
+    private final Object lock = new Object();
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+    private Map<String, String> commonTags = new HashMap<>();
+    private SparkRunningJobManager sparkRunningJobManager;
+    private ParserStatus parserStatus;
+    private ResourceFetcher rmResourceFetcher;
+    private int currentAttempt;
+    private boolean first;
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    public SparkApplicationParser(SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
+                                  SparkRunningConfigManager.EndpointConfig endpointConfig,
+                                  SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+                                  AppInfo app, Map<String, SparkAppEntity> sparkApp,
+                                  SparkRunningJobManager sparkRunningJobManager, ResourceFetcher rmResourceFetcher) {
+        this.sparkAppEntityCreationHandler = new SparkAppEntityCreationHandler(eagleServiceConfig);
+        this.endpointConfig = endpointConfig;
+        this.app = app;
+        this.sparkJobConfigs = new HashMap<>();
+        this.stagesTime = new HashMap<>();
+        this.completeStages = new HashSet<>();
+        this.sparkAppEntityMap = sparkApp;
+        if (this.sparkAppEntityMap == null) {
+            this.sparkAppEntityMap = new HashMap<>();
+        }
+        this.rmResourceFetcher = rmResourceFetcher;
+        this.currentAttempt = 1;
+        this.first = true;
+        this.hdfsConf  = new Configuration();
+        this.hdfsConf.set("fs.defaultFS", endpointConfig.nnEndpoint);
+        this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        this.hdfsConf.set("hdfs.kerberos.principal", endpointConfig.principal);
+        this.hdfsConf.set("hdfs.keytab.file", endpointConfig.keyTab);
+
+        this.commonTags.put(SparkJobTagName.SITE.toString(), jobExtractorConfig.site);
+        this.commonTags.put(SparkJobTagName.SPARK_USER.toString(), app.getUser());
+        this.commonTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
+        this.parserStatus  = ParserStatus.FINISHED;
+        this.sparkRunningJobManager = sparkRunningJobManager;
+    }
+
+    public ParserStatus status() {
+        return this.parserStatus;
+    }
+
+    public void setStatus(ParserStatus status) {
+        this.parserStatus = status;
+    }
+
+    private void finishSparkApp(String sparkAppId) {
+        SparkAppEntity attemptEntity = sparkAppEntityMap.get(sparkAppId);
+        attemptEntity.setYarnState(Constants.AppState.FINISHED.toString());
+        attemptEntity.setYarnStatus(Constants.AppStatus.FAILED.toString());
+        sparkJobConfigs.remove(sparkAppId);
+        if (sparkJobConfigs.size() == 0) {
+            this.parserStatus = ParserStatus.APP_FINISHED;
+        }
+        stagesTime.clear();
+        LOG.info("spark application {} has been finished", sparkAppId);
+    }
+
+    private void fetchSparkRunningInfo() throws Exception {
+        for (int i = 0; i < MAX_RETRY_TIMES; i++) {
+            if (fetchSparkApps()) {
+                break;
+            } else if (i == MAX_RETRY_TIMES - 1) {
+                //we get here either because the RM is unreachable or because the app has finished;
+                //if the RM is still reachable, treat the app as finished
+                rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB);
+                sparkAppEntityMap.keySet().forEach(this::finishSparkApp);
+                return;
+            }
+        }
+
+        List<Function<String, Boolean>> functions = new ArrayList<>();
+        functions.add(fetchSparkExecutors);
+        functions.add(fetchSparkJobs);
+        if (!first) {
+            functions.add(fetchSparkStagesAndTasks);
+        }
+
+        this.first = false;
+        for (String sparkAppId : sparkAppEntityMap.keySet()) {
+            for (Function<String, Boolean> function : functions) {
+                int i = 0;
+                for (; i < MAX_RETRY_TIMES; i++) {
+                    if (function.apply(sparkAppId)) {
+                        break;
+                    }
+                }
+                if (i >= MAX_RETRY_TIMES) {
+                    //may be caused by the RM being unreachable
+                    rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB);
+                    finishSparkApp(sparkAppId);
+                    break;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        synchronized (this.lock) {
+            if (this.parserStatus == ParserStatus.APP_FINISHED) {
+                return;
+            }
+
+            LOG.info("start to process yarn application " + app.getId());
+            try {
+                fetchSparkRunningInfo();
+            } catch (Exception e) {
+                LOG.warn("exception found when process application {}, {}", app.getId(), e);
+                e.printStackTrace();
+            } finally {
+                for (String jobId : sparkAppEntityMap.keySet()) {
+                    sparkAppEntityCreationHandler.add(sparkAppEntityMap.get(jobId));
+                }
+                if (sparkAppEntityCreationHandler.flush()) { //force flush
+                    //we must flush entities before deleting from zk, otherwise a job's finish state could be lost
+                    //delete from zk if needed
+                    sparkAppEntityMap.keySet()
+                            .stream()
+                            .filter(
+                                    jobId -> sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FINISHED.toString()) ||
+                                            sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FAILED.toString()))
+                            .forEach(
+                                    jobId -> this.sparkRunningJobManager.delete(app.getId(), jobId));
+                }
+
+                LOG.info("finish process yarn application " + app.getId());
+            }
+
+            if (this.parserStatus == ParserStatus.RUNNING) {
+                this.parserStatus = ParserStatus.FINISHED;
+            }
+        }
+    }
+
+    private JobConfig parseJobConfig(InputStream is) throws Exception {
+        JobConfig jobConfig = new JobConfig();
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+            String line;
+            boolean stop = false;
+            while ((line = reader.readLine()) != null && !stop) {
+                try {
+                    JSONParser parser = new JSONParser();
+                    JSONObject eventObj = (JSONObject) parser.parse(line);
+
+                    if (eventObj != null) {
+                        String eventType = (String) eventObj.get("Event");
+                        LOG.info("Event type: " + eventType);
+                        if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
+                            stop = true;
+                            JSONObject sparkProps = (JSONObject) eventObj.get("Spark Properties");
+                            for (Object key : sparkProps.keySet()) {
+                                jobConfig.put((String) key, (String) sparkProps.get(key));
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error(String.format("Fail to parse %s.", line), e);
+                }
+            }
+            
+            return jobConfig;
+        }
+    }
+
+    private JobConfig getJobConfig(String sparkAppId, int attemptId) {
+        //TODO: getResourceManagerVersion() and compare version to make attempt id.
+
+        LOG.info("Get job config for sparkAppId {}, attempt {}, appId {}", sparkAppId, attemptId, app.getId());
+        JobConfig jobConfig = null;
+
+        try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
+//             // For Yarn version >= 2.7,
+//             // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
+//             String attemptIdFormatted = String.format("%06d", attemptId);
+//             // remove "application_" to get the number part of appID.
+//             String sparkAppIdNum = sparkAppId.substring(12);
+//             String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
+
+            // For Yarn version 2.4.x
+            // log name: application_1464382345557_269065_1
+            String attemptIdString = Integer.toString(attemptId);
+
+            //try, in order: appId_attemptId.inprogress, appId_attemptId, appId.inprogress, appId
+            String eventLogDir = this.endpointConfig.eventLog;
+            Path attemptFile = new Path(eventLogDir + "/" + sparkAppId + "_" + attemptIdString + ".inprogress");
+            if (!hdfs.exists(attemptFile)) {
+                attemptFile = new Path(eventLogDir + "/" + sparkAppId + "_" + attemptIdString);
+                if (!hdfs.exists(attemptFile)) {
+                    attemptFile = new Path(eventLogDir + "/" + sparkAppId + ".inprogress");
+                    if (!hdfs.exists(attemptFile)) {
+                        attemptFile = new Path(eventLogDir + "/" + sparkAppId);
+                    }
+                }
+            }
+
+            LOG.info("Attempt File path: " + attemptFile.toString());
+            jobConfig = parseJobConfig(hdfs.open(attemptFile));
+        } catch (Exception e) {
+            LOG.error("Fail to process application {}", sparkAppId, e);
+        }
+
+        return jobConfig;
+    }
+
+    private boolean isClientMode(JobConfig jobConfig) {
+        return jobConfig.containsKey(Constants.SPARK_MASTER_KEY) &&
+               jobConfig.get(Constants.SPARK_MASTER_KEY).equalsIgnoreCase("yarn-client");
+    }
+
+    private boolean fetchSparkApps() {
+        String appURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        SparkApplication[] sparkApplications = null;
+        try {
+            is = InputStreamUtils.getInputStream(appURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch spark application from {}", appURL);
+            sparkApplications = OBJ_MAPPER.readValue(is, SparkApplication[].class);
+        } catch (java.net.ConnectException e) {
+            LOG.warn("fetch spark application from {} failed, {}", appURL, e);
+            e.printStackTrace();
+            return true;
+        } catch (Exception e) {
+            LOG.warn("fetch spark application from {} failed, {}", appURL, e);
+            e.printStackTrace();
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        for (SparkApplication sparkApplication : sparkApplications) {
+            String id = sparkApplication.getId();
+            if (id.contains(" ") || !id.startsWith("app")) {
+                //in spark versions < 1.6.0 the id may contain whitespace; needs further investigation
+                LOG.warn("skip spark application {}", id);
+                continue;
+            }
+
+            currentAttempt = sparkApplication.getAttempts().size();
+            int lastSavedAttempt = 1;
+            if (sparkAppEntityMap.containsKey(id)) {
+                lastSavedAttempt = Integer.parseInt(sparkAppEntityMap.get(id).getTags().get(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString()));
+            }
+            for (int j = lastSavedAttempt; j <= currentAttempt; j++) {
+                SparkAppEntity attemptEntity = new SparkAppEntity();
+                commonTags.put(SparkJobTagName.SPARK_APP_NAME.toString(), sparkApplication.getName());
+                commonTags.put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), "" + j);
+                commonTags.put(SparkJobTagName.SPARK_APP_ID.toString(), id);
+                attemptEntity.setTags(new HashMap<>(commonTags));
+                attemptEntity.setAppInfo(app);
+
+                attemptEntity.setStartTime(Utils.dateTimeToLong(sparkApplication.getAttempts().get(j - 1).getStartTime()));
+                attemptEntity.setTimestamp(attemptEntity.getStartTime());
+
+                if (sparkJobConfigs.containsKey(id) && j == currentAttempt) {
+                    attemptEntity.setConfig(sparkJobConfigs.get(id));
+                }
+
+                if (attemptEntity.getConfig() == null) {
+                    attemptEntity.setConfig(getJobConfig(id, j));
+                    if (j == currentAttempt) {
+                        sparkJobConfigs.put(id, attemptEntity.getConfig());
+                    }
+                }
+
+                try {
+                    JobConfig jobConfig = attemptEntity.getConfig();
+                    attemptEntity.setExecMemoryBytes(Utils.parseMemory(jobConfig.get(Constants.SPARK_EXECUTOR_MEMORY_KEY)));
+
+                    attemptEntity.setDriveMemoryBytes(isClientMode(jobConfig) ?
+                            0 :
+                            Utils.parseMemory(jobConfig.get(Constants.SPARK_DRIVER_MEMORY_KEY)));
+                    attemptEntity.setExecutorCores(Integer.parseInt(jobConfig.get(Constants.SPARK_EXECUTOR_CORES_KEY)));
+                    // spark.driver.cores may not be set.
+                    String driverCoresStr = jobConfig.get(Constants.SPARK_DRIVER_CORES_KEY);
+                    int driverCores = 0;
+                    if (driverCoresStr != null && !isClientMode(jobConfig)) {
+                        driverCores = Integer.parseInt(driverCoresStr);
+                    }
+                    attemptEntity.setDriverCores(driverCores);
+                } catch (Exception e) {
+                    LOG.warn("add config failed, {}", e);
+                    e.printStackTrace();
+                }
+
+                if (j == currentAttempt) {
+                    //current attempt
+                    attemptEntity.setYarnState(app.getState());
+                    attemptEntity.setYarnStatus(app.getFinalStatus());
+                    sparkAppEntityMap.put(id, attemptEntity);
+                    this.sparkRunningJobManager.update(app.getId(), id, attemptEntity);
+                } else {
+                    attemptEntity.setYarnState(Constants.AppState.FINISHED.toString());
+                    attemptEntity.setYarnStatus(Constants.AppStatus.FAILED.toString());
+                }
+                sparkAppEntityCreationHandler.add(attemptEntity);
+            }
+        }
+
+        sparkAppEntityCreationHandler.flush();
+        return true;
+    }
+
+    private Function<String, Boolean> fetchSparkExecutors = sparkAppId -> {
+        //only get current attempt
+        SparkAppEntity sparkAppEntity = sparkAppEntityMap.get(sparkAppId);
+        String executorURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_EXECUTORS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        SparkExecutor[] sparkExecutors = null;
+        try {
+            is = InputStreamUtils.getInputStream(executorURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch spark executor from {}", executorURL);
+            sparkExecutors = OBJ_MAPPER.readValue(is, SparkExecutor[].class);
+        } catch (java.net.ConnectException e) {
+            LOG.warn("fetch spark application from {} failed, {}", executorURL, e);
+            e.printStackTrace();
+            return true;
+        } catch (Exception e) {
+            LOG.warn("fetch spark executor from {} failed, {}", executorURL, e);
+            e.printStackTrace();
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+        sparkAppEntity.setExecutors(sparkExecutors.length);
+
+        for (SparkExecutor executor : sparkExecutors) {
+            SparkExecutorEntity entity = new SparkExecutorEntity();
+            entity.setTags(new HashMap<>(sparkAppEntity.getTags()));
+            entity.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executor.getId());
+            entity.setHostPort(executor.getHostPort());
+            entity.setRddBlocks(executor.getRddBlocks());
+            entity.setMemoryUsed(executor.getMemoryUsed());
+            entity.setDiskUsed(executor.getDiskUsed());
+            entity.setActiveTasks(executor.getActiveTasks());
+            entity.setFailedTasks(executor.getFailedTasks());
+            entity.setCompletedTasks(executor.getCompletedTasks());
+            entity.setTotalTasks(executor.getTotalTasks());
+            entity.setTotalDuration(executor.getTotalDuration());
+            entity.setTotalInputBytes(executor.getTotalInputBytes());
+            entity.setTotalShuffleRead(executor.getTotalShuffleRead());
+            entity.setTotalShuffleWrite(executor.getTotalShuffleWrite());
+            entity.setMaxMemory(executor.getMaxMemory());
+
+            entity.setTimestamp(sparkAppEntity.getTimestamp());
+            entity.setStartTime(sparkAppEntity.getStartTime());
+            if (executor.getId().equalsIgnoreCase("driver")) {
+                entity.setExecMemoryBytes(sparkAppEntity.getDriveMemoryBytes());
+                entity.setCores(sparkAppEntity.getDriverCores());
+                entity.setMemoryOverhead(sparkAppEntity.getDriverMemoryOverhead());
+            } else {
+                entity.setExecMemoryBytes(sparkAppEntity.getExecMemoryBytes());
+                entity.setCores(sparkAppEntity.getExecutorCores());
+                entity.setMemoryOverhead(sparkAppEntity.getExecutorMemoryOverhead());
+            }
+            sparkAppEntityCreationHandler.add(entity);
+        }
+        return true;
+    };
+
+    private Function<String, Boolean> fetchSparkJobs = sparkAppId -> {
+        //only get current attempt
+        SparkAppEntity sparkAppEntity = sparkAppEntityMap.get(sparkAppId);
+        String jobURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_JOBS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        SparkJob[] sparkJobs = null;
+        try {
+            is = InputStreamUtils.getInputStream(jobURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch spark job from {}", jobURL);
+            sparkJobs = OBJ_MAPPER.readValue(is, SparkJob[].class);
+        } catch (java.net.ConnectException e) {
+            LOG.warn("fetch spark application from {} failed, {}", jobURL, e);
+            e.printStackTrace();
+            return true;
+        } catch (Exception e) {
+            LOG.warn("fetch spark job from {} failed, {}", jobURL, e);
+            e.printStackTrace();
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        sparkAppEntity.setNumJobs(sparkJobs.length);
+        for (SparkJob sparkJob : sparkJobs) {
+            SparkJobEntity entity = new SparkJobEntity();
+            entity.setTags(new HashMap<>(commonTags));
+            entity.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), sparkJob.getJobId() + "");
+            entity.setSubmissionTime(Utils.dateTimeToLong(sparkJob.getSubmissionTime()));
+            if (sparkJob.getCompletionTime() != null) {
+                entity.setCompletionTime(Utils.dateTimeToLong(sparkJob.getCompletionTime()));
+            }
+            entity.setNumStages(sparkJob.getStageIds().size());
+            entity.setStatus(sparkJob.getStatus());
+            entity.setNumTask(sparkJob.getNumTasks());
+            entity.setNumActiveTasks(sparkJob.getNumActiveTasks());
+            entity.setNumCompletedTasks(sparkJob.getNumCompletedTasks());
+            entity.setNumSkippedTasks(sparkJob.getNumSkippedTasks());
+            entity.setNumFailedTasks(sparkJob.getNumFailedTasks());
+            entity.setNumActiveStages(sparkJob.getNumActiveStages());
+            entity.setNumCompletedStages(sparkJob.getNumCompletedStages());
+            entity.setNumSkippedStages(sparkJob.getNumSkippedStages());
+            entity.setNumFailedStages(sparkJob.getNumFailedStages());
+            entity.setStages(sparkJob.getStageIds());
+            entity.setTimestamp(sparkAppEntity.getTimestamp());
+
+            sparkAppEntity.setTotalStages(sparkAppEntity.getTotalStages() + entity.getNumStages());
+            sparkAppEntity.setTotalTasks(sparkAppEntity.getTotalTasks() + entity.getNumTask());
+            sparkAppEntity.setActiveTasks(sparkAppEntity.getActiveTasks() + entity.getNumActiveTasks());
+            sparkAppEntity.setCompleteTasks(sparkAppEntity.getCompleteTasks() + entity.getNumCompletedTasks());
+            sparkAppEntity.setSkippedTasks(sparkAppEntity.getSkippedTasks() + entity.getNumSkippedTasks());
+            sparkAppEntity.setFailedTasks(sparkAppEntity.getFailedTasks() + entity.getNumFailedTasks());
+            sparkAppEntity.setActiveStages(sparkAppEntity.getActiveStages() + entity.getNumActiveStages());
+            sparkAppEntity.setCompleteStages(sparkAppEntity.getCompleteStages() + entity.getNumCompletedStages());
+            sparkAppEntity.setSkippedStages(sparkAppEntity.getSkippedStages() + entity.getNumSkippedStages());
+            sparkAppEntity.setFailedStages(sparkAppEntity.getFailedStages() + entity.getNumFailedStages());
+
+            for (Integer stageId : sparkJob.getStageIds()) {
+                stagesTime.put(stageId, Pair.of(sparkJob.getJobId(), Pair.of(entity.getSubmissionTime(), entity.getCompletionTime())));
+            }
+            sparkAppEntityCreationHandler.add(entity);
+        }
+        return true;
+    };
+
+    private Function<String, Boolean> fetchSparkStagesAndTasks = sparkAppId -> {
+        SparkAppEntity sparkAppEntity = sparkAppEntityMap.get(sparkAppId);
+        String stageURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_STAGES_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        SparkStage[] sparkStages;
+        try {
+            is = InputStreamUtils.getInputStream(stageURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch spark stage from {}", stageURL);
+            sparkStages = OBJ_MAPPER.readValue(is, SparkStage[].class);
+        } catch (java.net.ConnectException e) {
+            LOG.warn("fetch spark application from {} failed, {}", stageURL, e);
+            e.printStackTrace();
+            return true;
+        } catch (Exception e) {
+            LOG.warn("fetch spark stage from {} failed, {}", stageURL, e);
+            e.printStackTrace();
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        for (SparkStage sparkStage : sparkStages) {
+            //TODO
+            //we need a thread pool to handle this if there are many stages
+            SparkStage stage;
+            try {
+                stageURL = app.getTrackingUrl() + Constants.SPARK_APPS_URL + "/" + sparkAppId + "/" + Constants.SPARK_STAGES_URL + "/" + sparkStage.getStageId() + "?" + Constants.ANONYMOUS_PARAMETER;
+                is = InputStreamUtils.getInputStream(stageURL, null, Constants.CompressionType.NONE);
+                LOG.info("fetch spark stage from {}", stageURL);
+                stage = OBJ_MAPPER.readValue(is, SparkStage[].class)[0];
+            } catch (Exception e) {
+                LOG.warn("fetch spark stage from {} failed, {}", stageURL, e);
+                e.printStackTrace();
+                return false;
+            } finally {
+                Utils.closeInputStream(is);
+            }
+
+            if (this.completeStages.contains(stage.getStageId())) {
+                //skip stages that have already been recorded as complete;
+                //returning here would wrongly abort the remaining stages
+                continue;
+            }
+            SparkStageEntity stageEntity = new SparkStageEntity();
+            stageEntity.setTags(new HashMap<>(commonTags));
+            stageEntity.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), stagesTime.get(stage.getStageId()).getLeft() + "");
+            stageEntity.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stage.getStageId() + "");
+            stageEntity.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stage.getAttemptId() + "");
+            stageEntity.setStatus(stage.getStatus());
+            stageEntity.setNumActiveTasks(stage.getNumActiveTasks());
+            stageEntity.setNumCompletedTasks(stage.getNumCompleteTasks());
+            stageEntity.setNumFailedTasks(stage.getNumFailedTasks());
+            stageEntity.setExecutorRunTime(stage.getExecutorRunTime());
+            stageEntity.setInputBytes(stage.getInputBytes());
+            stageEntity.setInputRecords(stage.getInputRecords());
+            stageEntity.setOutputBytes(stage.getOutputBytes());
+            stageEntity.setOutputRecords(stage.getOutputRecords());
+            stageEntity.setShuffleReadBytes(stage.getShuffleReadBytes());
+            stageEntity.setShuffleReadRecords(stage.getShuffleReadRecords());
+            stageEntity.setShuffleWriteBytes(stage.getShuffleWriteBytes());
+            stageEntity.setShuffleWriteRecords(stage.getShuffleWriteRecords());
+            stageEntity.setMemoryBytesSpilled(stage.getMemoryBytesSpilled());
+            stageEntity.setDiskBytesSpilled(stage.getDiskBytesSpilled());
+            stageEntity.setName(stage.getName());
+            stageEntity.setSchedulingPool(stage.getSchedulingPool());
+            stageEntity.setSubmitTime(stagesTime.get(stage.getStageId()).getRight().getLeft());
+            stageEntity.setTimestamp(stageEntity.getSubmitTime());
+            stageEntity.setCompleteTime(stagesTime.get(stage.getStageId()).getRight().getRight());
+            stageEntity.setNumTasks(stage.getTasks() == null ? 0 : stage.getTasks().size());
+            fetchTasksFromStage(stageEntity, stage);
+            sparkAppEntityCreationHandler.add(stageEntity);
+            if (stage.getStatus().equals(Constants.StageState.COMPLETE.toString())) {
+                this.completeStages.add(stage.getStageId());
+                LOG.info("stage {} of spark {} has finished", stage.getStageId(), sparkAppId);
+            }
+
+            sparkAppEntity.setInputBytes(sparkAppEntity.getInputBytes() + stageEntity.getInputBytes());
+            sparkAppEntity.setInputRecords(sparkAppEntity.getInputRecords() + stageEntity.getInputRecords());
+            sparkAppEntity.setOutputBytes(sparkAppEntity.getOutputBytes() + stageEntity.getOutputBytes());
+            sparkAppEntity.setOutputRecords(sparkAppEntity.getOutputRecords() + stageEntity.getOutputRecords());
+            sparkAppEntity.setShuffleReadBytes(sparkAppEntity.getShuffleReadBytes() + stageEntity.getShuffleReadBytes());
+            sparkAppEntity.setShuffleReadRecords(sparkAppEntity.getShuffleReadRecords() + stageEntity.getShuffleReadRecords());
+            sparkAppEntity.setShuffleWriteBytes(sparkAppEntity.getShuffleWriteBytes() + stageEntity.getShuffleWriteBytes());
+            sparkAppEntity.setShuffleWriteRecords(sparkAppEntity.getShuffleWriteRecords() + stageEntity.getShuffleWriteRecords());
+            sparkAppEntity.setExecutorRunTime(sparkAppEntity.getExecutorRunTime() + stageEntity.getExecutorRunTime());
+            sparkAppEntity.setExecutorDeserializeTime(sparkAppEntity.getExecutorDeserializeTime() + stageEntity.getExecutorDeserializeTime());
+            sparkAppEntity.setResultSize(sparkAppEntity.getResultSize() + stageEntity.getResultSize());
+            sparkAppEntity.setJvmGcTime(sparkAppEntity.getJvmGcTime() + stageEntity.getJvmGcTime());
+            sparkAppEntity.setResultSerializationTime(sparkAppEntity.getResultSerializationTime() + stageEntity.getResultSerializationTime());
+            sparkAppEntity.setMemoryBytesSpilled(sparkAppEntity.getMemoryBytesSpilled() + stageEntity.getMemoryBytesSpilled());
+            sparkAppEntity.setDiskBytesSpilled(sparkAppEntity.getDiskBytesSpilled() + stageEntity.getDiskBytesSpilled());
+            sparkAppEntity.setCompleteTasks(sparkAppEntity.getCompleteTasks() + stageEntity.getNumCompletedTasks());
+        }
+        return true;
+    };
+
+    private void fetchTasksFromStage(SparkStageEntity stageEntity, SparkStage stage) {
+        Map<String, SparkTask> tasks = stage.getTasks();
+        for (String key : tasks.keySet()) {
+            SparkTask task = tasks.get(key);
+            SparkTaskEntity taskEntity = new SparkTaskEntity();
+            taskEntity.setTags(new HashMap<>(stageEntity.getTags()));
+            taskEntity.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), task.getAttempt() + "");
+            taskEntity.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), task.getIndex() + "");
+            taskEntity.setTaskId(task.getTaskId());
+            taskEntity.setLaunchTime(Utils.dateTimeToLong(task.getLaunchTime()));
+            taskEntity.setHost(task.getHost());
+            taskEntity.setTaskLocality(task.getTaskLocality());
+            taskEntity.setSpeculative(task.isSpeculative());
+            taskEntity.setTimestamp(stageEntity.getTimestamp());
+
+            SparkTaskMetrics taskMetrics = task.getTaskMetrics();
+            taskEntity.setExecutorDeserializeTime(taskMetrics == null ? 0 : taskMetrics.getExecutorDeserializeTime());
+            taskEntity.setExecutorRunTime(taskMetrics == null ? 0 : taskMetrics.getExecutorRunTime());
+            taskEntity.setResultSize(taskMetrics == null ? 0 : taskMetrics.getResultSize());
+            taskEntity.setJvmGcTime(taskMetrics == null ? 0 : taskMetrics.getJvmGcTime());
+            taskEntity.setResultSerializationTime(taskMetrics == null ? 0 : taskMetrics.getResultSerializationTime());
+            taskEntity.setMemoryBytesSpilled(taskMetrics == null ? 0 : taskMetrics.getMemoryBytesSpilled());
+            taskEntity.setDiskBytesSpilled(taskMetrics == null ? 0 : taskMetrics.getDiskBytesSpilled());
+
+            SparkTaskInputMetrics inputMetrics = null;
+            if (taskMetrics != null && taskMetrics.getInputMetrics() != null) {
+                inputMetrics = taskMetrics.getInputMetrics();
+            }
+            taskEntity.setInputBytes(inputMetrics == null ? 0 : inputMetrics.getBytesRead());
+            taskEntity.setInputRecords(inputMetrics == null ? 0 : inputMetrics.getRecordsRead());
+
+            //need to figure outputMetrics
+
+            SparkTaskShuffleReadMetrics shuffleReadMetrics = null;
+            if (taskMetrics != null && taskMetrics.getShuffleReadMetrics() != null) {
+                shuffleReadMetrics = taskMetrics.getShuffleReadMetrics();
+            }
+            taskEntity.setShuffleReadRemoteBytes(shuffleReadMetrics == null ? 0 : shuffleReadMetrics.getRemoteBytesRead());
+            taskEntity.setShuffleReadRecords(shuffleReadMetrics == null ? 0 : shuffleReadMetrics.getRecordsRead());
+
+            SparkTaskShuffleWriteMetrics shuffleWriteMetrics = null;
+            if (taskMetrics != null && taskMetrics.getShuffleWriteMetrics() != null) {
+                shuffleWriteMetrics = taskMetrics.getShuffleWriteMetrics();
+            }
+            taskEntity.setShuffleWriteBytes(shuffleWriteMetrics == null ? 0 : shuffleWriteMetrics.getBytesWritten());
+            taskEntity.setShuffleWriteRecords(shuffleWriteMetrics == null ? 0 : shuffleWriteMetrics.getRecordsWritten());
+
+            stageEntity.setExecutorDeserializeTime(stageEntity.getExecutorDeserializeTime() + taskEntity.getExecutorDeserializeTime());
+            stageEntity.setResultSize(stageEntity.getResultSize() + taskEntity.getResultSize());
+            stageEntity.setJvmGcTime(stageEntity.getJvmGcTime() + taskEntity.getJvmGcTime());
+            stageEntity.setResultSerializationTime(stageEntity.getResultSerializationTime() + taskEntity.getResultSerializationTime());
+
+            this.sparkAppEntityCreationHandler.add(taskEntity);
+        }
+    }
+}
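
The parser above drives everything off the REST API that Spark exposes behind the YARN tracking URL. Below is a minimal standalone sketch of just the fetch-and-deserialize step, under the assumption that the tracking URL is reachable; SparkAppSummary is a hypothetical stand-in for the patch's SparkApplication model class.

    import org.codehaus.jackson.map.DeserializationConfig;
    import org.codehaus.jackson.map.ObjectMapper;

    import java.io.InputStream;
    import java.net.URL;

    public class SparkRestSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        static {
            // the REST payload carries more fields than we model, so ignore extras
            MAPPER.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        }

        // hypothetical minimal model: only the fields the parser keys on
        public static class SparkAppSummary {
            public String id;
            public String name;
        }

        public static SparkAppSummary[] fetchApps(String trackingUrl) throws Exception {
            // same endpoint shape the parser builds from Constants.SPARK_APPS_URL
            URL url = new URL(trackingUrl + "api/v1/applications?anonymous=true");
            try (InputStream is = url.openStream()) {
                return MAPPER.readValue(is, SparkAppSummary[].class);
            }
        }
    }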

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
new file mode 100644
index 0000000..2b6c62f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.recover;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
+import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class SparkRunningJobManager implements Serializable {
+    private RunningJobManager runningJobManager;
+
+    public SparkRunningJobManager(SparkRunningConfigManager.ZKStateConfig config) {
+        this.runningJobManager = new RunningJobManager(config.zkQuorum,
+                config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
+    }
+
+    public Map<String, SparkAppEntity> recoverYarnApp(String appId) throws Exception {
+        Map<String, Pair<Map<String, String>, AppInfo>> result = this.runningJobManager.recoverYarnApp(appId);
+        Map<String, SparkAppEntity> apps = new HashMap<>();
+        for (String jobId : result.keySet()) {
+            Pair<Map<String, String>, AppInfo> job = result.get(jobId);
+            SparkAppEntity sparkAppEntity = new SparkAppEntity();
+            sparkAppEntity.setTags(job.getLeft());
+            sparkAppEntity.setAppInfo(job.getRight());
+            sparkAppEntity.setTimestamp(job.getRight().getStartedTime());
+            apps.put(jobId, sparkAppEntity);
+        }
+        return apps;
+    }
+
+    public Map<String, Map<String, SparkAppEntity>> recover() {
+        //we need to read from zookeeper; the path looks like /apps/spark/running/yarnAppId/jobId/
+        //<yarnAppId, <jobId, JobExecutionAPIEntity>>
+        Map<String, Map<String, SparkAppEntity>> result = new HashMap<>();
+        Map<String, Map<String, Pair<Map<String, String>, AppInfo>>> apps = this.runningJobManager.recover();
+        for (String appId : apps.keySet()) {
+            result.put(appId, new HashMap<>());
+            Map<String, Pair<Map<String, String>, AppInfo>> jobs = apps.get(appId);
+
+            for (String jobId : jobs.keySet()) {
+                Pair<Map<String, String>, AppInfo> job = jobs.get(jobId);
+                SparkAppEntity sparkAppEntity = new SparkAppEntity();
+                sparkAppEntity.setTags(job.getLeft());
+                sparkAppEntity.setAppInfo(job.getRight());
+                sparkAppEntity.setTimestamp(job.getRight().getStartedTime());
+                result.get(appId).put(jobId, sparkAppEntity);
+            }
+        }
+        return result;
+    }
+
+    public void update(String yarnAppId, String jobId, SparkAppEntity entity) {
+        this.runningJobManager.update(yarnAppId, jobId, entity.getTags(), entity.getAppInfo());
+    }
+
+    public void delete(String yarnAppId, String jobId) {
+        this.runningJobManager.delete(yarnAppId, jobId);
+    }
+}
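
Usage is symmetric: the fetch spout calls recover() once at startup, and the parser calls update()/delete() as attempts progress. A hedged sketch of the recovery side, assuming a ZKStateConfig and an slf4j LOG as in the surrounding classes:

    SparkRunningJobManager mgr = new SparkRunningJobManager(zkStateConfig);
    // outer key: yarn app id; inner key: spark app id
    Map<String, Map<String, SparkAppEntity>> state = mgr.recover();
    state.forEach((yarnAppId, jobs) ->
            jobs.forEach((sparkAppId, entity) ->
                    LOG.info("recovered {}/{} in state {}",
                            yarnAppId, sparkAppId, entity.getAppInfo().getState())));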

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
new file mode 100644
index 0000000..6be0cfd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
+import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class SparkRunningJobFetchSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobFetchSpout.class);
+
+    private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
+    private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+    private SparkRunningConfigManager.EndpointConfig endpointConfig;
+    private ResourceFetcher resourceFetcher;
+    private SpoutOutputCollector collector;
+    private boolean init;
+    private transient SparkRunningJobManager sparkRunningJobManager;
+    private Set<String> runningYarnApps;
+
+    public SparkRunningJobFetchSpout(SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+                                     SparkRunningConfigManager.EndpointConfig endpointConfig,
+                                     SparkRunningConfigManager.ZKStateConfig zkStateConfig) {
+        this.jobExtractorConfig = jobExtractorConfig;
+        this.endpointConfig = endpointConfig;
+        this.zkStateConfig = zkStateConfig;
+        this.init = !(zkStateConfig.recoverEnabled);
+        this.runningYarnApps = new HashSet<>();
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
+        collector = spoutOutputCollector;
+        this.sparkRunningJobManager = new SparkRunningJobManager(zkStateConfig);
+    }
+
+    @Override
+    public void nextTuple() {
+        LOG.info("Start to fetch spark running jobs");
+        try {
+            Map<String, Map<String, SparkAppEntity>> sparkApps = null;
+            List<AppInfo> apps;
+            if (!this.init) {
+                sparkApps = recoverRunningApps();
+
+                apps = new ArrayList<>();
+                for (String appId : sparkApps.keySet()) {
+                    Map<String, SparkAppEntity> jobs = sparkApps.get(appId);
+                    if (jobs.size() > 0) {
+                        Set<String> jobIds = jobs.keySet();
+                        apps.add(jobs.get(jobIds.iterator().next()).getAppInfo());
+                        this.runningYarnApps.add(appId);
+                    }
+                }
+                LOG.info("recover {} spark yarn apps from zookeeper", apps.size());
+                this.init = true;
+            } else {
+                apps = resourceFetcher.getResource(Constants.ResourceType.RUNNING_SPARK_JOB);
+                LOG.info("get {} apps from resource manager", apps == null ? 0 : apps.size());
+                Set<String> running = new HashSet<>();
+                Iterator<String> appIdIterator = this.runningYarnApps.iterator();
+                while (appIdIterator.hasNext()) {
+                    String appId = appIdIterator.next();
+                    boolean hasFinished = true;
+                    if (apps != null) {
+                        for (AppInfo appInfo : apps) {
+                            if (appId.equals(appInfo.getId())) {
+                                hasFinished = false;
+                            }
+                            running.add(appInfo.getId());
+                        }
+
+                        if (hasFinished) {
+                            try {
+                                Map<String, SparkAppEntity> result = this.sparkRunningJobManager.recoverYarnApp(appId);
+                                if (result.size() > 0) {
+                                    if (sparkApps == null) {
+                                        sparkApps = new HashMap<>();
+                                    }
+                                    sparkApps.put(appId, result);
+                                    AppInfo appInfo = result.get(result.keySet().iterator().next()).getAppInfo();
+                                    appInfo.setState(Constants.AppState.FINISHED.toString());
+                                    apps.add(appInfo);
+                                }
+                            } catch (KeeperException.NoNodeException e) {
+                                LOG.warn("{}", e);
+                                LOG.warn("yarn app {} has finished", appId);
+                            }
+                        }
+                    }
+                }
+
+                this.runningYarnApps = running;
+                LOG.info("get {} total apps(contains finished)", apps == null ? 0 : apps.size());
+            }
+
+            if (apps != null) {
+                for (AppInfo app : apps) {
+                    LOG.info("emit spark yarn application " + app.getId());
+                    if (sparkApps != null) {
+                        //emit (AppInfo, Map<String, SparkAppEntity>)
+                        collector.emit(new Values(app.getId(), app, sparkApps.get(app.getId())));
+                    } else {
+                        collector.emit(new Values(app.getId(), app, null));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to fetch spark running jobs", e);
+        } finally {
+            try {
+                Thread.sleep(jobExtractorConfig.fetchRunningJobInterval * 1000);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private Map<String, Map<String, SparkAppEntity>> recoverRunningApps() {
+        //we need to read from zookeeper; the path looks like /apps/spark/running/yarnAppId/appId/
+        //the content of /apps/spark/running/yarnAppId/appId is the SparkAppEntity of the current attempt
+        //a yarn application may contain many spark applications,
+        //so the result is a nested Map keyed by yarn appId
+        Map<String, Map<String, SparkAppEntity>> result = this.sparkRunningJobManager.recover();
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("appId", "appInfo", "sparkAppEntity"));
+    }
+
+    @Override
+    public void fail(Object msgId) {
+
+    }
+
+    @Override
+    public void ack(Object msgId) {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
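
The spout's contract with downstream bolts is the three-field tuple declared above. A short sketch of how a consumer unpacks it (field order matches declareOutputFields; the third value is null unless state was recovered from ZooKeeper):

    @Override
    public void execute(Tuple tuple) {
        String appId = tuple.getString(0);                       // "appId"
        AppInfo appInfo = (AppInfo) tuple.getValue(1);           // "appInfo"
        @SuppressWarnings("unchecked")
        Map<String, SparkAppEntity> recovered =
                (Map<String, SparkAppEntity>) tuple.getValue(2); // "sparkAppEntity", may be null
        // ... hand the tuple off to a parser keyed by appId ...
    }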

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
new file mode 100644
index 0000000..6928240
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.running.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.jpm.spark.running.common.SparkRunningConfigManager;
+import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
+import org.apache.eagle.jpm.spark.running.parser.SparkApplicationParser;
+import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+
+public class SparkRunningJobParseBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkRunningJobParseBolt.class);
+
+    private SparkRunningConfigManager.ZKStateConfig zkStateConfig;
+    private SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+    private SparkRunningConfigManager.EndpointConfig endpointConfig;
+    private SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+    private ExecutorService executorService;
+    private Map<String, SparkApplicationParser> runningSparkParsers;
+    private ResourceFetcher resourceFetcher;
+    public SparkRunningJobParseBolt(SparkRunningConfigManager.ZKStateConfig zkStateConfig,
+                                    SparkRunningConfigManager.EagleServiceConfig eagleServiceConfig,
+                                    SparkRunningConfigManager.EndpointConfig endpointConfig,
+                                    SparkRunningConfigManager.JobExtractorConfig jobExtractorConfig) {
+        this.zkStateConfig = zkStateConfig;
+        this.eagleServiceConfig = eagleServiceConfig;
+        this.endpointConfig = endpointConfig;
+        this.jobExtractorConfig = jobExtractorConfig;
+        this.runningSparkParsers = new HashMap<>();
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.executorService = Executors.newFixedThreadPool(jobExtractorConfig.parseThreadPoolSize);
+        this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        AppInfo appInfo = (AppInfo)tuple.getValue(1);
+        Map<String, SparkAppEntity> sparkApp = (Map<String, SparkAppEntity>)tuple.getValue(2);
+
+        LOG.info("get spark yarn application " + appInfo.getId());
+
+        SparkApplicationParser applicationParser;
+        if (!runningSparkParsers.containsKey(appInfo.getId())) {
+            applicationParser = new SparkApplicationParser(eagleServiceConfig, endpointConfig, jobExtractorConfig, appInfo, sparkApp, new SparkRunningJobManager(zkStateConfig), resourceFetcher);
+            runningSparkParsers.put(appInfo.getId(), applicationParser);
+            LOG.info("create application parser for {}", appInfo.getId());
+        } else {
+            applicationParser = runningSparkParsers.get(appInfo.getId());
+        }
+
+        Set<String> runningParserIds = new HashSet<>(runningSparkParsers.keySet());
+        runningParserIds.stream()
+                .filter(appId -> runningSparkParsers.get(appId).status() == SparkApplicationParser.ParserStatus.APP_FINISHED)
+                .forEach(appId -> {
+                    runningSparkParsers.remove(appId);
+                    LOG.info("remove parser {}", appId);
+                });
+
+        if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) ||
+                applicationParser.status() == SparkApplicationParser.ParserStatus.FINISHED) {
+            applicationParser.setStatus(SparkApplicationParser.ParserStatus.RUNNING);
+            executorService.execute(applicationParser);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    }
+
+    @Override
+    public void cleanup() {
+        super.cleanup();
+    }
+}
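
The bolt caches one parser per yarn application, which only works if every tuple for a given appId lands on the same bolt instance. A minimal wiring sketch under that assumption; the real topology class sits elsewhere in this patch, so the component names here are illustrative:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("sparkRunningJobFetchSpout",
            new SparkRunningJobFetchSpout(jobExtractorConfig, endpointConfig, zkStateConfig), 1);
    // fieldsGrouping on "appId" pins each yarn app to one bolt instance,
    // keeping the runningSparkParsers cache consistent
    builder.setBolt("sparkRunningJobParseBolt",
            new SparkRunningJobParseBolt(zkStateConfig, eagleServiceConfig, endpointConfig, jobExtractorConfig), 4)
            .fieldsGrouping("sparkRunningJobFetchSpout", new Fields("appId"));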

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..21686a6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DistributedFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
+org.apache.hadoop.hdfs.web.WebHdfsFileSystem
+org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
\ No newline at end of file
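
This service file is what lets Hadoop's ServiceLoader map the hdfs:// scheme to DistributedFileSystem once the module is packaged into a shaded jar (without a merged entry, FileSystem.get typically fails with "No FileSystem for scheme: hdfs"). A hedged sketch of the resolution it enables, reusing the nnEndpoint value from application.conf:

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://sandbox.hortonworks.com:8020"); // nnEndpoint
    FileSystem fs = FileSystem.get(conf);  // resolves DistributedFileSystem via this file
    boolean hasLogs = fs.exists(new Path("/spark-history"));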

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
new file mode 100644
index 0000000..d93a135
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "envContextConfig" : {
+    "env" : "local",
+    "topologyName" : "sparkRunningJob",
+    "stormConfigFile" : "storm.yaml",
+    "parallelismConfig" : {
+      "sparkRunningJobFetchSpout" : 1,
+      "sparkRunningJobParseBolt" : 4
+    },
+    "tasks" : {
+      "sparkRunningJobFetchSpout" : 1,
+      "sparkRunningJobParseBolt" : 4
+    },
+    "workers" : 2
+  },
+
+  "jobExtractorConfig" : {
+    "site" : "sandbox",
+    "fetchRunningJobInterval" : 15,
+    "parseThreadPoolSize" : 5
+  },
+
+  "dataSourceConfig" : {
+    "rmUrls": ["http://sandbox.hortonworks.com:8088"],
+    "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
+    "principal" : "", #if not need, then empty
+    "keytab" : "",
+    "eventLog" : "/spark-history"
+  },
+
+  "zookeeperConfig" : {
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkPort" : "2181",
+    "zkRoot" : "/apps/spark/running",
+    "recoverEnabled" : false,
+    "zkSessionTimeoutMs" : 15000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 20000
+  },
+
+  "eagleProps" : {
+    "mailHost" : "abc.com",
+    "mailDebug" : "true",
+    eagleService.host:"sandbox.hortonworks.com",
+    eagleService.port: 9099,
+    eagleService.username: "admin",
+    eagleService.password : "secret",
+    eagleService.readTimeOutSeconds : 20,
+    eagleService.maxFlushNum : 500
+  }
+}

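Code that consumes this HOCON file would typically load it with the Typesafe Config library (an assumption here, though Eagle topologies use it elsewhere). A minimal sketch reading a few of the settings above; note that the unquoted dotted keys in eagleProps resolve as nested paths:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import java.util.List;

    public class AppConfSketch {
        public static void main(String[] args) {
            // Loads application.conf from the classpath.
            Config config = ConfigFactory.load();
            List<String> rmUrls = config.getStringList("dataSourceConfig.rmUrls");
            int fetchInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
            // Unquoted dotted keys such as eagleService.host nest under eagleProps.
            String eagleHost = config.getString("eagleProps.eagleService.host");
            System.out.println(rmUrls + " every " + fetchInterval + "s -> " + eagleHost);
        }
    }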
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6b8c8d6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=../logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 0792f15..a633fd4 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -17,14 +17,23 @@
 
 package org.apache.eagle.jpm.util;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class Constants {
+    private static final Logger LOG = LoggerFactory.getLogger(Constants.class);
 
+    //SPARK
     public final static String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
     public final static String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
     public final static String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
     public final static String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
     public final static String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";
-
+    public final static String RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME = "RunningSparkAppService";
+    public final static String RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME = "RunningSparkJobService";
+    public final static String RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME = "RunningSparkStageService";
+    public final static String RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME = "RunningSparkTaskService";
+    public final static String RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "RunningSparkExecutorService";
     public static final String APPLICATION_PREFIX = "application";
     public static final String JOB_PREFIX = "job";
     public static final String V2_APPS_URL = "ws/v1/cluster/apps";
@@ -33,17 +42,123 @@ public class Constants {
     public static final String V2_APPS_RUNNING_URL = "ws/v1/cluster/apps?state=RUNNING";
     public static final String V2_APPS_COMPLETED_URL = "ws/v1/cluster/apps?state=FINISHED";
 
+    public static final String SPARK_MASTER_KEY = "spark.master";
+    public static final String SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory";
+    public static final String SPARK_DRIVER_MEMORY_KEY = "spark.driver.memory";
+    public static final String SPARK_YARN_AM_MEMORY_KEY = "spark.yarn.am.memory";
+    public static final String SPARK_EXECUTOR_CORES_KEY = "spark.executor.cores";
+    public static final String SPARK_DRIVER_CORES_KEY = "spark.driver.cores";
+    public static final String SPARK_YARN_AM_CORES_KEY = "spark.yarn.am.cores";
+    public static final String SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD_KEY = "spark.yarn.executor.memoryOverhead";
+    public static final String SPARK_YARN_DRIVER_MEMORY_OVERHEAD_KEY = "spark.yarn.driver.memoryOverhead";
+    public static final String SPARK_YARN_AM_MEMORY_OVERHEAD_KEY = "spark.yarn.am.memoryOverhead";
+
     public static final String SPARK_APPS_URL ="api/v1/applications";
+    public static final String SPARK_EXECUTORS_URL = "executors";
+    public static final String SPARK_JOBS_URL = "jobs";
+    public static final String SPARK_STAGES_URL = "stages";
+    public static final String MR_JOBS_URL = "ws/v1/mapreduce/jobs";
+    public static final String MR_JOB_COUNTERS_URL = "counters";
+    public static final String MR_TASKS_URL = "tasks";
+    public static final String MR_TASK_ATTEMPTS_URL = "attempts";
+    public static final String MR_CONF_URL = "conf";
+
+    public static final String YARN_API_CLUSTER_INFO = "ws/v1/cluster/info";
 
     public enum CompressionType {
         GZIP, NONE
     }
     public enum JobState {
-        RUNNING, COMPLETED, ALL
+        NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED, ERROR, FINISHED, ALL
+    }
+    public enum TaskState {
+        NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
+    }
+    public enum StageState {
+        ACTIVE, COMPLETE, PENDING
+    }
+    public enum AppState {
+        NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
+    }
+    public enum AppStatus {
+        UNDEFINED, SUCCEEDED, FAILED, KILLED
     }
-
     public enum ResourceType {
-         COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL
+         COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL, RUNNING_SPARK_JOB, RUNNING_MR_JOB, CLUSTER_INFO
+    }
+
+    //MR
+    public static final String JPA_JOB_CONFIG_SERVICE_NAME = "JobConfigService";
+    public static final String JPA_JOB_EVENT_SERVICE_NAME = "JobEventService";
+    public static final String JPA_JOB_EXECUTION_SERVICE_NAME = "JobExecutionService";
+    public static final String JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME = "RunningJobExecutionService";
+    public static final String JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "TaskAttemptExecutionService";
+    public static final String JPA_TASK_FAILURE_COUNT_SERVICE_NAME = "TaskFailureCountService";
+    public static final String JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME = "TaskAttemptCounterService";
+    public static final String JPA_TASK_EXECUTION_SERVICE_NAME = "TaskExecutionService";
+    public static final String JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME = "RunningTaskExecutionService";
+    public static final String JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService";
+    public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService";
+
+    public static final String JOB_TASK_TYPE_TAG = "taskType";
+
+    public static class JobConfiguration {
+        // job type
+        public static final String SCOOBI_JOB = "scoobi.mode";
+        public static final String HIVE_JOB = "hive.query.string";
+        public static final String PIG_JOB = "pig.script";
+        public static final String CASCADING_JOB = "cascading.app.name";
     }
 
+    /**
+     * MR task types
+     */
+    public enum TaskType {
+        SETUP, MAP, REDUCE, CLEANUP
+    }
+
+    public enum JobType {
+        CASCADING("CASCADING"), HIVE("HIVE"), PIG("PIG"), SCOOBI("SCOOBI"),
+        NOTAVALIABLE("N/A");
+
+        private final String value;
+
+        JobType(String value) {
+            this.value = value;
+        }
+
+        @Override
+        public String toString() {
+            return this.value;
+        }
+    }
+
+    public static final String FILE_SYSTEM_COUNTER = "org.apache.hadoop.mapreduce.FileSystemCounter";
+    public static final String TASK_COUNTER = "org.apache.hadoop.mapreduce.TaskCounter";
+    public static final String JOB_COUNTER = "org.apache.hadoop.mapreduce.JobCounter";
+
+    public static final String MAP_TASK_ATTEMPT_COUNTER = "MapTaskAttemptCounter";
+    public static final String REDUCE_TASK_ATTEMPT_COUNTER = "ReduceTaskAttemptCounter";
+
+    public static final String MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "MapTaskAttemptFileSystemCounter";
+    public static final String REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "ReduceTaskAttemptFileSystemCounter";
+
+    public enum TaskAttemptCounter {
+        TASK_ATTEMPT_DURATION,
+    }
+
+    public enum JobCounter {
+        DATA_LOCAL_MAPS,
+        RACK_LOCAL_MAPS,
+        TOTAL_LAUNCHED_MAPS
+    }
+
+    public static final String metricFormat = "%s.%s";
+    public static final String ALLOCATED_MB = "allocatedmb";
+    public static final String ALLOCATED_VCORES = "allocatedvcores";
+    public static final String RUNNING_CONTAINERS = "runningcontainers";
+    public static final String TASK_EXECUTION_TIME = "taskduration";
+    public static final String JOB_LEVEL = "job";
+    public static final String TASK_LEVEL = "task";
+
+    public static final String JOB_DEFINITION_ID_KEY = "jobDefId";
+
 }

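The metric-name constants added at the bottom of Constants.java are meant to be composed through metricFormat. A small sketch of the intended composition (the wrapper class is hypothetical; the format string is taken from the constant itself):

    import org.apache.eagle.jpm.util.Constants;

    public class MetricNameSketch {
        public static void main(String[] args) {
            // metricFormat is "%s.%s": <level>.<metric>
            System.out.println(String.format(Constants.metricFormat,
                    Constants.JOB_LEVEL, Constants.ALLOCATED_MB));          // job.allocatedmb
            System.out.println(String.format(Constants.metricFormat,
                    Constants.TASK_LEVEL, Constants.TASK_EXECUTION_TIME));  // task.taskduration
        }
    }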
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
index 8adb001..325a92a 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
@@ -29,7 +29,7 @@ public class HDFSUtil {
 
     public static FileSystem getFileSystem(Configuration conf) throws IOException {
         HDFSUtil.login(conf);
-       return FileSystem.get(conf);
+        return FileSystem.get(conf);
     }
 
     public static void login(Configuration kConfig) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
new file mode 100644
index 0000000..ea8e4f4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/MRJobTagName.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
+public enum MRJobTagName {
+    SITE("site"),
+    RACK("rack"),
+    HOSTNAME("hostname"),
+    JOB_NAME("jobName"),
+    JOD_DEF_ID("jobDefId"),
+    JOB_ID("jobId"),
+    TASK_ID("taskId"),
+    TASK_ATTEMPT_ID("taskAttemptId"),
+    JOB_STATUS("jobStatus"),
+    USER("user"),
+    TASK_TYPE("taskType"),
+    TASK_EXEC_TYPE("taskExecType"),
+    ERROR_CATEGORY("errorCategory"),
+    JOB_QUEUE("queue"),
+    RULE_TYPE("ruleType"),
+    JOB_TYPE("jobType");
+
+    private final String tagName;
+
+    MRJobTagName(String tagName) {
+        this.tagName = tagName;
+    }
+
+    @Override
+    public String toString() {
+        return this.tagName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
new file mode 100644
index 0000000..7a613eb
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class Utils {
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+    public static void closeInputStream(InputStream is) {
+        if (is != null) {
+            try {
+                is.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close input stream", e);
+            }
+        }
+    }
+
+    public static void sleep(long seconds) {
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static long dateTimeToLong(String date) {
+        // date is like: 2016-07-29T19:35:40.715GMT; HH is required for the 24-hour clock
+        long timestamp = 0L;
+        try {
+            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSzzz");
+            Date parsedDate = dateFormat.parse(date);
+            timestamp = parsedDate.getTime();
+        } catch (ParseException e) {
+            LOG.error("Not able to parse date: " + date, e);
+        }
+
+        return timestamp;
+    }
+
+    public static long parseMemory(String memory) {
+        if (memory.endsWith("g") || memory.endsWith("G")) {
+            int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * executorGB;
+        } else if (memory.endsWith("m") || memory.endsWith("M")) {
+            int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * executorMB;
+        } else if (memory.endsWith("k") || memory.endsWith("K")) {
+            int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * executorKB;
+        } else if (memory.endsWith("t") || memory.endsWith("T")) {
+            int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * 1024 * executorTB;
+        } else if (memory.endsWith("p") || memory.endsWith("P")) {
+            int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
+        }
+        LOG.warn("Cannot parse memory info " + memory);
+        return 0L;
+    }
+}

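A quick usage sketch for the two helpers above (the values are illustrative; the expected outputs in comments follow directly from the code):

    import org.apache.eagle.jpm.util.Utils;

    public class UtilsSketch {
        public static void main(String[] args) {
            // Suffix is stripped and the remainder scaled: "2g" -> 2 * 1024^3 bytes.
            System.out.println(Utils.parseMemory("2g"));    // 2147483648
            System.out.println(Utils.parseMemory("512m"));  // 536870912
            // Spark REST timestamps like this one parse to epoch milliseconds.
            System.out.println(Utils.dateTimeToLong("2016-07-29T19:35:40.715GMT"));
        }
    }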
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
new file mode 100644
index 0000000..c8572e9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupDictionary.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util.jobcounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * MR job counter dictionary. A singleton that reads the JobCounter.conf file from the
+ * classpath and configures the available counter groups.
+ */
+public final class CounterGroupDictionary {
+
+    private final List<CounterGroupKey> groupKeys = new ArrayList<>();
+
+    private static volatile CounterGroupDictionary instance = null;
+    private static final Logger LOG = LoggerFactory.getLogger(CounterGroupDictionary.class);
+
+    private CounterGroupDictionary() {}
+
+    public static CounterGroupDictionary getInstance() throws JobCounterException {
+        if (instance == null) {
+            synchronized (CounterGroupDictionary.class) {
+                if (instance == null) {
+                    CounterGroupDictionary tmp = new CounterGroupDictionary();
+                    tmp.initialize();
+                    instance = tmp;
+                }
+            }
+        }
+        return instance;
+    }
+
+    public CounterGroupKey getCounterGroupByName(String groupName) {
+        for (CounterGroupKey groupKey : groupKeys) {
+            if (groupKey.getName().equalsIgnoreCase(groupName)) {
+                return groupKey;
+            }
+        }
+        return null;
+    }
+
+    public CounterGroupKey getCounterGroupByIndex(int groupIndex) {
+        if (groupIndex < 0 || groupIndex >= groupKeys.size()) {
+            return null;
+        }
+        return groupKeys.get(groupIndex);
+    }
+
+    private void initialize() throws JobCounterException {
+        // load JobCounter.conf from the classpath (with and without a leading slash)
+        InputStream is = this.getClass().getClassLoader().getResourceAsStream("/JobCounter.conf");
+        try {
+            if (is == null) {
+                is = this.getClass().getClassLoader().getResourceAsStream("JobCounter.conf");
+                if (is == null) {
+                    final String errMsg = "Failed to load JobCounter.conf";
+                    LOG.error(errMsg);
+                    throw new JobCounterException(errMsg);
+                }
+            }
+            final Properties prop = new Properties();
+            try {
+                prop.load(is);
+            } catch (Exception ex) {
+                final String errMsg = "Failed to load JobCounter.conf, reason: " + ex.getMessage();
+                LOG.error(errMsg, ex);
+                throw new JobCounterException(errMsg, ex);
+            }
+            int groupIndex = 0;
+            while (parseGroup(groupIndex, prop)) {
+                ++groupIndex;
+            }
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    LOG.warn("Failed to close JobCounter.conf stream", e);
+                }
+            }
+        }
+    }
+
+    private boolean parseGroup(int groupIndex, Properties prop) {
+        final String groupKeyBase = "counter.group" + groupIndex;
+        final String groupNameKey = groupKeyBase + ".name";
+        final String groupName = prop.getProperty(groupNameKey);
+
+        if (groupName == null) {
+            return false;
+        }
+
+        final String groupDescriptionKey = groupKeyBase + ".description";
+        final String groupDescription = prop.getProperty(groupDescriptionKey);
+        final CounterGroupKeyImpl groupKey = new CounterGroupKeyImpl(groupIndex, groupName, groupDescription);
+        final ArrayList<CounterKey> counters = new ArrayList<CounterKey>();
+
+        int counterIndex = 0;
+        while (parseCounter(groupKey, counterIndex, counters, prop)) {
+            ++counterIndex;
+        }
+        groupKey.setCounterKeys(counters.toArray(new CounterKey[counters.size()]));
+        groupKeys.add(groupKey);
+        return true;
+    }
+
+    private boolean parseCounter(CounterGroupKey groupKey, int counterIndex, List<CounterKey> counters, Properties prop) {
+        final String counterKeyBase = "counter.group" + groupKey.getIndex() + ".counter" + counterIndex;
+        final String counterNameKey = counterKeyBase + ".names";
+        final String counterNamesString = prop.getProperty(counterNameKey);
+
+        if (counterNamesString == null) {
+            return false;
+        }
+        final String[] names = counterNamesString.split(",");
+        final List<String> counterNames = new ArrayList<String>();
+        for (String name : names) {
+            counterNames.add(name.trim());
+        }
+
+        final String counterDescriptionKey = counterKeyBase + ".description";
+        final String counterDescription = prop.getProperty(counterDescriptionKey);
+
+        CounterKey counter = new CounterKeyImpl(counterIndex, counterNames, counterDescription, groupKey);
+        counters.add(counter);
+        return true;
+    }
+
+    private static class CounterKeyImpl implements CounterKey {
+        private final int index;
+        private final List<String> counterNames;
+        private final String description;
+        private final CounterGroupKey groupKey;
+
+        public CounterKeyImpl(int index, List<String> counterNames, String description, CounterGroupKey groupKey) {
+            this.index = index;
+            this.counterNames = counterNames;
+            this.description = description;
+            this.groupKey = groupKey;
+        }
+        @Override
+        public int getIndex() {
+            return index;
+        }
+        @Override
+        public List<String> getNames() {
+            return counterNames;
+        }
+        @Override
+        public String getDescription() {
+            return description;
+        }
+        @Override
+        public CounterGroupKey getGroupKey() {
+            return groupKey;
+        }
+    }
+
+    private static class CounterGroupKeyImpl implements CounterGroupKey {
+        private final int index;
+        private final String name;
+        private final String description;
+        private CounterKey[] counterKeys;
+
+        public CounterGroupKeyImpl(int index, String name, String description) {
+            this.index = index;
+            this.name = name;
+            this.description = description;
+        }
+
+        public void setCounterKeys(CounterKey[] counterKeys) {
+            this.counterKeys = counterKeys;
+        }
+
+        @Override
+        public int getIndex() {
+            return index;
+        }
+        @Override
+        public String getName() {
+            return name;
+        }
+        @Override
+        public String getDescription() {
+            return description;
+        }
+        @Override
+        public int getCounterNumber() {
+            return counterKeys.length;
+        }
+        @Override
+        public List<CounterKey> listCounterKeys() {
+            return Arrays.asList(counterKeys);
+        }
+        @Override
+        public CounterKey getCounterKeyByName(String name) {
+            for (CounterKey counterKey : counterKeys) {
+                for (String n : counterKey.getNames()) {
+                    if (n.equalsIgnoreCase(name)) {
+                        return counterKey;
+                    }
+                }
+            }
+            return null;
+        }
+        @Override
+        public CounterKey getCounterKeyByID(int index) {
+            if (index < 0 || index >= counterKeys.length) {
+                return null;
+            }
+            return counterKeys[index];
+        }
+    }
+}

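parseGroup and parseCounter imply a concrete layout for JobCounter.conf. The sketch below shows a properties file the parser would accept (the group and counter names are examples chosen to match the MR counter constants above, not the shipped dictionary):

    # counter.group<i>.name / .description declare a group;
    # counter.group<i>.counter<j>.names is a comma-separated list of aliases.
    counter.group0.name=org.apache.hadoop.mapreduce.TaskCounter
    counter.group0.description=MR task counters
    counter.group0.counter0.names=MAP_INPUT_RECORDS
    counter.group0.counter0.description=Map input records
    counter.group0.counter1.names=REDUCE_SHUFFLE_BYTES,REDUCE_SHUFFLE
    counter.group0.counter1.description=Reduce shuffle bytes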
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
new file mode 100644
index 0000000..482623a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterGroupKey.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util.jobcounter;
+
+import java.util.List;
+
+public interface CounterGroupKey {
+
+    String getName();
+    String getDescription();
+    int getIndex();
+    int getCounterNumber();
+    List<CounterKey> listCounterKeys();
+    CounterKey getCounterKeyByName(String name);
+    CounterKey getCounterKeyByID(int index);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
new file mode 100644
index 0000000..8e4e519
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/CounterKey.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util.jobcounter;
+
+import java.util.List;
+
+public interface CounterKey {
+
+    List<String> getNames();
+    String getDescription();
+    int getIndex();
+    CounterGroupKey getGroupKey();
+    
+}