You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/09 05:25:35 UTC

[7/8] incubator-eagle git commit: [EAGLE-422] eagle support for mr & spark running job monitoring

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
index 2684899..e0ec330 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
@@ -18,8 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
@@ -27,12 +27,12 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 @Table("eaglejpa")
 @ColumnFamily("f")
 @Prefix("jexec")
-@Service(JPAConstants.JPA_JOB_EXECUTION_SERVICE_NAME)
+@Service(Constants.JPA_JOB_EXECUTION_SERVICE_NAME)
 @TimeSeries(true)
 @Partition({"site"})
 @Indexes({
-    @Index(name="Index_1_jobId", columns = { "jobID" }, unique = true),
-    @Index(name="Index_2_normJobName", columns = { "normJobName" }, unique = false)
+    @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true),
+    @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
     })
 public class JobExecutionAPIEntity extends JobBaseAPIEntity {
     @Column("a")
@@ -55,6 +55,22 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
     private int numFinishedReduces;
     @Column("j")
     private JobCounters jobCounters;
+    @Column("k")
+    private int dataLocalMaps;
+    @Column("l")
+    private double dataLocalMapsPercentage;
+    @Column("m")
+    private int rackLocalMaps;
+    @Column("n")
+    private double rackLocalMapsPercentage;
+    @Column("o")
+    private int totalLaunchedMaps;
+    @Column("p")
+    private long submissionTime;
+    @Column("q")
+    private long lastMapDuration;
+    @Column("r")
+    private long lastReduceDuration;
 
     public String getCurrentState() {
         return currentState;
@@ -129,4 +145,76 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
         this.jobCounters = jobCounters;
         _pcs.firePropertyChange("jobCounters", null, null);
     }
+
+    public int getDataLocalMaps() {
+        return dataLocalMaps;
+    }
+
+    public void setDataLocalMaps(int dataLocalMaps) {
+        this.dataLocalMaps = dataLocalMaps;
+        valueChanged("dataLocalMaps");
+    }
+
+    public double getDataLocalMapsPercentage() {
+        return dataLocalMapsPercentage;
+    }
+
+    public void setDataLocalMapsPercentage(double dataLocalMapsPercentage) {
+        this.dataLocalMapsPercentage = dataLocalMapsPercentage;
+        valueChanged("dataLocalMapsPercentage");
+    }
+
+    public int getRackLocalMaps() {
+        return rackLocalMaps;
+    }
+
+    public void setRackLocalMaps(int rackLocalMaps) {
+        this.rackLocalMaps = rackLocalMaps;
+        valueChanged("rackLocalMaps");
+    }
+
+    public double getRackLocalMapsPercentage() {
+        return rackLocalMapsPercentage;
+    }
+
+    public void setRackLocalMapsPercentage(double rackLocalMapsPercentage) {
+        this.rackLocalMapsPercentage = rackLocalMapsPercentage;
+        valueChanged("rackLocalMapsPercentage");
+    }
+
+    public int getTotalLaunchedMaps() {
+        return totalLaunchedMaps;
+    }
+
+    public void setTotalLaunchedMaps(int totalLaunchedMaps) {
+        this.totalLaunchedMaps = totalLaunchedMaps;
+        valueChanged("totalLaunchedMaps");
+    }
+
+    public long getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public void setSubmissionTime(long submissionTime) {
+        this.submissionTime = submissionTime;
+        valueChanged("submissionTime");
+    }
+
+    public long getLastMapDuration() {
+        return lastMapDuration;
+    }
+
+    public void setLastMapDuration(long lastMapDuration) {
+        this.lastMapDuration = lastMapDuration;
+        valueChanged("lastMapDuration");
+    }
+
+    public long getLastReduceDuration() {
+        return lastReduceDuration;
+    }
+
+    public void setLastReduceDuration(long lastReduceDuration) {
+        this.lastReduceDuration = lastReduceDuration;
+        valueChanged("lastReduceDuration");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
index 2400c55..9e8a372 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -27,7 +27,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 @Table("eaglejpa_process")
 @ColumnFamily("f")
 @Prefix("process")
-@Service(JPAConstants.JPA_JOB_PROCESS_TIME_STAMP_NAME)
+@Service(Constants.JPA_JOB_PROCESS_TIME_STAMP_NAME)
 @TimeSeries(true)
 @Partition({"site"})
 public class JobProcessTimeStampEntity extends TaggedLogAPIEntity {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
index 9769620..929a98f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
@@ -26,7 +26,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 @Table("eaglejpa_anomaly")
 @ColumnFamily("f")
 @Prefix("tacount")
-@Service(JPAConstants.JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME)
+@Service(Constants.JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME)
 @TimeSeries(true)
 @Partition({"site"})
 public class TaskAttemptCounterAPIEntity extends JobBaseAPIEntity {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
index 77994a5..abc28e2 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
@@ -18,8 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
@@ -27,7 +27,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 @Table("eaglejpa_task")
 @ColumnFamily("f")
 @Prefix("taexec")
-@Service(JPAConstants.JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME)
+@Service(Constants.JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME)
 @TimeSeries(true)
 @Partition({"site"})
 @Indexes({

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
index f287688..c1f71b8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
@@ -18,8 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
@@ -27,7 +27,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 @Table("eaglejpa_task")
 @ColumnFamily("f")
 @Prefix("texec")
-@Service(JPAConstants.JPA_TASK_EXECUTION_SERVICE_NAME)
+@Service(Constants.JPA_TASK_EXECUTION_SERVICE_NAME)
 @TimeSeries(true)
 @Partition({"site"})
 public class TaskExecutionAPIEntity extends JobBaseAPIEntity {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
index 5ae67c0..7456522 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
@@ -26,7 +26,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 @Table("eaglejpa_anomaly")
 @ColumnFamily("f")
 @Prefix("taskfailurecount")
-@Service(JPAConstants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME)
+@Service(Constants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME)
 @TimeSeries(true)
 @Partition({"site"})
 public class TaskFailureCountAPIEntity extends JobBaseAPIEntity {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java
deleted file mode 100644
index 1c1c759..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * MR Job counter dictionary. It's singlton class that will try to read JobCounter.conf file and configure
- * counters.
- *
- */
-public final class CounterGroupDictionary {
-
-    private final List<CounterGroupKey> groupKeys = new ArrayList<>();
-
-    private static volatile CounterGroupDictionary instance = null;
-    private static final Logger LOG = LoggerFactory.getLogger(CounterGroupDictionary.class);
-
-    private CounterGroupDictionary() {}
-
-    public static CounterGroupDictionary getInstance() throws JobCounterException {
-        if (instance == null) {
-            synchronized (CounterGroupDictionary.class) {
-                if (instance == null) {
-                    CounterGroupDictionary tmp = new CounterGroupDictionary();
-                    tmp.initialize();
-                    instance = tmp;
-                }
-            }
-        }
-        return instance;
-    }
-
-    public CounterGroupKey getCounterGroupByName(String groupName) {
-        for (CounterGroupKey groupKey : groupKeys) {
-            if (groupKey.getName().equalsIgnoreCase(groupName)) {
-                return groupKey;
-            }
-        }
-        return null;
-    }
-
-    public CounterGroupKey getCounterGroupByIndex(int groupIndex) {
-        if (groupIndex < 0 || groupIndex >= groupKeys.size()) {
-            return null;
-        }
-        return groupKeys.get(groupIndex);
-    }
-
-    private void initialize() throws JobCounterException {
-        // load config.properties file from classpath
-        InputStream is = this.getClass().getClassLoader().getResourceAsStream("/JobCounter.conf");
-        try {
-            if (is == null) {
-                is = this.getClass().getClassLoader().getResourceAsStream("JobCounter.conf");
-                if (is == null) {
-                    final String errMsg = "Failed to load JobCounter.conf";
-                    LOG.error(errMsg);
-                    throw new JobCounterException(errMsg);
-                }
-            }
-            final Properties prop = new Properties();
-            try {
-                prop.load(is);
-            } catch(Exception ex) {
-                final String errMsg = "Failed to load JobCounter.conf, reason: " + ex.getMessage();
-                LOG.error(errMsg, ex);
-                throw new JobCounterException(errMsg, ex);
-            }
-            int groupIndex = 0;
-            while (parseGroup(groupIndex, prop)) {
-                ++groupIndex;
-            }
-        } finally {
-            if (is != null) {
-                try {
-                    is.close();
-                } catch (IOException e) {
-                }
-            }
-        }
-    }
-
-    private boolean parseGroup(int groupIndex, Properties prop) {
-        final String groupKeyBase = "counter.group" + groupIndex;
-        final String groupNameKey = groupKeyBase + ".name";
-        final String groupName = prop.getProperty(groupNameKey);
-
-        if (groupName == null) {
-            return false;
-        }
-
-        final String groupDescriptionKey = groupKeyBase + ".description";
-        final String groupDescription = prop.getProperty(groupDescriptionKey);
-        final CounterGroupKeyImpl groupKey = new CounterGroupKeyImpl(groupIndex, groupName, groupDescription);
-        final ArrayList<CounterKey> counters = new ArrayList<CounterKey>();
-
-        int counterIndex = 0;
-        while (parseCounter(groupKey, counterIndex, counters, prop)) {
-            ++counterIndex;
-        }
-        groupKey.setCounterKeys(counters.toArray(new CounterKey[counters.size()]));
-        groupKeys.add(groupKey);
-        return true;
-    }
-
-    private boolean parseCounter(CounterGroupKey groupKey, int counterIndex, List<CounterKey> counters, Properties prop) {
-        final String counterKeyBase = "counter.group" + groupKey.getIndex() + ".counter" + counterIndex;
-        final String counterNameKey = counterKeyBase + ".names";
-        final String counterNamesString = prop.getProperty(counterNameKey);
-
-        if (counterNamesString == null) {
-            return false;
-        }
-        final String[] names = counterNamesString.split(",");
-        final List<String> counterNames = new ArrayList<String>();
-        for (String name : names) {
-            counterNames.add(name.trim());
-        }
-
-        final String counterDescriptionKey = counterKeyBase + ".description";
-        final String counterDescription = prop.getProperty(counterDescriptionKey);
-
-        CounterKey counter = new CounterKeyImpl(counterIndex, counterNames, counterDescription, groupKey);
-        counters.add(counter);
-        return true;
-    }
-
-    private static class CounterKeyImpl implements CounterKey {
-        private final int index;
-        private final List<String> counterNames;
-        private final String description;
-        private final CounterGroupKey groupKey;
-
-        public CounterKeyImpl(int index, List<String> counterNames, String description, CounterGroupKey groupKey) {
-            this.index = index;
-            this.counterNames = counterNames;
-            this.description = description;
-            this.groupKey = groupKey;
-        }
-        @Override
-        public int getIndex() {
-            return index;
-        }
-        @Override
-        public List<String> getNames() {
-            return counterNames;
-        }
-        @Override
-        public String getDescription() {
-            return description;
-        }
-        @Override
-        public CounterGroupKey getGroupKey() {
-            return groupKey;
-        }
-    }
-
-    private static class CounterGroupKeyImpl implements CounterGroupKey {
-        private final int index;
-        private final String name;
-        private final String description;
-        private CounterKey[] counterKeys;
-
-        public CounterGroupKeyImpl(int index, String name, String description) {
-            this.index = index;
-            this.name = name;
-            this.description = description;
-        }
-
-        public void setCounterKeys(CounterKey[] counterKeys) {
-            this.counterKeys = counterKeys;
-        }
-
-        @Override
-        public int getIndex() {
-            return index;
-        }
-        @Override
-        public String getName() {
-            return name;
-        }
-        @Override
-        public String getDescription() {
-            return description;
-        }
-        @Override
-        public int getCounterNumber() {
-            return counterKeys.length;
-        }
-        @Override
-        public List<CounterKey> listCounterKeys() {
-            return Arrays.asList(counterKeys);
-        }
-        @Override
-        public CounterKey getCounterKeyByName(String name) {
-            for (CounterKey counterKey : counterKeys) {
-                for (String n : counterKey.getNames()) {
-                    if (n.equalsIgnoreCase(name)) {
-                        return counterKey;
-                    }
-                }
-            }
-            return null;
-        }
-        @Override
-        public CounterKey getCounterKeyByID(int index) {
-            if (index < 0 || index >= counterKeys.length) {
-                return null;
-            }
-            return counterKeys[index];
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java
deleted file mode 100644
index 82606d1..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-import java.util.List;
-
-public interface CounterGroupKey {
-
-    String getName();
-    String getDescription();
-    int getIndex();
-    int getCounterNumber();
-    List<CounterKey> listCounterKeys();
-    CounterKey getCounterKeyByName(String name);
-    CounterKey getCounterKeyByID(int index);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java
deleted file mode 100644
index 161490f..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-import java.util.List;
-
-public interface CounterKey {
-
-    List<String> getNames();
-    String getDescription();
-    int getIndex();
-    CounterGroupKey getGroupKey();
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java
deleted file mode 100644
index 5ffaf51..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-public class JobCounterException extends Exception {
-
-    /**
-     * 
-     */
-    private static final long serialVersionUID = -4525162176188266862L;
-
-    /**
-     * Default constructor of JobCounterException
-     */
-    public JobCounterException() {
-        super();
-    }
-
-    /**
-     * Constructor of JobCounterException
-     * 
-     * @param message error message
-     */
-    public JobCounterException(String message) {
-        super(message);
-    }
-
-    /**
-     * Constructor of JobCounterException
-     * 
-     * @param message error message
-     * @param cause the cause of the exception
-     * 
-     */
-    public JobCounterException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * Constructor of JobCounterException
-     * 
-     * @param cause the cause of the exception
-     */
-    public JobCounterException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java
deleted file mode 100644
index 2806cf1..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.jobcounter;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-
-public final class JobCounters {
-    
-    private Map<String, Map<String, Long>> counters = new TreeMap<>();
-
-    public Map<String, Map<String, Long>> getCounters() {
-        return counters;
-    }
-
-    public void setCounters(Map<String, Map<String, Long>> counters) {
-        this.counters = counters;
-    }
-    
-    public String toString(){
-        return counters.toString();
-    }
-
-    public void clear() {
-        for (Map.Entry<String, Map<String, Long>> entry : counters.entrySet()) {
-            entry.getValue().clear();
-        }
-        counters.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java
deleted file mode 100644
index 9d13fb4..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.parser;
-
-public enum EagleJobTagName {
-    SITE("site"),
-    RACK("rack"),
-    HOSTNAME("hostname"),
-    JOB_NAME("jobName"),
-    NORM_JOB_NAME("normJobName"),
-    JOB_ID("jobID"),
-    TASK_ID("taskID"),
-    TASK_ATTEMPT_ID("taskAttemptID"),
-    JOB_STATUS("jobStatus"),
-    USER("user"),
-    TASK_TYPE("taskType"),
-    TASK_EXEC_TYPE("taskExecType"),
-    ERROR_CATEGORY("errorCategory"),
-    JOB_QUEUE("queue"),
-    RULE_TYPE("ruleType"),
-    JOB_TYPE("jobType");
-
-    private String tagName; 
-    private EagleJobTagName(String tagName) {
-        this.tagName = tagName;
-    }
-    
-    public String toString() {
-
-        return this.tagName;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 31bfdb5..6442699 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -18,11 +18,13 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
-import org.apache.eagle.jpm.mr.history.common.JobConfig;
+import org.apache.eagle.jpm.mr.history.entities.JobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.entities.*;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.JobNameNormalization;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
@@ -56,10 +58,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     // hostname to rack mapping
     protected Map<String, String> m_host2RackMapping;
 
-    protected String m_jobID;
+    protected String m_jobId;
     protected String m_jobName;
     protected String m_jobType;
-    protected String m_normJobName;
+    protected String m_jobDefId;
     protected String m_user;
     protected String m_queueName;
     protected Long m_jobLauchTime;
@@ -69,12 +71,12 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
 
     protected final Configuration configuration;
 
-    public JPAConstants.JobType fetchJobType(Configuration config) {
-        if (config.get(JPAConstants.JobConfiguration.CASCADING_JOB) != null) { return JPAConstants.JobType.CASCADING; }
-        if (config.get(JPAConstants.JobConfiguration.HIVE_JOB) != null) { return JPAConstants.JobType.HIVE; }
-        if (config.get(JPAConstants.JobConfiguration.PIG_JOB) != null) { return JPAConstants.JobType.PIG; }
-        if (config.get(JPAConstants.JobConfiguration.SCOOBI_JOB) != null) {return JPAConstants.JobType.SCOOBI; }
-        return JPAConstants.JobType.NOTAVALIABLE;
+    public Constants.JobType fetchJobType(Configuration config) {
+        if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { return Constants.JobType.CASCADING; }
+        if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) { return Constants.JobType.HIVE; }
+        if (config.get(Constants.JobConfiguration.PIG_JOB) != null) { return Constants.JobType.PIG; }
+        if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {return Constants.JobType.SCOOBI; }
+        return Constants.JobType.NOTAVALIABLE;
     }
 
     /**
@@ -121,7 +123,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     public void close() throws IOException {
         // check if this job history file is complete
         if (m_jobExecutionEntity.getEndTime() == 0L) {
-            throw new IOException(new JHFWriteNotCompletedException(m_jobID));
+            throw new IOException(new JHFWriteNotCompletedException(m_jobId));
         }
         try {
             flush();
@@ -142,7 +144,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
        * @param id
        */
     private void setJobID(String id) {
-        this.m_jobID = id;
+        this.m_jobId = id;
     }
 
     private void setJobType(String jobType) {
@@ -152,10 +154,10 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception {
        String id = values.get(Keys.JOBID);
 
-       if (m_jobID == null) {
+       if (m_jobId == null) {
            setJobID(id);
-       } else if (!m_jobID.equals(id)) {
-           String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobID + "'";
+       } else if (!m_jobId.equals(id)) {
+           String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'";
            LOG.error(msg);
            throw new ImportException(msg);
        }
@@ -165,51 +167,63 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
            m_user = values.get(Keys.USER);
            m_queueName = values.get(Keys.JOB_QUEUE);
            m_jobName = values.get(Keys.JOBNAME);
-           m_normJobName = m_jobName;
+           m_jobDefId = m_jobName;
 
-           LOG.info("NormJobName of " + id + ": " + m_normJobName);
+           // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate.
+           String jobDefId = null;
+           if(configuration != null ) jobDefId = configuration.get(Constants.JOB_DEFINITION_ID_KEY);
 
-           m_jobSubmitEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
-           m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
-           m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
-           m_jobSubmitEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
-           m_jobSubmitEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
-           m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType);
+           if(jobDefId == null) {
+               m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName);
+           } else {
+               LOG.debug("Got normJobName from job configuration for " + id + ": " + jobDefId);
+               m_jobDefId = jobDefId;
+           }
+
+           LOG.info("NormJobName of " + id + ": " + m_jobDefId);
+
+           m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+           m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+           m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
+           m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+           m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
            entityCreated(m_jobSubmitEventEntity);
        } else if(values.get(Keys.LAUNCH_TIME) != null) {  // job launched
            m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME)));
            m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp();
-           m_jobLaunchEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
-           m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
-           m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
-           m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
-           m_jobLaunchEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
-           m_jobLaunchEventEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType);
+           m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
+           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
            m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS));
            m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES));
            entityCreated(m_jobLaunchEventEntity);
        } else if(values.get(Keys.FINISH_TIME) != null) {  // job finished
            m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME)));
-           m_jobFinishEventEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
-           m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
-           m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
-           m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
-           m_jobFinishEventEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
-           m_jobFinishEventEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType);
+           m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
+           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
            entityCreated(m_jobFinishEventEntity);
 
            // populate jobExecutionEntity entity
-           m_jobExecutionEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
-           m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
-           m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
-           m_jobExecutionEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
-           m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_QUEUE.toString(), m_queueName);
-           m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(),this.m_jobType);
+           m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+           m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName);
+           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
 
            m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
            m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp());
            m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp());
            m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
+           m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp());
            if (values.get(Keys.FAILED_MAPS) != null) {
                // for Artemis
                m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS)));
@@ -223,7 +237,27 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
            m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps);
            m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces);
            if (values.get(Keys.COUNTERS) != null || totalCounters != null) {
-               m_jobExecutionEntity.setJobCounters(parseCounters(totalCounters));
+               JobCounters jobCounters = parseCounters(totalCounters);
+               m_jobExecutionEntity.setJobCounters(jobCounters);
+               if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) {
+                   Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER);
+                   if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
+                       m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue());
+                   }
+
+                   if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
+                       m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue());
+                   }
+
+                   if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
+                       m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue());
+                   }
+               }
+
+               if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) {
+                   m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
+                   m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
+               }
            }
            entityCreated(m_jobExecutionEntity);
        }
@@ -261,13 +295,13 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         final String taskID = values.get(Keys.TASKID);
 
         Map<String, String> taskBaseTags = new HashMap<String, String>(){{
-            put(EagleJobTagName.TASK_TYPE.toString(), taskType);
-            put(EagleJobTagName.USER.toString(), m_user);
-            //put(EagleJobTagName.JOB_NAME.toString(), _jobName);
-            put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
-            put(EagleJobTagName.JOB_TYPE.toString(), m_jobType);
-            put(EagleJobTagName.JOB_ID.toString(), m_jobID);
-            put(EagleJobTagName.TASK_ID.toString(), taskID);
+            put(MRJobTagName.TASK_TYPE.toString(), taskType);
+            put(MRJobTagName.USER.toString(), m_user);
+            //put(MRJobTagName.JOB_NAME.toString(), _jobName);
+            put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+            put(MRJobTagName.JOB_TYPE.toString(), m_jobType);
+            put(MRJobTagName.JOB_ID.toString(), m_jobId);
+            put(MRJobTagName.TASK_ID.toString(), taskID);
         }};
         taskBaseTags.putAll(m_baseTags);
         if (recType == RecordTypes.Task && startTime != null) { // task start, no host is assigned yet
@@ -278,8 +312,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             Map<String, String> taskExecutionTags = new HashMap<>(taskBaseTags);
             String hostname = m_taskRunningHosts.get(taskID);
             hostname = (hostname == null) ? "" : hostname; // TODO if task fails, then no hostname
-            taskExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname);
-            taskExecutionTags.put(EagleJobTagName.RACK.toString(), m_host2RackMapping.get(hostname));
+            taskExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname);
+            taskExecutionTags.put(MRJobTagName.RACK.toString(), m_host2RackMapping.get(hostname));
             entity.setTags(taskExecutionTags);
             entity.setStartTime(m_taskStartTime.get(taskID));
             entity.setEndTime(Long.valueOf(finishTime));
@@ -290,6 +324,13 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             if (values.get(Keys.COUNTERS) != null || counters != null) {
                 entity.setJobCounters(parseCounters(counters));
             }
+            long duration = entity.getEndTime() - m_jobSubmitEventEntity.getTimestamp();
+            if (taskType.equals(Constants.TaskType.MAP.toString()) && duration > m_jobExecutionEntity.getLastMapDuration()) {
+                m_jobExecutionEntity.setLastMapDuration(duration);
+            }
+            if (taskType.equals(Constants.TaskType.REDUCE.toString()) && duration > m_jobExecutionEntity.getLastReduceDuration()) {
+                m_jobExecutionEntity.setLastReduceDuration(duration);
+            }
             entityCreated(entity);
             //_taskStartTime.remove(taskID); // clean this taskID
         } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && startTime != null) { // task attempt start
@@ -300,8 +341,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             entity.setTags(taskAttemptExecutionTags);
             String hostname = values.get(Keys.HOSTNAME);
             String rack = values.get(Keys.RACK);
-            taskAttemptExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname);
-            taskAttemptExecutionTags.put(EagleJobTagName.RACK.toString(), rack);
+            taskAttemptExecutionTags.put(MRJobTagName.HOSTNAME.toString(), hostname);
+            taskAttemptExecutionTags.put(MRJobTagName.RACK.toString(), rack);
             // put last attempt's hostname to task level
             m_taskRunningHosts.put(taskID, hostname);
             // it is very likely that an attempt ID could be both succeeded and failed due to M/R system
@@ -345,17 +386,17 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         if (matchMustHaveKeyPatterns(prop)) {
             JobConfigurationAPIEntity jobConfigurationEntity = new JobConfigurationAPIEntity();
             jobConfigurationEntity.setTags(new HashMap<>(m_baseTags));
-            jobConfigurationEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
-            jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
-            jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
-            jobConfigurationEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
-            jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(), m_jobType);
+            jobConfigurationEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+            jobConfigurationEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+            jobConfigurationEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+            jobConfigurationEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+            jobConfigurationEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), m_jobType);
             jobConfigurationEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
 
             JobConfig jobConfig = new JobConfig();
             jobConfig.setConfig(prop);
             jobConfigurationEntity.setJobConfig(jobConfig);
-            jobConfigurationEntity.setConfigJobName(m_normJobName);
+            jobConfigurationEntity.setConfigJobName(m_jobDefId);
             entityCreated(jobConfigurationEntity);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
index 7fe8909..278deca 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -20,7 +20,7 @@ package org.apache.eagle.jpm.mr.history.parser;
 
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 6fd2c18..2d960b0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -19,15 +19,12 @@
 package org.apache.eagle.jpm.mr.history.parser;
 
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.jobhistory.*;
-import org.apache.hadoop.util.StringInterner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
index e485cc8..38ca35c 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -20,11 +20,11 @@ package org.apache.eagle.jpm.mr.history.parser;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
 import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,16 +83,16 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
 
             Map<String,Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
             if (mapTaskAttemptCounter == null) mapTaskAttemptCounter = new HashMap<>();
-            mapTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
-            counters.put(JPAConstants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter);
+            mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
+            counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter);
 
             Map<String,Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
             if (reduceTaskAttemptCounter == null) reduceTaskAttemptCounter = new HashMap<>();
-            reduceTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
-            counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter);
+            reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
+            counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter);
 
-            counters.put(JPAConstants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
-            counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result());
+            counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
+            counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result());
 
             jobCounters.setCounters(counters);
 
@@ -105,18 +105,18 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
 
     private void taskAttemptEntityCreated(TaskAttemptExecutionAPIEntity entity) {
         JobCounters jobCounters = entity.getJobCounters();
-        String taskType = entity.getTags().get(JPAConstants.JOB_TASK_TYPE_TAG);
+        String taskType = entity.getTags().get(Constants.JOB_TASK_TYPE_TAG);
 
         if (taskType != null && jobCounters != null && jobCounters.getCounters() != null) {
-            if (JPAConstants.TaskType.MAP.toString().equals(taskType.toUpperCase())) {
+            if (Constants.TaskType.MAP.toString().equals(taskType.toUpperCase())) {
                 m_mapAttemptDuration += entity.getDuration();
-                this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER));
-                this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER));
+                this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
+                this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
                 return;
-            } else if (JPAConstants.TaskType.REDUCE.toString().equals(taskType.toUpperCase())) {
+            } else if (Constants.TaskType.REDUCE.toString().equals(taskType.toUpperCase())) {
                 m_reduceAttemptDuration += entity.getDuration();
-                this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER));
-                this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER));
+                this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
+                this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
                 return;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index 23e7072..94de068 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
 import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.history.entities.TaskAttemptCounterAPIEntity;
 import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;
@@ -74,12 +75,12 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
         TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity;
         
         Map<String, String> tags = new HashMap<>();
-        tags.put(EagleJobTagName.SITE.toString(), e.getTags().get(EagleJobTagName.SITE.toString()));
-        tags.put(EagleJobTagName.NORM_JOB_NAME.toString(), e.getTags().get(EagleJobTagName.NORM_JOB_NAME.toString()));
-        tags.put(EagleJobTagName.RACK.toString(), e.getTags().get(EagleJobTagName.RACK.toString()));
-        tags.put(EagleJobTagName.HOSTNAME.toString(), e.getTags().get(EagleJobTagName.HOSTNAME.toString()));
-        tags.put(EagleJobTagName.JOB_ID.toString(), e.getTags().get(EagleJobTagName.JOB_ID.toString()));
-        tags.put(EagleJobTagName.TASK_TYPE.toString(), e.getTags().get(EagleJobTagName.TASK_TYPE.toString()));
+        tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
+        tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+        tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
+        tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
+        tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
+        tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
 
         CounterKey key = new CounterKey();
         key.tags = tags;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 14cc882..177fdc1 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
 import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity;
 import org.apache.eagle.jpm.mr.history.entities.TaskFailureCountAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;
@@ -80,17 +81,17 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
     	TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity();
     	Map<String, String> tags = new HashMap<>();
     	failureTask.setTags(tags);
-    	tags.put(EagleJobTagName.SITE.toString(), e.getTags().get(EagleJobTagName.SITE.toString()));
-    	tags.put(EagleJobTagName.NORM_JOB_NAME.toString(), e.getTags().get(EagleJobTagName.NORM_JOB_NAME.toString()));
-    	tags.put(EagleJobTagName.RACK.toString(), e.getTags().get(EagleJobTagName.RACK.toString()));
-    	tags.put(EagleJobTagName.HOSTNAME.toString(), e.getTags().get(EagleJobTagName.HOSTNAME.toString()));
-    	tags.put(EagleJobTagName.JOB_ID.toString(), e.getTags().get(EagleJobTagName.JOB_ID.toString()));
-    	tags.put(EagleJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
-    	tags.put(EagleJobTagName.TASK_TYPE.toString(), e.getTags().get(EagleJobTagName.TASK_TYPE.toString()));
+    	tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
+    	tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+    	tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
+    	tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
+    	tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
+    	tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
+    	tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
 
     	//TODO need optimize, match and then capture the data
     	final String errCategory = classifier.classifyError(e.getError());
-    	tags.put(EagleJobTagName.ERROR_CATEGORY.toString(), errCategory);
+    	tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
 
     	failureTask.setError(e.getError());
     	failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
index db62cfb..3a08c52 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
@@ -110,6 +110,9 @@ counter.group2.counter10.names = SLOTS_MILLIS_MAPS
 counter.group2.counter10.description = Total vcore-seconds taken by all reduce tasks
 counter.group2.counter11.names = SLOTS_MILLIS_REDUCES
 counter.group2.counter11.description = Total vcore-seconds taken by all reduce tasks
+counter.group2.counter12.names = RACK_LOCAL_MAPS
+counter.group2.counter12.description = Number of rack-local map tasks
+
 
 counter.group3.name = MapTaskAttemptCounter
 counter.group3.description = Reduce Task Attempt Counter Aggregation

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 8cb1aa3..1b97271 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -62,7 +62,7 @@
       "password": "secret"
     }
   },
-  
+
   "MRConfigureKeys" : [
     "mapreduce.map.output.compress",
     "mapreduce.map.output.compress.codec",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml
new file mode 100644
index 0000000..3c8aa92
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -0,0 +1,126 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-jpm-parent</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-jpm-mr-running</artifactId>
+    <name>eagle-jpm-mr-running</name>
+    <url>http://maven.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-stream-process-api</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.wso2.orbit.com.lmax</groupId>
+                    <artifactId>disruptor</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-stream-process-base</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.wso2.orbit.com.lmax</groupId>
+                    <artifactId>disruptor</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-job-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jsoup</groupId>
+            <artifactId>jsoup</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.ow2.asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/assembly/eagle-jpm-mr-running-assembly.xml</descriptor>
+                    <finalName>eagle-jpm-mr-running-${project.version}</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml b/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml
new file mode 100644
index 0000000..66133a0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/assembly/eagle-jpm-mr-running-assembly.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>assembly</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>false</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <!--includes>
+                <include>org.apache.hadoop:hadoop-common</include>
+                <include>org.apache.hadoop:hadoop-hdfs</include>
+                <include>org.apache.hadoop:hadoop-client</include>
+                <include>org.apache.hadoop:hadoop-auth</include>
+                <include>org.apache.eagle:eagle-stream-process-api</include>
+                <include>org.apache.eagle:eagle-stream-process-base</include>
+                <include>org.jsoup:jsoup</include>
+            </includes-->
+            <excludes>
+                <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+                <exclude>asm:asm</exclude>
+                <exclude>org.apache.storm:storm-core</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.outputDirectory}/</directory>
+            <outputDirectory>/</outputDirectory>
+            <!--<includes>-->
+            <!--<include>*.conf</include>-->
+            <!--<include>*.xml</include>-->
+            <!--<include>*.properties</include>-->
+            <!--<include>*.config</include>-->
+            <!--<include>classes/META-INF/*</include>-->
+            <!--</includes>-->
+
+            <excludes>
+                <exclude>*.yaml</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
new file mode 100644
index 0000000..fb8b805
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Launcher for the MapReduce running-job monitoring topology.
+ *
+ * <p>Connects {@link MRRunningJobFetchSpout} to {@link MRRunningJobParseBolt}, grouped on the
+ * {@code appId} field, and submits the resulting topology to Storm.
+ */
+public class MRRunningJobMain {
+    private static final String SPOUT_NAME = "mrRunningJobFetchSpout";
+    private static final String BOLT_NAME = "mrRunningJobParseBolt";
+
+    /**
+     * Loads the configuration, builds the topology and submits it. An in-process
+     * {@link LocalCluster} is used when the configured environment is {@code local};
+     * any other value submits through {@link StormSubmitter}.
+     *
+     * @param args command-line arguments handed to {@link MRRunningConfigManager#getInstance(String[])}
+     * @throws IllegalStateException if loading the configuration or submitting the topology fails
+     */
+    public static void main(String[] args) {
+        try {
+            //1. trigger init conf
+            MRRunningConfigManager mrRunningConfigManager = MRRunningConfigManager.getInstance(args);
+
+            // Copy before appending: the list returned by the config library is not
+            // documented to be modifiable.
+            List<String> confKeyKeys = new ArrayList<>(
+                    mrRunningConfigManager.getConfig().getStringList("MRConfigureKeys.jobConfigKey"));
+            confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
+            confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
+            confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
+            confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
+
+            //2. init topology
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            String topologyName = mrRunningConfigManager.getConfig().getString("envContextConfig.topologyName");
+
+            int spoutParallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + SPOUT_NAME);
+            int spoutTasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + SPOUT_NAME);
+            topologyBuilder.setSpout(
+                    SPOUT_NAME,
+                    new MRRunningJobFetchSpout(
+                            mrRunningConfigManager.getJobExtractorConfig(),
+                            mrRunningConfigManager.getEndpointConfig(),
+                            mrRunningConfigManager.getZkStateConfig()),
+                    // cap the parallelism hint at the configured task count
+                    Math.min(spoutParallelism, spoutTasks)
+            ).setNumTasks(spoutTasks);
+
+            int boltParallelism = mrRunningConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + BOLT_NAME);
+            int boltTasks = mrRunningConfigManager.getConfig().getInt("envContextConfig.tasks." + BOLT_NAME);
+            topologyBuilder.setBolt(
+                    BOLT_NAME,
+                    new MRRunningJobParseBolt(
+                            mrRunningConfigManager.getEagleServiceConfig(),
+                            mrRunningConfigManager.getEndpointConfig(),
+                            mrRunningConfigManager.getJobExtractorConfig(),
+                            mrRunningConfigManager.getZkStateConfig(),
+                            confKeyKeys),
+                    Math.min(boltParallelism, boltTasks)
+            ).setNumTasks(boltTasks).fieldsGrouping(SPOUT_NAME, new Fields("appId"));
+
+            Config config = new Config();
+            config.setNumWorkers(mrRunningConfigManager.getConfig().getInt("envContextConfig.workers"));
+            // NOTE(review): topology debug is always switched on; consider making it configurable.
+            config.put(Config.TOPOLOGY_DEBUG, true);
+            if ("local".equals(mrRunningConfigManager.getEnv())) {
+                //local mode
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            } else {
+                //cluster mode
+                StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            }
+        } catch (Exception e) {
+            // Propagate with the cause so a failed launch is not reported as a clean exit.
+            throw new IllegalStateException("Failed to start the MR running job topology", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
new file mode 100644
index 0000000..05e7812
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.config;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+
+import java.io.Serializable;
+
+/**
+ * Process-wide holder for the configuration of the MR running-job topology.
+ *
+ * <p>{@link #getInstance(String[])} loads the raw {@link Config} through
+ * {@link ConfigOptionParser} and copies the settings the topology needs into the
+ * serializable holder objects exposed by the getters.
+ */
+public class MRRunningConfigManager implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MRRunningConfigManager.class);
+
+    private static final MRRunningConfigManager manager = new MRRunningConfigManager();
+
+    private String env;
+    private Config config;
+    private final ZKStateConfig zkStateConfig;
+    private final EagleServiceConfig eagleServiceConfig;
+    private final JobExtractorConfig jobExtractorConfig;
+    private final EndpointConfig endpointConfig;
+
+    /** Settings read from the {@code zookeeperConfig} section. */
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+        public String zkPort;
+    }
+
+    /** Settings read from the {@code eagleProps.eagleService} section. */
+    public static class EagleServiceConfig implements Serializable {
+        public String eagleServiceHost;
+        public int eagleServicePort;
+        public int readTimeoutSeconds;
+        public int maxFlushNum;
+        public String username;
+        public String password;
+    }
+
+    /** Settings read from the {@code jobExtractorConfig} section. */
+    public static class JobExtractorConfig implements Serializable {
+        public String site;
+        public int fetchRunningJobInterval;
+        public int parseJobThreadPoolSize;
+        public int topAndBottomTaskByElapsedTime;
+    }
+
+    /** Settings read from the {@code dataSourceConfig} section. */
+    public static class EndpointConfig implements Serializable {
+        public String[] rmUrls;
+    }
+
+    private MRRunningConfigManager() {
+        this.eagleServiceConfig = new EagleServiceConfig();
+        this.jobExtractorConfig = new JobExtractorConfig();
+        this.endpointConfig = new EndpointConfig();
+        this.zkStateConfig = new ZKStateConfig();
+    }
+
+    /**
+     * Returns the shared manager after (re)loading its configuration from {@code args}.
+     * Every call reloads, so later calls overwrite the values seen through earlier references.
+     *
+     * @param args command-line arguments passed on to {@link ConfigOptionParser}
+     * @return the shared, freshly initialised manager
+     * @throws IllegalStateException if the configuration cannot be loaded
+     */
+    public static MRRunningConfigManager getInstance(String[] args) {
+        manager.init(args);
+        return manager;
+    }
+
+    public String getEnv() {
+        return env;
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public ZKStateConfig getZkStateConfig() {
+        return zkStateConfig;
+    }
+
+    public EagleServiceConfig getEagleServiceConfig() {
+        return eagleServiceConfig;
+    }
+
+    public JobExtractorConfig getJobExtractorConfig() {
+        return jobExtractorConfig;
+    }
+
+    public EndpointConfig getEndpointConfig() {
+        return endpointConfig;
+    }
+
+    /**
+     * Loads the configuration and populates every holder object from it.
+     *
+     * @throws IllegalStateException if the configuration cannot be loaded
+     */
+    private void init(String[] args) {
+        try {
+            LOG.info("Loading from configuration file");
+            this.config = new ConfigOptionParser().load(args);
+        } catch (Exception e) {
+            // Fail fast and keep the cause: carrying on without a loaded config would
+            // only fail later with a less helpful error.
+            throw new IllegalStateException("failed to load config", e);
+        }
+
+        this.env = config.getString("envContextConfig.env");
+
+        //parse eagle zk
+        this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeperConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = config.getInt("zookeeperConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = config.getInt("zookeeperConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+
+        // parse eagle service endpoint
+        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+        String port = config.getString("eagleProps.eagleService.port");
+        this.eagleServiceConfig.eagleServicePort = Integer.parseInt(port);
+        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleProps.eagleService.readTimeOutSeconds");
+        this.eagleServiceConfig.maxFlushNum = config.getInt("eagleProps.eagleService.maxFlushNum");
+
+        //parse job extractor
+        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.fetchRunningJobInterval = config.getInt("jobExtractorConfig.fetchRunningJobInterval");
+        this.jobExtractorConfig.parseJobThreadPoolSize = config.getInt("jobExtractorConfig.parseJobThreadPoolSize");
+        this.jobExtractorConfig.topAndBottomTaskByElapsedTime = config.getInt("jobExtractorConfig.topAndBottomTaskByElapsedTime");
+
+        //parse data source config
+        this.endpointConfig.rmUrls = config.getStringList("dataSourceConfig.rmUrls").toArray(new String[0]);
+
+        LOG.info("Successfully initialized MRRunningConfigManager");
+        LOG.info("env: {}", this.env);
+        LOG.info("site: {}", this.jobExtractorConfig.site);
+        LOG.info("eagle.service.host: {}", this.eagleServiceConfig.eagleServiceHost);
+        LOG.info("eagle.service.port: {}", this.eagleServiceConfig.eagleServicePort);
+    }
+}