Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/05 18:07:43 UTC

[4/8] incubator-eagle git commit: EAGLE-276 eagle support for mr & spark history job monitoring mr & spark job history monitoring

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java
new file mode 100644
index 0000000..451b921
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+public class DefaultJobIdPartitioner implements JobIdPartitioner {
+    @Override
+    public int partition(int numTotalParts, String jobId) {
+        int hash = jobId.hashCode();
+        hash = Math.abs(hash);
+        return hash % numTotalParts;
+    }
+}
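
For reference, a minimal usage sketch (not taken from the patch; the job id below is made up)
showing how a job id maps to one of the spout partitions, plus a note on the
Math.abs(Integer.MIN_VALUE) edge case:

    JobIdPartitioner partitioner = new DefaultJobIdPartitioner();
    int numTotalParts = 6;                                    // matches "mrHistoryJobExecutor" : 6 in application.conf
    String jobId = "job_1435622600578_7450";                  // made-up job id
    int partition = partitioner.partition(numTotalParts, jobId);   // value in [0, numTotalParts)
    // Caveat: Math.abs(Integer.MIN_VALUE) is still negative, so a pathological hashCode could
    // yield a negative partition; Math.floorMod sidesteps that edge case:
    int safePartition = Math.floorMod(jobId.hashCode(), numTotalParts);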

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
new file mode 100644
index 0000000..30374c4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+import java.util.*;
+
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.entities.JobProcessTimeStampEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HistoryJobProgressBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(HistoryJobProgressBolt.class);
+
+    private final static int MAX_RETRY_TIMES = 3;
+    private Long m_minTimeStamp;
+    private int m_numTotalPartitions;
+    private JHFConfigManager configManager;
+    private Map<Integer, Long> m_partitionTimeStamp = new TreeMap<>();
+    public HistoryJobProgressBolt(String parentName, JHFConfigManager configManager) {
+        this.configManager = configManager;
+        m_numTotalPartitions = this.configManager.getConfig().getInt("envContextConfig.parallelismConfig." + parentName);
+        m_minTimeStamp = 0L;
+    }
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        Integer partitionId = tuple.getIntegerByField("partitionId");
+        Long timeStamp = tuple.getLongByField("timeStamp");
+        LOG.info("partition " + partitionId + ", timeStamp " + timeStamp);
+        if (!m_partitionTimeStamp.containsKey(partitionId) || m_partitionTimeStamp.get(partitionId) < timeStamp) {
+            m_partitionTimeStamp.put(partitionId, timeStamp);
+        }
+
+        if (m_partitionTimeStamp.size() >= m_numTotalPartitions) {
+            //get min timestamp
+            Long minTimeStamp = Collections.min(m_partitionTimeStamp.values());
+
+            if (m_minTimeStamp == 0L) {
+                m_minTimeStamp = minTimeStamp;
+            }
+
+            if (m_minTimeStamp > minTimeStamp) {
+                //no need to update
+                return;
+            }
+
+            m_minTimeStamp = minTimeStamp;
+            final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+            final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+            Map<String, String> baseTags = new HashMap<String, String>() { {
+                put("site", jobExtractorConfig.site);
+            } };
+            JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
+            entity.setCurrentTimeStamp(m_minTimeStamp);
+            entity.setTimestamp(m_minTimeStamp);
+            entity.setTags(baseTags);
+
+            IEagleServiceClient client = new EagleServiceClientImpl(
+                    eagleServiceConfig.eagleServiceHost,
+                    eagleServiceConfig.eagleServicePort,
+                    eagleServiceConfig.username,
+                    eagleServiceConfig.password);
+
+            client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+
+            List<JobProcessTimeStampEntity> entities = new ArrayList<>();
+            entities.add(entity);
+
+            int tried = 0;
+            while (tried <= MAX_RETRY_TIMES) {
+                try {
+                    LOG.info("start flushing " + entities.size() + " JobProcessTimeStampEntity entities");
+                    client.create(entities);
+                    LOG.info("finished flushing " + entities.size() + " entities");
+                    break;
+                } catch (Exception ex) {
+                    if (tried < MAX_RETRY_TIMES) {
+                        LOG.error("Got exception while flushing, retrying (attempt " + (tried + 1) + " of " + MAX_RETRY_TIMES + ")", ex);
+                    } else {
+                        LOG.error("Got exception while flushing, reached max retry times " + MAX_RETRY_TIMES, ex);
+                    }
+                }
+                tried++;
+            }
+
+            client.getJerseyClient().destroy();
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.error("failed to close eagle service client ", e);
+            }
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+    }
+    @Override
+    public void cleanup() {
+        super.cleanup();
+    }
+}
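
The bolt above only advances the global "processed up to" timestamp once every spout partition
has reported, and the value it flushes is the minimum of the per-partition high-water marks. A
minimal sketch of that rule in isolation (the helper name and the use of java.util.Optional are
mine, not the patch's):

    // Returns the timestamp to flush, or empty if some partition has not reported yet.
    static Optional<Long> readyMinTimestamp(Map<Integer, Long> partitionTimeStamp, int numTotalPartitions) {
        if (partitionTimeStamp.size() < numTotalPartitions) {
            return Optional.empty();
        }
        return Optional.of(Collections.min(partitionTimeStamp.values()));
    }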

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
new file mode 100644
index 0000000..a10599b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.crawler.*;
+import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Zookeeper znode structure
+ * -zkRoot
+ *   - partitions
+ *      - 0 (20150101)
+ *      - 1 (20150101)
+ *      - 2 (20150101)
+ *      - ... ...
+ *      - N-1 (20150102)
+ *   - jobs
+ *      - 20150101
+ *        - job1
+ *        - job2
+ *        - job3
+ *      - 20150102
+ *        - job1
+ *        - job2
+ *        - job3
+ *
+ * Spout can have multiple instances, which is supported by storm parallelism primitive.
+ *
+ * Under znode partitions, N child znodes (named with 0-based integers) are created, each mapped to one spout instance. All jobs are partitioned into N
+ * partitions by applying the JobIdPartitioner class to each job id. The value of each partition znode is the date on which the last job in that partition
+ * was successfully processed.
+ *
+ * processing steps
+ * 1) In constructor, set up the job history content filter, config manager and the collector interceptor/callback
+ * 2) In open(), calculate the partition id for the current spout instance (derived from the spout task id within TopologyContext)
+ * 3) In open(), call zkState.ensureJobPartitions to rebuild the partitions znodes if necessary; ensureJobPartitions is done by only one spout task, as it internally uses a lock
+ * 4) In nextTuple(), list job files by invoking the Hadoop API
+ * 5) In nextTuple(), iterate over each job id, invoke JobIdPartitioner.partition(jobId), and keep only the jobs belonging to the current partition id
+ * 6) process job files (the job history file and the job configuration xml file)
+ * 7) add the job id to the current date slot (for example 20150102) after the job is successfully processed
+ * 8) clean up all slots with a date earlier than currentProcessDate - 2 days (the 2 days should be configurable)
+ *
+ * Note:
+ * if one spout instance crashes and is brought up again, open() will be invoked again; this scenario needs to be handled.
+ *
+ */
+
+public class JobHistorySpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(JobHistorySpout.class);
+
+    private int partitionId;
+    private int numTotalPartitions;
+    private transient JobHistoryZKStateManager zkState;
+    private transient JHFCrawlerDriver driver;
+    private JobHistoryContentFilter contentFilter;
+    private JobHistorySpoutCollectorInterceptor interceptor;
+    private JHFInputStreamCallback callback;
+    private JHFConfigManager configManager;
+    private JobHistoryLCM m_jhfLCM;
+
+    public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager) {
+        this(filter, configManager, new JobHistorySpoutCollectorInterceptor());
+    }
+
+    /**
+     * this constructor is mostly for unit test purposes, as it allows a customized collector interceptor to be injected
+     * @param filter job history content filter
+     * @param configManager configuration manager
+     * @param adaptor customized spout collector interceptor
+     */
+    public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager, JobHistorySpoutCollectorInterceptor adaptor) {
+        this.contentFilter = filter;
+        this.configManager = configManager;
+        this.interceptor = adaptor;
+        callback = new DefaultJHFInputStreamCallback(contentFilter, configManager, interceptor);
+    }
+
+    private int calculatePartitionId(TopologyContext context) {
+        int thisGlobalTaskId = context.getThisTaskId();
+        String componentName = context.getComponentId(thisGlobalTaskId);
+        List<Integer> globalTaskIds = context.getComponentTasks(componentName);
+        numTotalPartitions = globalTaskIds.size();
+        int index = 0;
+        for (Integer id : globalTaskIds) {
+            if (id == thisGlobalTaskId) {
+                return index;
+            }
+            index++;
+        }
+        throw new IllegalStateException();
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context,
+                     final SpoutOutputCollector collector) {
+        partitionId = calculatePartitionId(context);
+        // sanity verify 0<=partitionId<=numTotalPartitions-1
+        if (partitionId < 0 || partitionId >= numTotalPartitions) {
+            throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " +
+                    partitionId + " and numTotalPartitions " + numTotalPartitions);
+        }
+        Class<? extends JobIdPartitioner> partitionerCls = configManager.getControlConfig().partitionerCls;
+        JobIdPartitioner partitioner;
+        try {
+            partitioner = partitionerCls.newInstance();
+        } catch (Exception e) {
+            LOG.error("failed to instantiate job partitioner class " + partitionerCls, e);
+            throw new IllegalStateException(e);
+        }
+        JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId);
+        zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
+        zkState.ensureJobPartitions(numTotalPartitions);
+        interceptor.setSpoutOutputCollector(collector);
+
+        try {
+            m_jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig());
+            driver = new JHFCrawlerDriverImpl(configManager.getJobHistoryEndpointConfig(),
+                    configManager.getControlConfig(),
+                    callback,
+                    zkState,
+                    m_jhfLCM,
+                    jobIdFilter,
+                    partitionId);
+        } catch (Exception e) {
+            LOG.error("failed to create crawler driver", e);
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            Long modifiedTime = driver.crawl();
+            interceptor.collect(new ValuesArray(partitionId, modifiedTime));
+        } catch (Exception ex) {
+            LOG.error("failed to crawl job history file, continuing ...", ex);
+            try {
+                m_jhfLCM.freshFileSystem();
+            } catch (Exception e) {
+                LOG.error("failed to refresh file system", e);
+            }
+        } finally {
+            try {
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                // ignore interruption during the throttling sleep
+            }
+        }
+    }
+
+    /**
+     * declare the fields emitted by this spout: partitionId and timeStamp
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("partitionId", "timeStamp"));
+    }
+
+    /**
+     * add to processedJob
+     */
+    @Override
+    public void ack(Object jobId) {
+    }
+
+    /**
+     * job is not fully processed
+     */
+    @Override
+    public void fail(Object jobId) {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    @Override
+    public void close() {
+    }
+}
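
The topology class that wires these components together is not shown in this hunk; the
following is a sketch of the likely wiring, assuming the component name "mrHistoryJobExecutor"
from application.conf and a filter/configManager built elsewhere (the grouping choice is also
an assumption -- with a single bolt task, any grouping that reaches it works):

    TopologyBuilder builder = new TopologyBuilder();
    int parallelism = 6;  // envContextConfig.parallelismConfig.mrHistoryJobExecutor

    // Each spout task becomes one job partition (see calculatePartitionId()).
    builder.setSpout("mrHistoryJobExecutor", new JobHistorySpout(filter, configManager), parallelism);

    // One progress bolt receives ("partitionId", "timeStamp") tuples from every spout task
    // and tracks the minimum processed timestamp across partitions.
    builder.setBolt("historyJobProgressBolt", new HistoryJobProgressBolt("mrHistoryJobExecutor", configManager), 1)
           .globalGrouping("mrHistoryJobExecutor");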

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java
new file mode 100644
index 0000000..b58c84f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+public interface JobIdFilter {
+    boolean accept(String jobId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java
new file mode 100644
index 0000000..07b8519
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+public class JobIdFilterByPartition implements JobIdFilter {
+    private JobIdPartitioner partitioner;
+    private int numTotalPartitions;
+    private int partitionId;
+
+    public JobIdFilterByPartition(JobIdPartitioner partitioner, int numTotalPartitions, int partitionId) {
+        this.partitioner = partitioner;
+        this.numTotalPartitions = numTotalPartitions;
+        this.partitionId = partitionId;
+    }
+
+    @Override
+    public boolean accept(String jobId) {
+        int part = partitioner.partition(numTotalPartitions, jobId);
+        return part == partitionId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java
new file mode 100644
index 0000000..cc7e68c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+public interface JobIdPartitioner {
+    int partition(int numTotalParts, String jobId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
new file mode 100644
index 0000000..308057b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.zkres;
+
+import java.util.List;
+
+public interface JobHistoryZKStateLCM {
+    void ensureJobPartitions(int numTotalPartitions);
+    String readProcessedDate(int partitionId);
+    List<String> readProcessedJobs(String date);
+    void updateProcessedDate(int partitionId, String date);
+    void addProcessedJob(String date, String jobId);
+    void truncateProcessedJob(String date);
+    void truncateEverything();
+}
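
The real consumer of this interface is the crawler driver (JHFCrawlerDriverImpl, elsewhere in
this patch); the call pattern implied by the znode layout documented in JobHistorySpout is
roughly the following sketch (partitionId, jobId, date, twoDaysAgo and zkStateConfig are
placeholders, not values from the patch):

    JobHistoryZKStateLCM zk = new JobHistoryZKStateManager(zkStateConfig);

    String date = zk.readProcessedDate(partitionId);      // e.g. "20150102", last date this partition finished
    List<String> done = zk.readProcessedJobs(date);       // job ids already processed for that day (may be null)
    if (done == null || !done.contains(jobId)) {
        // ... parse the job history file and its configuration xml ...
        zk.addProcessedJob(date, jobId);                   // remember the job so it is not reprocessed
    }
    zk.updateProcessedDate(partitionId, date);             // advance this partition's high-water mark
    zk.truncateProcessedJob(twoDaysAgo);                   // clean up slots older than ~2 days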

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
new file mode 100644
index 0000000..24dd7be
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.zkres;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.ZKStateConfig;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.List;
+
+public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
+    public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
+    private String zkRoot;
+    private CuratorFramework _curator;
+    public static final String ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS = "lockForEnsureJobPartitions";
+    public static final String ZNODE_FORCE_START_FROM = "forceStartFrom";
+    public static final String ZNODE_PARTITIONS = "partitions";
+
+    public static final int BACKOFF_DAYS = 0;
+
+    private CuratorFramework newCurator(ZKStateConfig config) throws Exception {
+        return CuratorFrameworkFactory.newClient(
+                config.zkQuorum,
+                config.zkSessionTimeoutMs,
+                15000,
+                new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
+        );
+    }
+
+    public JobHistoryZKStateManager(ZKStateConfig config) {
+        this.zkRoot = config.zkRoot;
+
+        try {
+            _curator = newCurator(config);
+            _curator.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void close() {
+        _curator.close();
+        _curator = null;
+    }
+
+    private String readForceStartFrom() {
+        String path = zkRoot + "/" + ZNODE_FORCE_START_FROM;
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                return new String(_curator.getData().forPath(path), "UTF-8");
+            }
+        } catch (Exception ex) {
+            LOG.error("fail reading forceStartFrom znode", ex);
+        }
+        return null;
+    }
+
+    private void deleteForceStartFrom() {
+        String path = zkRoot + "/" + ZNODE_FORCE_START_FROM;
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                _curator.delete().forPath(path);
+            }
+        } catch(Exception ex) {
+            LOG.error("fail deleting forceStartFrom znode", ex);
+        }
+    }
+
+    private String getProcessedDateAfterBackoff(int backOffDays) {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
+        Calendar c = Calendar.getInstance();
+        c.add(Calendar.DATE, -1);
+        c.add(Calendar.DATE, -1 * backOffDays);
+        return sdf.format(c.getTime());
+    }
+
+    /**
+     * under zkRoot, the znode forceStartFrom is used to force jobs to be crawled starting from that date
+     * IF
+     *    forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD",
+     * THEN
+     *    rebuild all partitions with the forceStartFrom
+     * ELSE
+     *    IF
+     *       partition structure is changed
+     *    THEN
+     *       IF
+     *          there is valid mindate for existing partitions
+     *       THEN
+     *          rebuild job partitions from that valid mindate
+     *       ELSE
+     *          rebuild job partitions from (today - BACKOFF_DAYS)
+     *       END
+     *    ELSE
+     *      do nothing
+     *    END
+     * END
+     *
+     *
+     * forceStartFrom is deleted once its value is used, so the next time the topology is restarted, the program resumes from where it stopped last time
+     */
+    @Override
+    public void ensureJobPartitions(int numTotalPartitions) {
+        // lock before rebuild job partitions
+        String lockForEnsureJobPartitions = zkRoot + "/" + ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS;
+        InterProcessMutex lock = new InterProcessMutex(_curator, lockForEnsureJobPartitions);
+        String path = zkRoot + "/" + ZNODE_PARTITIONS;
+        try {
+            lock.acquire();
+            int minDate = 0;
+            String forceStartFrom = readForceStartFrom();
+            if (forceStartFrom != null) {
+                try {
+                    minDate = Integer.valueOf(forceStartFrom);
+                } catch(Exception ex) {
+                    LOG.error("failed to convert forceStartFrom znode value to integer, value: " + forceStartFrom);
+                    throw new IllegalStateException();
+                }
+            } else {
+                boolean pathExists = _curator.checkExists().forPath(path) != null;
+                boolean structureChanged = true;
+                if (pathExists) {
+                    int currentCount = _curator.getChildren().forPath(path).size();
+                    if (numTotalPartitions == currentCount) {
+                        structureChanged = false;
+                        LOG.info("znode partitions structure is unchanged");
+                    } else {
+                        LOG.info("znode partitions structure is changed, current partition count " + currentCount + ", future count " + numTotalPartitions);
+                    }
+                }
+                if (!structureChanged) {
+                    return; // do nothing
+                }
+
+                if (pathExists) {
+                    List<String> partitions = _curator.getChildren().forPath(path);
+                    for (String partition : partitions) {
+                        String date = new String(_curator.getData().forPath(path + "/" + partition), "UTF-8");
+                        int tmp = Integer.valueOf(date);
+                        if (minDate == 0 || tmp < minDate) {
+                            minDate = tmp;
+                        }
+                    }
+                }
+
+                if (minDate == 0) {
+                    minDate = Integer.valueOf(getProcessedDateAfterBackoff(BACKOFF_DAYS));
+                }
+            }
+            rebuildJobPartitions(numTotalPartitions, String.valueOf(minDate));
+            deleteForceStartFrom();
+        } catch (Exception e) {
+            LOG.error("fail building job partitions", e);
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                lock.release();
+            } catch(Exception e) {
+                LOG.error("fail releasing lock", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void rebuildJobPartitions(int numTotalPartitions, String startingDate) throws Exception {
+        LOG.info("rebuild job partitions with numTotalPartitions " + numTotalPartitions + " with starting date " + startingDate);
+        String path = zkRoot + "/" + ZNODE_PARTITIONS;
+        // truncate all existing partitions
+        if (_curator.checkExists().forPath(path) != null) {
+            _curator.delete().deletingChildrenIfNeeded().forPath(path);
+        }
+
+        for (int i = 0; i < numTotalPartitions; i++) {
+            _curator.create()
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
+        }
+    }
+
+    @Override
+    public String readProcessedDate(int partitionId) {
+        String path = zkRoot + "/partitions/" + partitionId;
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                return new String(_curator.getData().forPath(path), "UTF-8");
+            } else {
+                return null;
+            }
+        } catch (Exception e) {
+            LOG.error("fail read processed date", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void updateProcessedDate(int partitionId, String date) {
+        String path = zkRoot + "/partitions/" + partitionId;
+        try {
+            if (_curator.checkExists().forPath(path) == null) {
+                _curator.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, date.getBytes("UTF-8"));
+            } else {
+                _curator.setData().forPath(path, date.getBytes("UTF-8"));
+            }
+        } catch (Exception e) {
+            LOG.error("fail update processed date", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void addProcessedJob(String date, String jobId) {
+        String path = zkRoot + "/jobs/" + date + "/" + jobId;
+        try {
+            if (_curator.checkExists().forPath(path) == null) {
+                _curator.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path);
+            } else {
+                _curator.setData().forPath(path);
+            }
+        } catch (Exception e) {
+            LOG.error("fail adding processed jobs", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void truncateProcessedJob(String date) {
+        LOG.info("trying to truncate all data for day " + date);
+        // we need lock before we do truncate
+        String path = zkRoot + "/jobs/" + date;
+        InterProcessMutex lock = new InterProcessMutex(_curator, path);
+        try {
+            lock.acquire();
+            if (_curator.checkExists().forPath(path) != null) {
+                _curator.delete().deletingChildrenIfNeeded().forPath(path);
+                LOG.info("really truncated all data for day " + date);
+            }
+        } catch (Exception e) {
+            LOG.error("fail truncating processed jobs", e);
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                lock.release();
+            } catch (Exception e) {
+                LOG.error("fail releasing lock", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public List<String> readProcessedJobs(String date) {
+        String path = zkRoot + "/jobs/" + date;
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                return _curator.getChildren().forPath(path);
+            } else {
+                return null;
+            }
+        } catch (Exception e) {
+            LOG.error("fail read processed jobs", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void truncateEverything() {
+        String path = zkRoot;
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                _curator.delete().deletingChildrenIfNeeded().forPath(path);
+            }
+        } catch (Exception ex) {
+            LOG.error("fail truncating everything", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+}
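
One practical use of the forceStartFrom znode described above: an operator can write a date into
it before restarting the topology to force all partitions to be rebuilt from that date. A hedged
sketch with Curator (quorum and zkRoot values are copied from the sample application.conf;
imports and error handling omitted):

    CuratorFramework curator = CuratorFrameworkFactory.newClient(
            "sandbox.hortonworks.com:2181", new RetryNTimes(3, 1000));
    curator.start();

    String path = "/test_mrjobhistory/" + JobHistoryZKStateManager.ZNODE_FORCE_START_FROM;
    byte[] date = "20150101".getBytes(StandardCharsets.UTF_8);
    if (curator.checkExists().forPath(path) == null) {
        curator.create().creatingParentsIfNeeded().forPath(path, date);
    } else {
        curator.setData().forPath(path, date);
    }
    curator.close();
    // ensureJobPartitions() will pick this value up, rebuild every partition znode from 20150101,
    // and then delete the forceStartFrom znode.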

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
new file mode 100644
index 0000000..db62cfb
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
@@ -0,0 +1,185 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#### Sample configuration:
+## counter.group0.name = groupname1
+## counter.group0.counter0.names = counterName1,counterName2,...
+## counter.group0.counter0.description = counter description...
+
+counter.group0.name = org.apache.hadoop.mapreduce.FileSystemCounter
+counter.group0.description = File System Counters
+counter.group0.counter0.names = FILE_BYTES_READ
+counter.group0.counter0.description = FILE: Number of bytes read
+counter.group0.counter1.names = FILE_BYTES_WRITTEN
+counter.group0.counter1.description = FILE: Number of bytes written
+counter.group0.counter2.names = FILE_READ_OPS
+counter.group0.counter2.description = FILE: Number of read operations
+counter.group0.counter3.names = FILE_LARGE_READ_OPS
+counter.group0.counter3.description = FILE: Number of large read operations
+counter.group0.counter4.names = FILE_WRITE_OPS
+counter.group0.counter4.description = FILE: Number of write operations
+counter.group0.counter5.names = HDFS_BYTES_READ
+counter.group0.counter5.description = HDFS: Number of bytes read
+counter.group0.counter6.names = HDFS_BYTES_WRITTEN
+counter.group0.counter6.description = HDFS: Number of bytes written
+counter.group0.counter7.names = HDFS_READ_OPS
+counter.group0.counter7.description = HDFS: Number of read operations
+counter.group0.counter8.names = HDFS_LARGE_READ_OPS
+counter.group0.counter8.description = HDFS: Number of large read operations
+counter.group0.counter9.names = HDFS_WRITE_OPS
+counter.group0.counter9.description = HDFS: Number of write operations
+
+counter.group1.name = org.apache.hadoop.mapreduce.TaskCounter
+counter.group1.description = Map-Reduce Framework
+counter.group1.counter0.names = MAP_INPUT_RECORDS
+counter.group1.counter0.description = Map input records
+counter.group1.counter1.names = MAP_OUTPUT_RECORDS
+counter.group1.counter1.description = Map output records
+counter.group1.counter2.names = SPLIT_RAW_BYTES
+counter.group1.counter2.description = Input split bytes
+counter.group1.counter3.names = SPILLED_RECORDS
+counter.group1.counter3.description = Spilled Records
+counter.group1.counter4.names = CPU_MILLISECONDS
+counter.group1.counter4.description = CPU time spent (ms)
+counter.group1.counter5.names = PHYSICAL_MEMORY_BYTES
+counter.group1.counter5.description = Physical memory (bytes) snapshot
+counter.group1.counter6.names = VIRTUAL_MEMORY_BYTES
+counter.group1.counter6.description = Virtual memory (bytes) snapshot
+counter.group1.counter7.names = COMMITTED_HEAP_BYTES
+counter.group1.counter7.description = Total committed heap usage (bytes)
+counter.group1.counter8.names = REDUCE_SHUFFLE_BYTES
+counter.group1.counter8.description = Reduce shuffle bytes (bytes)
+counter.group1.counter9.names = GC_TIME_MILLIS
+counter.group1.counter9.description = GC time milliseconds
+counter.group1.counter10.names = MAP_OUTPUT_BYTES
+counter.group1.counter10.description = map output bytes
+counter.group1.counter11.names = REDUCE_INPUT_RECORDS
+counter.group1.counter11.description = reduce input records
+counter.group1.counter12.names = COMBINE_INPUT_RECORDS
+counter.group1.counter12.description = combine input records
+counter.group1.counter13.names = COMBINE_OUTPUT_RECORDS
+counter.group1.counter13.description = combine output records
+counter.group1.counter14.names = REDUCE_INPUT_GROUPS
+counter.group1.counter14.description = reduce input groups
+counter.group1.counter15.names = REDUCE_OUTPUT_RECORDS
+counter.group1.counter15.description = reduce output records
+counter.group1.counter16.names = SHUFFLED_MAPS
+counter.group1.counter16.description = shuffled maps
+counter.group1.counter17.names = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group1.counter17.description = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group1.counter18.names = MERGED_MAP_OUTPUTS
+counter.group1.counter18.description = MERGED_MAP_OUTPUTS
+counter.group1.counter19.names = FAILED_SHUFFLE
+counter.group1.counter19.description = FAILED_SHUFFLE
+
+counter.group2.name = org.apache.hadoop.mapreduce.JobCounter
+counter.group2.description = Map-Reduce Job Counter
+counter.group2.counter0.names = MB_MILLIS_MAPS
+counter.group2.counter0.description = Total megabyte-seconds taken by all map tasks
+counter.group2.counter1.names = MB_MILLIS_REDUCES
+counter.group2.counter1.description = Total megabyte-seconds taken by all reduce tasks
+counter.group2.counter2.names = VCORES_MILLIS_MAPS
+counter.group2.counter2.description = Total vcore-seconds taken by all map tasks
+counter.group2.counter3.names = VCORES_MILLIS_REDUCES
+counter.group2.counter3.description = Total vcore-seconds taken by all reduce tasks
+counter.group2.counter4.names = OTHER_LOCAL_MAPS
+counter.group2.counter4.description = Other local map tasks
+counter.group2.counter5.names = DATA_LOCAL_MAPS
+counter.group2.counter5.description = Data-local map tasks
+counter.group2.counter6.names = MILLIS_MAPS
+counter.group2.counter6.description = Total time spent by all map tasks (ms)
+counter.group2.counter7.names = MILLIS_REDUCES
+counter.group2.counter7.description = Total time spent by all reduce tasks (ms)
+counter.group2.counter8.names = TOTAL_LAUNCHED_MAPS
+counter.group2.counter8.description = Launched map tasks
+counter.group2.counter9.names = TOTAL_LAUNCHED_REDUCES
+counter.group2.counter9.description = Launched reduce tasks
+counter.group2.counter10.names = SLOTS_MILLIS_MAPS
+counter.group2.counter10.description = Total time spent by all maps in occupied slots (ms)
+counter.group2.counter11.names = SLOTS_MILLIS_REDUCES
+counter.group2.counter11.description = Total time spent by all reduces in occupied slots (ms)
+
+counter.group3.name = MapTaskAttemptCounter
+counter.group3.description = Map Task Attempt Counter Aggregation
+counter.group3.counter0.names = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group3.counter1.names = MAP_INPUT_RECORDS
+counter.group3.counter2.names = MERGED_MAP_OUTPUTS
+counter.group3.counter3.names = SPILLED_RECORDS
+counter.group3.counter4.names = MAP_OUTPUT_BYTES
+counter.group3.counter5.names = COMMITTED_HEAP_BYTES
+counter.group3.counter6.names = FAILED_SHUFFLE
+counter.group3.counter7.names = CPU_MILLISECONDS
+counter.group3.counter8.names = SPLIT_RAW_BYTES
+counter.group3.counter9.names = COMBINE_INPUT_RECORDS
+counter.group3.counter10.names = PHYSICAL_MEMORY_BYTES
+counter.group3.counter11.names = TASK_ATTEMPT_DURATION
+counter.group3.counter12.names = VIRTUAL_MEMORY_BYTES
+counter.group3.counter13.names = MAP_OUTPUT_RECORDS
+counter.group3.counter14.names = GC_TIME_MILLIS
+counter.group3.counter15.names = COMBINE_OUTPUT_RECORDS
+counter.group3.counter16.names = REDUCE_INPUT_GROUPS
+counter.group3.counter17.names = REDUCE_INPUT_RECORDS
+counter.group3.counter18.names = REDUCE_OUTPUT_RECORDS
+counter.group3.counter19.names = REDUCE_SHUFFLE_BYTES
+counter.group3.counter20.names = SHUFFLED_MAPS
+
+counter.group4.name = ReduceTaskAttemptCounter
+counter.group4.description = Reduce Task Attempt Counter Aggregation
+counter.group4.counter0.names = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group4.counter1.names = MAP_INPUT_RECORDS
+counter.group4.counter2.names = MERGED_MAP_OUTPUTS
+counter.group4.counter3.names = SPILLED_RECORDS
+counter.group4.counter4.names = MAP_OUTPUT_BYTES
+counter.group4.counter5.names = COMMITTED_HEAP_BYTES
+counter.group4.counter6.names = FAILED_SHUFFLE
+counter.group4.counter7.names = CPU_MILLISECONDS
+counter.group4.counter8.names = SPLIT_RAW_BYTES
+counter.group4.counter9.names = COMBINE_INPUT_RECORDS
+counter.group4.counter10.names = PHYSICAL_MEMORY_BYTES
+counter.group4.counter11.names = TASK_ATTEMPT_DURATION
+counter.group4.counter12.names = VIRTUAL_MEMORY_BYTES
+counter.group4.counter13.names = MAP_OUTPUT_RECORDS
+counter.group4.counter14.names = GC_TIME_MILLIS
+counter.group4.counter15.names = COMBINE_OUTPUT_RECORDS
+counter.group4.counter16.names = REDUCE_INPUT_GROUPS
+counter.group4.counter17.names = REDUCE_INPUT_RECORDS
+counter.group4.counter18.names = REDUCE_OUTPUT_RECORDS
+counter.group4.counter19.names = REDUCE_SHUFFLE_BYTES
+counter.group4.counter20.names = SHUFFLED_MAPS
+
+counter.group5.name = MapTaskAttemptFileSystemCounter
+counter.group5.description = Map Task Attempt File System Counter Aggregation
+counter.group5.counter0.names = FILE_READ_OPS
+counter.group5.counter1.names = FILE_WRITE_OPS
+counter.group5.counter2.names = FILE_BYTES_READ
+counter.group5.counter3.names = FILE_LARGE_READ_OPS
+counter.group5.counter4.names = HDFS_BYTES_READ
+counter.group5.counter5.names = FILE_BYTES_WRITTEN
+counter.group5.counter6.names = HDFS_LARGE_READ_OPS
+counter.group5.counter7.names = HDFS_BYTES_WRITTEN
+counter.group5.counter8.names = HDFS_READ_OPS
+
+counter.group6.name = ReduceTaskAttemptFileSystemCounter
+counter.group6.description = Reduce Task Attempt File System Counter Aggregation
+counter.group6.counter0.names = FILE_READ_OPS
+counter.group6.counter1.names = FILE_WRITE_OPS
+counter.group6.counter2.names = FILE_BYTES_READ
+counter.group6.counter3.names = FILE_LARGE_READ_OPS
+counter.group6.counter4.names = HDFS_BYTES_READ
+counter.group6.counter5.names = FILE_BYTES_WRITTEN
+counter.group6.counter6.names = HDFS_LARGE_READ_OPS
+counter.group6.counter7.names = HDFS_BYTES_WRITTEN
+counter.group6.counter8.names = HDFS_READ_OPS
\ No newline at end of file
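
A rough illustration of how this counter.group<i>.* key scheme can be walked with
java.util.Properties (the class name and resource path are illustrative; the real reader lives
elsewhere in Eagle):

    Properties props = new Properties();
    try (InputStream in = CounterConfDemo.class.getResourceAsStream("/JobCounter.conf")) {
        props.load(in);
    }
    for (int g = 0; props.containsKey("counter.group" + g + ".name"); g++) {
        String groupName = props.getProperty("counter.group" + g + ".name");
        for (int c = 0; props.containsKey("counter.group" + g + ".counter" + c + ".names"); c++) {
            String[] names = props.getProperty("counter.group" + g + ".counter" + c + ".names").split(",");
            String desc = props.getProperty("counter.group" + g + ".counter" + c + ".description", names[0]);
            // names[0] is the canonical counter name; any extra names act as aliases
        }
    }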

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..21686a6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DistributedFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
+org.apache.hadoop.hdfs.web.WebHdfsFileSystem
+org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/MRErrorCategory.config
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/MRErrorCategory.config b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/MRErrorCategory.config
new file mode 100644
index 0000000..ee8c0c5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/MRErrorCategory.config
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ATTEMPT_TIMEOUT = false|AttemptID:\S+ Timed out after \d+ secs
+INTERFACE_EXPECTED = false|but interface was expected
+FILE_NOT_EXIST = false|File does not exist
+HIVE_BAD_SCHEMA_EXCEPTION = false|Caused by: org.apache.hadoop.hive.serde2.avro.BadSchemaException
+CONNECTION_RESET_BY_PEER = false|Connection reset by peer
+CONTAINER_KILLED_BY_APPMASTER = false|Container killed by the ApplicationMaster
+CONTAINER_CLEANUP_FAILURE = false|cleanup failed for container container_
+USER_NOT_FOUND = false|User \S+ not found
+TASK_TREE_BEYOND_MEMORY_LIMIT = false|^TaskTree \S+ is running beyond memory-limits
+EBAY_APPMON_LOG_GET_TRAVERSER = false|^Error: com.ebay.appmon.log.traverser.LogTraverser.getTraverser
+MAP_OUTPUT_LOST = false|Map output lost
+BEYOND_PHYSICAL_MEMORY_LIMITS = false|running beyond physical memory limits
+GC_OVERHEAD_LIMIT_EXCEEDED = false|GC overhead limit exceeded
+NO_SPACE_LEFT = false|No space left
+MKDIR_FAILURE = false|mkdir of file:\S+ failed
+KILLED_CLEAN_BY_USER = false|Task has been KILLED by the user
+KILLED_UNCLEAN_BY_USER = false|Task has been KILLED_UNCLEAN by the user
+FAILED_TO_REPORT_STATUS = false|failed to report status for \d+ seconds. Killing
+EXCEPTION_FROM_CONTAINER_LAUNCH = false|^Exception from container-launch
+LOST_TASK_TRACKER = false|^Lost task tracker
+TOO_MANY_FETCH_FAILURES = false|^Too many fetch-failures$
+JAVA_HEAP_SPACE = false|^Error: Java heap space$
+JAVA_EXCEPTION = true|^(?:error: Error: |Error: )?(\S+Exception|\S+Error)
+JAVA_THROWABLE = false|^(?:error: Error: |Error: )?java.lang.Throwable
+NO_DETAIL = false|^$
+UNKNOWN = false|.*
\ No newline at end of file
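
Each entry above follows the form "CATEGORY = flag|regex". A hedged sketch of applying one entry
to a task diagnostic string (the meaning of the leading boolean is not spelled out in this file,
so it is treated as an opaque flag here; uses java.util.regex.Pattern):

    String entry = "BEYOND_PHYSICAL_MEMORY_LIMITS = false|running beyond physical memory limits";
    String name = entry.substring(0, entry.indexOf('=')).trim();
    String body = entry.substring(entry.indexOf('=') + 1).trim();
    boolean flag = Boolean.parseBoolean(body.substring(0, body.indexOf('|')));
    Pattern pattern = Pattern.compile(body.substring(body.indexOf('|') + 1));

    String diagnostics = "Container pid=4242 is running beyond physical memory limits ...";
    if (pattern.matcher(diagnostics).find()) {
        // the diagnostic falls into category BEYOND_PHYSICAL_MEMORY_LIMITS; in the full file,
        // entries are presumably tried in order, with UNKNOWN (.*) as the fallback
    }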

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
new file mode 100644
index 0000000..8cb1aa3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "envContextConfig" : {
+    "env" : "local",
+    "topologyName" : "mr_history",
+    "stormConfigFile" : "storm.yaml",
+    "parallelismConfig" : {
+      "mrHistoryJobExecutor" : 6
+    },
+    "tasks" : {
+      "mrHistoryJobExecutor" : 6
+    },
+    "workers" : 3
+  },
+
+  "jobExtractorConfig" : {
+    "site" : "sandbox",
+    "mrVersion": "MRVer2",
+    "readTimeOutSeconds" : 10
+  },
+
+  "dataSourceConfig" : {
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkPort" : "2181",
+    "zkRoot" : "/test_mrjobhistory",
+    "zkSessionTimeoutMs" : 15000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 20000,
+    "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
+    "principal":"", # leave empty if not needed
+    "keytab":"",
+    "basePath" : "/mr-history/done",
+    "pathContainsJobTrackerName" : false,
+    "jobTrackerName" : "",
+    "zeroBasedMonth" : false,
+    "dryRun" : false,
+    "partitionerCls" : "org.apache.eagle.jpm.mr.history.storm.DefaultJobIdPartitioner",
+    "timeZone" : "UTC"
+  },
+
+  "eagleProps" : {
+    "mailHost" : "abc.com",
+    "mailDebug" : "true",
+    "eagleService": {
+      "host": "sandbox.hortonworks.com",
+      "port": 9099,
+      "username": "admin",
+      "password": "secret"
+    }
+  },
+  
+  "MRConfigureKeys" : [
+    "mapreduce.map.output.compress",
+    "mapreduce.map.output.compress.codec",
+    "mapreduce.output.fileoutputformat.compress",
+    "mapreduce.output.fileoutputformat.compress.type",
+    "mapreduce.output.fileoutputformat.compress.codec",
+    "mapred.output.format.class",
+    "eagle.job.runid",
+    "eagle.job.runidfieldname",
+    "eagle.job.name",
+    "eagle.job.normalizedfieldname",
+    "eagle.alert.email",
+    "eagle.job.alertemailaddress",
+    "dataplatform.etl.info",
+    "mapreduce.map.memory.mb",
+    "mapreduce.reduce.memory.mb",
+    "mapreduce.map.java.opts",
+    "mapreduce.reduce.java.opts"
+  ]
+}
\ No newline at end of file
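
The HOCON file above is what JHFConfigManager appears to expose through getConfig(): the
getInt("envContextConfig.parallelismConfig." + parentName) call in HistoryJobProgressBolt points
at Typesafe Config. A sketch of reading a few of these keys directly with that library
(com.typesafe.config) -- an assumption, not something this patch shows:

    Config config = ConfigFactory.load();  // loads application.conf from the classpath by default

    int spoutParallelism = config.getInt("envContextConfig.parallelismConfig.mrHistoryJobExecutor"); // 6
    String zkQuorum      = config.getString("dataSourceConfig.zkQuorum");      // "sandbox.hortonworks.com:2181"
    String zkRoot        = config.getString("dataSourceConfig.zkRoot");        // "/test_mrjobhistory"
    String partitioner   = config.getString("dataSourceConfig.partitionerCls");
    int servicePort      = config.getInt("eagleProps.eagleService.port");      // 9099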

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/core-site.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/core-site.xml
new file mode 100644
index 0000000..11e8486
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/core-site.xml
@@ -0,0 +1,497 @@
+<?xml version="1.0"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
+	~ contributor license agreements. See the NOTICE file distributed with ~
+	this work for additional information regarding copyright ownership. ~ The
+	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
+	"License"); you may not use this file except in compliance with ~ the License.
+	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+	~ ~ Unless required by applicable law or agreed to in writing, software ~
+	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
+	License for the specific language governing permissions and ~ limitations
+	under the License. -->
+
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
+
+<!-- i/o properties -->
+
+  <property>
+    <name>io.file.buffer.size</name>
+    <value>131072</value>
+    <description>The size of buffer for use in sequence files.
+  The size of this buffer should probably be a multiple of hardware
+  page size (4096 on Intel x86), and it determines how much data is
+  buffered during read and write operations.</description>
+  </property>
+
+<property>
+  <description>If users connect through a SOCKS proxy, we don't
+   want their SocketFactory settings interfering with the socket
+   factory associated with the actual daemons.</description>
+   <name>hadoop.rpc.socket.factory.class.default</name>
+   <value>org.apache.hadoop.net.StandardSocketFactory</value>
+</property>
+
+<property>
+  <name>hadoop.tmp.dir</name>
+  <value>/tmp/hadoop/hadoop-${user.name}</value>
+  <description>A base for other temporary directories.</description>
+</property>
+
+<property>
+  <name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
+  <value></value>
+</property>
+
+<property>
+  <name>hadoop.rpc.socket.factory.class.JobSubmissionProtocol</name>
+  <value></value>
+</property>
+              
+  <property>
+    <name>io.serializations</name>
+    <value>org.apache.hadoop.io.serializer.WritableSerialization</value>
+  </property>
+
+  <property>
+    <name>io.compression.codecs</name>
+    <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value>
+  </property>
+
+  <!-- LZO: see http://www.facebook.com/notes/cloudera/hadoop-at-twitter-part-1-splittable-lzo-compression/178581952002 -->
+  <property>
+      <name>io.compression.codec.lzo.class</name>
+      <value>com.hadoop.compression.lzo.LzoCodec</value>
+  </property>
+
+
+<!-- file system properties -->
+
+  <property>
+    <name>fs.defaultFS</name>
+    <!-- cluster variant -->
+    <value>hdfs://apollo-phx-nn-ha</value>
+    <description>The name of the default file system.  Either the
+  literal string "local" or a host:port for NDFS.</description>
+    <final>true</final>
+  </property>
+
+  <property>
+    <description>Topology script</description>
+    <name>net.topology.script.file.name</name>
+    <value>/apache/hadoop/etc/hadoop/topology</value>
+    <final>true</final>
+  </property>
+
+  <property>
+    <name>fs.trash.interval</name>
+    <value>480</value>
+    <description>Number of minutes between trash checkpoints.
+                 If zero, the trash feature is disabled.
+    </description>
+  </property>	
+
+  <!-- mobius-proxyagent impersonation configurations -->
+<property>
+  <name>hadoop.proxyuser.mobius-proxyagent.groups</name>
+  <value>hdmi-mm,hdmi-set,hdmi-research,hdmi-technology,hdmi-hadoopeng,hdmi-cs,hdmi-milo,hdmi-appdev,hdmi-siteanalytics,hdmi-prod,hdmi-others,hdmi-sdc,hdmi-finance,hdmi-est,hdmi-cci,hdmi-mptna,hdmi-xcom,hdmi-stu,hdmi-mobile</value>
+  <description>Allow user mobius-proxyagent to impersonate any members of the groups </description>
+</property>
+
+<property>
+    <name>hadoop.proxyuser.mobius-proxyagent.hosts</name>
+    <value>10.114.118.13,10.115.201.53</value>
+    <description>The hosts from which mobius-proxyagent is allowed to connect when impersonating a user</description>
+</property>
+
+<property>
+      <name>hadoop.proxyuser.bridge_adm.groups</name>
+      <value>hdmi-mm,hdmi-set,hdmi-research,hdmi-technology,hdmi-hadoopeng,hdmi-cs,hdmi-milo,hdmi-appdev,hdmi-siteanalytics,hdmi-prod,hdmi-others,hdmi-sdc,hdmi-finance,hdmi-est,hdmi-cci,hdmi-mptna,hdmi-xcom,hdmi-stu,hdmi-mobile</value>
+      <description>Allow user bridge_adm (Teradata-Hadoop bridge) to impersonate any members of the groups </description>
+</property>
+
+<property>
+    <name>hadoop.proxyuser.bridge_adm.hosts</name>
+    <value>10.103.47.11,10.103.47.12,10.103.47.13,10.103.47.14,10.103.47.15,10.103.47.16,10.103.47.17,10.103.47.18,10.103.47.19,10.103.47.20,10.103.47.21,10.103.47.22,10.103.48.11,10.103.48.12,10.103.48.13,10.103.48.14,10.103.48.15,10.103.48.16,10.103.48.17,10.103.48.18,10.103.48.19,10.103.48.20,10.103.48.21,10.103.48.22,10.103.88.11,10.103.88.12,10.103.88.13,10.103.88.14,10.103.88.15,10.103.88.16,10.103.88.17,10.103.88.18,10.103.88.19,10.103.88.20,10.103.88.21,10.103.88.22,10.103.88.23,10.103.88.24,10.103.88.25,10.103.88.26,10.103.88.27,10.103.88.28,10.103.88.29,10.103.88.30,10.103.88.31,10.103.88.32,10.103.88.33,10.103.88.34,10.103.89.11,10.103.89.12,10.103.89.13,10.103.89.14,10.103.89.15,10.103.89.16,10.103.89.17,10.103.89.18,10.103.89.19,10.103.89.20,10.103.89.21,10.103.89.22,10.103.89.23,10.103.89.24,10.103.89.25,10.103.89.26,10.103.89.27,10.103.89.28,10.103.89.29,10.103.89.30,10.103.89.31,10.103.89.32,10.103.89.33,10.103.89.34,10.115.37.50,10.115.37.51,10.115.37.52,10.115.37.53,10.115.38.50,10.115.38.51,10.115.38.52,10.115.38.53,10.115.208.11,10.115.208.12,10.115.208.13,10.115.208.14,10.115.208.15,10.115.208.16,10.115.208.17,10.115.208.18,10.115.208.19,10.115.208.20,10.115.208.21,10.115.208.22,10.115.208.23,10.115.208.24,10.115.208.25,10.115.208.26,10.103.158.101,10.103.158.102,10.103.158.103,10.103.158.104,10.103.158.105,10.103.158.106,10.103.158.107,10.103.158.108,10.103.158.109,10.103.158.110,10.103.158.111,10.103.158.112,10.103.158.113,10.103.158.114,10.103.158.115,10.103.158.116</value>
+    <description>The hosts from which the bridge_adm user (Teradata-Hadoop bridge) is allowed to connect when impersonating a user</description>
+</property>
+
+<property>
+    <name>hadoop.proxyuser.hadoop.hosts</name>
+    <value>*</value>
+</property>
+
+<property>
+    <name>hadoop.proxyuser.hadoop.groups</name>
+    <value>*</value>
+</property>
+
+<property>
+   <name>hadoop.proxyuser.sg_adm.groups</name>
+   <value>hdmi-etl</value>
+   <description>Allow user sg_adm (HDMIT-4462) to impersonate any  members of the groups </description>
+</property>
+
+<property>
+   <name>hadoop.proxyuser.sg_adm.hosts</name>
+   <value>*</value>
+   <description>The sg_adm user (HDMIT-4462) can connect from any host to impersonate a user</description>
+</property>
+
+  <property>
+    <name>fs.inmemory.size.mb</name>
+    <value>256</value>
+  </property>
+
+  <!-- ipc properties: copied from kryptonite configuration -->
+  <property>
+    <name>ipc.client.idlethreshold</name>
+    <value>8000</value>
+    <description>Defines the threshold number of connections after which
+               connections will be inspected for idleness.
+  </description>
+  </property>
+
+  <property>
+    <name>ipc.client.connection.maxidletime</name>
+    <value>30000</value>
+    <description>The maximum time after which a client will bring down the
+               connection to the server.
+  </description>
+  </property>
+
+  <property>
+    <name>ipc.client.connect.max.retries</name>
+    <value>50</value>
+    <description>Defines the maximum number of retries for IPC connections.</description>
+  </property>
+
+  <!-- Web Interface Configuration -->
+  <property>
+    <name>webinterface.private.actions</name>
+    <value>false</value>
+    <description> If set to true, the web interfaces of JT and NN may contain
+                actions, such as kill job, delete file, etc., that should
+                not be exposed to public. Enable this option if the interfaces
+                are only reachable by those who have the right authorization.
+  </description>
+  </property>
+
+<property>
+  <name>hadoop.proxyuser.hive.groups</name>
+  <value>*</value>
+  <description>
+     Proxy group for Hadoop.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.proxyuser.hive.hosts</name>
+  <value>*</value>
+  <description>
+     Proxy host for Hadoop.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.proxyuser.oozie.groups</name>
+  <value>*</value>
+  <description>
+     Proxy group for Hadoop.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.proxyuser.oozie.hosts</name>
+  <value>phxaishdc9en0007-be.phx.ebay.com</value>
+  <description>
+     Proxy host for Hadoop.
+  </description>
+</property>
+
+<!-- BEGIN security configuration -->
+  <property>
+    <name>hadoop.security.authentication</name>
+    <value>kerberos</value>
+    <!-- A value of "simple" would  disable security. -->
+  </property>
+  
+  <property>
+    <name>hadoop.security.authorization</name>
+    <value>true</value>
+  </property>
+
+  <!-- Setting to ShellBasedUnixGroupsMapping to override the default of 
+       JniBasedUnixGroupsMappingWithFallback.  See HWX case 00006991 -->
+  <property>
+    <name>hadoop.security.group.mapping</name>
+    <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
+  </property>
+
+  <property>
+    <name>hadoop.http.filter.initializers</name>
+    <value>org.apache.hadoop.security.AuthenticationFilterInitializer</value>
+  </property>
+
+<!-- BEGIN hadoop.http.authentication properties --> 
+  <property>
+    <name>hadoop.http.authentication.type</name>
+    <value>org.apache.hadoop.security.authentication.server.CompositeAuthenticationHandler</value>
+  </property>
+
+  <property>
+    <name>hadoop.http.authentication.token.validity</name>
+    <value>36000</value>
+    <!-- in seconds -->
+  </property>
+
+  <property>
+    <name>hadoop.http.authentication.signature.secret.file</name>
+    <value>/etc/hadoop/http_auth_secret</value>
+  </property>
+
+  <property>
+    <name>hadoop.http.authentication.cookie.domain</name>
+    <value>ebay.com</value>
+  </property>
+
+  <property>
+    <name>hadoop.http.authentication.pingFederate.config.file</name>
+    <value>/etc/hadoop/pingfederate-agent-config.txt</value>
+  </property>
+
+  <property>
+    <name>hadoop.http.authentication.pingFederate.url</name>
+    <value>https://sso.corp.ebay.com/sp/startSSO.ping?PartnerIdpId=eBayHadoop</value>
+  </property>
+
+  <property>
+    <name>hadoop.http.authentication.pingFederate.anonymous.allowed</name>
+    <value>true</value>
+  </property>
+
+<!-- BEGIN properties enabled per HDP-2.1.3 upgrade -->
+
+  <property>
+    <name>hadoop.http.authentication.composite.handlers</name>
+    <value>org.apache.hadoop.security.authentication.server.PingFederateAuthenticationHandler,kerberos,anonymous</value>
+  </property>
+
+  <property>
+    <name>hadoop.http.authentication.composite.default-non-browser-handler-type</name>
+    <value>kerberos</value>
+  </property>
+
+  <property>
+    <name>hadoop.http.authentication.kerberos.keytab</name>
+    <value>/etc/hadoop/hadoop.keytab</value>
+  </property>
+
+  <property>
+    <name>hadoop.http.authentication.kerberos.principal</name>
+    <value>*</value>
+  </property>
+
+<!-- END properties enabled per HDP-2.1.3 upgrade -->
+
+<!-- END hadoop.http.authentication properties --> 
+
+
+  <property>
+    <name>hadoop.security.auth_to_local</name>
+    <value>
+        RULE:[1:$1]
+        RULE:[2:$1]
+        DEFAULT
+    </value>
+  </property>
+
+  <property>
+    <name>kerberos.multiplerealm.supported</name>
+    <value>true</value>
+  </property>
+  
+  <property>
+    <name>kerberos.multiplerealm.realms</name>
+    <value>CORP.EBAY.COM</value>
+  </property>
+
+<!--SSL SUPPORT -->
+
+<property>
+  <name>hadoop.ssl.keystores.factory.class</name>
+    <value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
+    <description>
+          The keystores factory to use for retrieving certificates.
+    </description>
+</property>
+
+<property>
+  <name>hadoop.ssl.require.client.cert</name>
+  <value>false</value>
+  <description>Whether client certificates are required</description>
+</property>
+
+<property>
+  <name>hadoop.ssl.hostname.verifier</name>
+  <value>ALLOW_ALL</value>
+  <description>
+    The hostname verifier to provide for HttpsURLConnections.
+    Valid values are: DEFAULT, STRICT, STRICT_IE6, DEFAULT_AND_LOCALHOST and
+    ALLOW_ALL
+  </description>
+</property>
+
+<property>
+  <name>hadoop.ssl.server.conf</name>
+  <value>ssl-server.xml</value>
+  <description>
+    Resource file from which ssl server keystore information will be extracted.
+    This file is looked up in the classpath, typically it should be in Hadoop
+    conf/ directory.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.ssl.client.conf</name>
+  <value>ssl-client.xml</value>
+  <description>
+    Resource file from which ssl client keystore information will be extracted.
+    This file is looked up in the classpath, typically it should be in Hadoop
+    conf/ directory.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.ssl.enabled</name>
+  <value>false</value>
+  <description>
+    Whether to use SSL for the HTTP endpoints. If set to true, the
+    NameNode, DataNode, ResourceManager, NodeManager, HistoryServer and
+    MapReduceAppMaster web UIs will be served over HTTPS instead of HTTP.
+  </description>
+</property>
+
+<!-- User Group Resolution -->
+
+<property>
+    <name>hadoop.security.groups.cache.secs</name>
+    <value>3600</value>
+</property>
+
+<!-- END security configuration -->
+
+
+
+<!-- BEGIN properties enabled per HDP-2.1.3 upgrade -->
+
+<!-- BEGIN Quality of Service -->
+
+  <property>
+    <name>ipc.8020.callqueue.impl</name>
+    <value>com.ebay.hadoop.ipc.FairCallQueue</value>
+  </property>
+
+  <property>
+    <name>ipc.8020.identity-provider.impl</name>
+    <value>com.ebay.hadoop.ipc.EbayUserIdentityProvider</value>
+  </property>
+
+  <property>
+    <name>ipc.8020.faircallqueue.rpc-scheduler</name>
+    <value>com.ebay.hadoop.ipc.DecayRpcScheduler</value>
+  </property>
+
+  <property>
+    <name>ipc.8020.faircallqueue.priority-levels</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <name>ipc.8020.faircallqueue.decay-scheduler.thresholds</name>
+   <!-- <value>1,2,7,10,20,30,40,50,60</value> -->
+    <value>1,2,3,5,8,13,20,35,50</value>
+  </property>
+
+  <property>
+    <name>ipc.8020.faircallqueue.decay-scheduler.period-ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <name>ipc.8020.faircallqueue.multiplexer.weights</name>
+   <!-- <value>10,5,3,2,1,1,1,1,1,1</value> -->
+     <value>80,30,25,20,17,12,6,3,2,1</value>
+  </property>
+
+<!-- END Quality of Service -->
+
+
+
+<!-- BEGIN Selective Encryption --> 
+<!-- disabled per HADP-6065 - miguenther - 26 August 2014 
+  <property>
+    <name>hadoop.rpc.protection</name>
+    <value>authentication,privacy</value>
+    <final>true</final>
+  </property>
+
+  <property>
+    <name>hadoop.security.saslproperties.resolver.class</name>
+    <value>org.apache.hadoop.security.WhitelistBasedResolver</value>
+    <final>true</final>
+  </property>
+
+  <property>
+    <name>hadoop.security.sasl.variablewhitelist.enable</name>
+    <value>true</value>
+    <final>true</final>
+  </property>
+-->
+<!-- END Selective Encryption -->
+
+
+<!-- END properties enabled per HDP-2.1.3 upgrade -->
+
+<property>
+  <name>ha.zookeeper.quorum</name>
+  <value>apollo-phx-zk-1.vip.ebay.com:2181,apollo-phx-zk-2.vip.ebay.com:2181,apollo-phx-zk-3.vip.ebay.com:2181,apollo-phx-zk-4.vip.ebay.com:2181,apollo-phx-zk-5.vip.ebay.com:2181</value>
+</property>
+
+<!-- NEW QOP Proposed configs below - Same as Ares Tiffany Sept 01, 2015 -->
+<property>
+    <name>hadoop.rpc.protection</name>
+    <value>authentication,privacy</value>
+</property>
+
+  <property>
+      <name>hadoop.security.saslproperties.resolver.class</name>
+      <value>org.apache.hadoop.security.WhitelistBasedResolver</value>
+  </property>
+
+  <property>
+      <name>hadoop.security.sasl.fixedwhitelist.file</name>
+      <value>/etc/hadoop/fixedwhitelist</value>
+  </property>
+
+  <property>
+      <name>hadoop.security.sasl.variablewhitelist.enable</name>
+      <value>true</value>
+  </property>
+
+  <property>
+      <name>hadoop.security.sasl.variablewhitelist.file</name>
+      <value>/etc/hadoop/whitelist</value>
+  </property>
+
+  <property>
+        <name>hadoop.security.sasl.variablewhitelist.cache.secs</name>
+        <value>3600</value>
+  </property>
+<!-- END NEW QOP Proposed configs below - Same as Ares Tiffany Sept 01, 2015 -->
+
+</configuration>
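
Because this core-site.xml sits under src/main/resources, Hadoop's Configuration class picks it up from the classpath at runtime. A hedged sketch of inspecting a couple of the properties defined above (the property names come from the file; the class itself is illustrative and not part of this patch):

    import org.apache.hadoop.conf.Configuration;

    // Sketch only: new Configuration() loads core-default.xml plus any
    // core-site.xml found on the classpath, so the values above become
    // visible to every HDFS client created in this JVM.
    public class CoreSiteSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            String defaultFs = conf.get("fs.defaultFS");                    // hdfs://apollo-phx-nn-ha
            String authType  = conf.get("hadoop.security.authentication");  // kerberos
            int trashMinutes = conf.getInt("fs.trash.interval", 0);         // 480

            System.out.println(defaultFs + ", auth=" + authType + ", trash=" + trashMinutes + "m");
        }
    }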

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/hdfs-site.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/hdfs-site.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/hdfs-site.xml
new file mode 100644
index 0000000..52ba754
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/hdfs-site.xml
@@ -0,0 +1,449 @@
+<?xml version="1.0"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
+	~ contributor license agreements. See the NOTICE file distributed with ~
+	this work for additional information regarding copyright ownership. ~ The
+	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
+	"License"); you may not use this file except in compliance with ~ the License.
+	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+	~ ~ Unless required by applicable law or agreed to in writing, software ~
+	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
+	License for the specific language governing permissions and ~ limitations
+	under the License. -->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+  <!-- The directories for NN, DN and SNN configs -->
+
+  <property>
+    <name>dfs.namenode.name.dir</name>
+    <value>/hadoop/nn1/1</value>
+    <final>true</final>
+  </property>
+
+  <property>
+    <name>dfs.datanode.data.dir</name>
+    <value>/hadoop/1/data,/hadoop/2/data,/hadoop/3/data,/hadoop/4/data,/hadoop/5/data,/hadoop/6/data,/hadoop/7/data,/hadoop/8/data,/hadoop/9/data,/hadoop/10/data,/hadoop/11/data,/hadoop/12/data</value>
+  </property>
+
+  <property>
+    <name>dfs.blockreport.initialDelay</name>
+    <value>900</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.decommission.interval</name>
+    <value>150</value>
+  </property>
+
+  <!-- The Nodes include and exclude -->
+
+  <property>
+    <name>dfs.hosts</name>
+    <!-- The files containing hosts allowed to connect to namenode -->
+    <value>/apache/hadoop/etc/hadoop/hosts</value>
+  </property>
+
+  <property>
+    <name>dfs.hosts.exclude</name>
+    <!-- The files containing hosts allowed to connect to namenode -->
+    <value>/apache/hadoop/etc/hadoop/hdfs-exclude</value>
+  </property>
+
+
+  <property>
+    <name>dfs.datanode.failed.volumes.tolerated</name>
+    <value>3</value>
+  </property>
+
+  <property>
+    <name>dfs.datanode.balance.bandwidthPerSec</name>
+    <value>10485760</value>
+  </property>
+
+  <property>
+    <!-- Amount of space which HDFS will refuse to use in bytes -->
+    <name>dfs.datanode.du.reserved</name>
+    <value>107374182400</value> <!-- 100 GB-->
+  </property>
+
+  <!-- RMERCHIA AISOPS159160 2012-09-25 -->
+
+  <property>
+    <name>dfs.heartbeat.interval</name>
+    <value>6</value>
+    <description>How frequently the DataNode sends a heartbeat, in seconds.</description>
+  </property>
+
+  <!-- RMERCHIA AISOPS159160 2012-09-25  change to 6 hours on 2012-10-02 -->
+
+  <property>
+    <name>dfs.blockreport.intervalMsec</name>
+    <value>21600000</value>
+    <description>How frequently the DataNode sends a block report, in milliseconds.</description>
+  </property>
+
+  <property>
+    <name>dfs.namenode.safemode.threshold-pct</name>
+    <value>1.0f</value>
+    <!-- 1.0 requires every block to meet the minimal replication before leaving safe mode -->
+    <description>
+      Specifies the percentage of blocks that should satisfy
+      the minimal replication requirement defined by dfs.replication.min.
+      Values less than or equal to 0 mean not to start in safe mode.
+      Values greater than 1 will make safe mode permanent.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.namenode.safemode.extension</name>
+    <value>120000</value>
+    <!-- 2 minutes -->
+    <description> Determines extension of safe mode in milliseconds after the threshold level is reached. </description>
+  </property>
+
+  <property>
+    <name>dfs.permissions.enabled</name>
+    <value>true</value>
+    <description>
+      If "true", enable permission checking in HDFS.
+      If "false", permission checking is turned off,
+      but all other behavior is unchanged.
+      Switching from one parameter value to the other does not change the mode,
+      owner or group of files or directories.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.replication</name>
+    <value>3</value>
+  </property>
+
+  <property>
+    <name>dfs.blocksize</name>
+    <!-- 128mb (default 64m or 67108864) -->
+    <value>268435456</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.handler.count</name>
+    <value>128</value>
+  </property>
+
+  <property>
+    <name>dfs.datanode.handler.count</name>
+    <value>50</value>
+  </property>
+
+  <!-- updated from 4k to 16k as part of HADP-6065 - miguenther - 26 august 2014 -->
+  <property>
+    <name>dfs.datanode.max.transfer.threads</name>
+    <value>16384</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.replication.max-streams</name>
+    <value>40</value>
+  </property>
+
+  <property>
+    <name>dfs.webhdfs.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>dfs.block.local-path-access.user</name>
+    <value>hadoop</value>
+    <description>the user who is allowed to perform short circuit reads.</description>
+  </property>
+
+  <property>
+    <name>dfs.block.access.token.enable</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.name.dir.restore</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>dfs.ls.limit</name>
+    <value>4096</value>
+  </property>
+
+  <!-- NameNode security config -->
+  <property>
+    <name>dfs.web.authentication.kerberos.keytab</name>
+    <value>/etc/hadoop/hadoop.keytab</value>
+  </property>
+  <property>
+    <name>dfs.namenode.kerberos.internal.spnego.principal</name>
+    <value>*</value>
+  </property>
+  <property>
+    <name>dfs.namenode.keytab.file</name>
+    <value>/etc/hadoop/hadoop.keytab</value>
+  </property>
+  <property>
+    <name>dfs.namenode.kerberos.principal</name>
+    <value>hadoop/_HOST@APD.EBAY.COM</value>
+    <!-- _HOST will be replaced by the domain name present in fs.default.name. It is better to use the actual host name. -->
+  </property>
+  <property>
+    <name>dfs.web.authentication.kerberos.principal</name>
+    <value>HTTP/_HOST@APD.EBAY.COM,HTTP/apollo-hdfs.corp.ebay.com@CORP.EBAY.COM</value>
+  </property>
+
+  <!-- DataNode security config -->
+  <property>
+    <name>dfs.datanode.data.dir.perm</name>
+    <value>700</value>
+  </property>
+  <property>
+    <name>dfs.datanode.address</name>
+    <value>0.0.0.0:1004</value>
+  </property>
+  <property>
+    <name>dfs.datanode.http.address</name>
+    <value>0.0.0.0:1006</value>
+  </property>
+  <property>
+    <name>dfs.datanode.keytab.file</name>
+    <value>/etc/hadoop/hadoop.keytab</value>
+  </property>
+  <property>
+    <name>dfs.datanode.kerberos.principal</name>
+    <value>hadoop/_HOST@APD.EBAY.COM</value>
+    <!-- _HOST will be replaced by the first domain name mapped to the IP -->
+  </property>
+
+  <property>
+    <name>dfs.cluster.administrators</name>
+    <value> hdmi-hadoopeng</value>
+  </property>
+
+  <!-- HTTPS SUPPORT -->
+
+  <property>
+    <name>dfs.https.need.client.auth</name>
+    <value>false</value>
+    <description>Whether SSL client certificate authentication is required
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.https.server.keystore.resource</name>
+    <value>ssl-server.xml</value>
+    <description>Resource file from which ssl server keystore
+      information will be extracted
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.https.client.keystore.resource</name>
+    <value>ssl-client.xml</value>
+    <description>Resource file from which ssl client keystore
+      information will be extracted
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.datanode.https.address</name>
+    <value>0.0.0.0:50075</value>
+  </property>
+
+  <property>
+    <name>dfs.datanode.http.address</name>
+    <value>0.0.0.0:1006</value>
+  </property>
+
+
+
+  <property>
+    <name>dfs.domain.socket.path</name>
+    <value>/var/run/hadoop-hdfs/dn</value>
+  </property>
+
+  <property>
+    <name>dfs.client.read.shortcircuit</name>
+    <value>true</value>
+  </property>
+
+
+  <property>
+    <name>dfs.namenode.service.handler.count</name>
+    <value>55</value>
+  </property>
+
+
+
+
+  <!-- BEGIN properties enabled per HDP-2.1.3 upgrade -->
+
+  <property>
+    <name>dfs.namenode.acls.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>dfs.http.policy</name>
+    <value>HTTP_AND_HTTPS</value>
+  </property>
+
+  <property>
+    <name>dfs.web.authentication.filter</name>
+    <value>org.apache.hadoop.hdfs.web.TokenAuthFilter,authentication</value>
+  </property>
+
+  <!-- END properties enabled per HDP-2.1.3 upgrade -->
+
+
+  <!-- added as part of HAPD-6065 - miguenther 26 August 2014 -->
+  <property>
+    <name>ipc.server.read.threadpool.size</name>
+    <value>3</value>
+  </property>
+
+
+  <!-- Apollo PHX HA Configs -->
+  <property>
+    <name>dfs.nameservices</name>
+    <value>apollo-phx-nn-ha</value>
+    <description>Logical name for this new nameservice</description>
+  </property>
+
+  <property>
+    <name>dfs.ha.namenodes.apollo-phx-nn-ha</name>
+    <value>nn1,nn2</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.rpc-address.apollo-phx-nn-ha.nn1</name>
+    <value>apollo-phx-nn.vip.ebay.com:8020</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.rpc-address.apollo-phx-nn-ha.nn2</name>
+    <value>apollo-phx-nn-2.vip.ebay.com:8020</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.servicerpc-address.apollo-phx-nn-ha.nn1</name>
+    <value>apollo-phx-nn.vip.ebay.com:8030</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.servicerpc-address.apollo-phx-nn-ha.nn2</name>
+    <value>apollo-phx-nn-2.vip.ebay.com:8030</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.http-address.apollo-phx-nn-ha.nn1</name>
+    <value>apollo-phx-nn.vip.ebay.com:50080</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.http-address.apollo-phx-nn-ha.nn2</name>
+    <value>apollo-phx-nn-2.vip.ebay.com:50080</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.shared.edits.dir</name>
+    <value>qjournal://phxaishdc9en0010-be.phx.ebay.com:8485;phxaishdc9en0011-be.phx.ebay.com:8485;phxaishdc9en0012-be.phx.ebay.com:8485;phxaishdc9en0013-be.phx.ebay.com:8485;phxaishdc9en0014-be.phx.ebay.com:8485/apollo-phx-nn-ha</value>
+  </property>
+
+  <property>
+    <name>dfs.client.failover.proxy.provider.apollo-phx-nn-ha</name>
+    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
+  </property>
+
+  <property>
+    <name>dfs.ha.fencing.methods</name>
+    <value>sshfence
+      shell(/bin/true)
+    </value>
+  </property>
+
+  <property>
+    <name>dfs.ha.fencing.ssh.private-key-files</name>
+    <value>/home/hadoop/.ssh/id_rsa</value>
+  </property>
+
+  <property>
+    <name>dfs.ha.fencing.ssh.connect-timeout</name>
+    <value>30000</value>
+  </property>
+
+  <property>
+    <name>dfs.ha.automatic-failover.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>dfs.journalnode.edits.dir</name>
+    <value>/hadoop/qjm/apollo</value>
+  </property>
+
+  <property>
+    <name>dfs.journalnode.kerberos.principal</name>
+    <value>hadoop/_HOST@APD.EBAY.COM</value>
+  </property>
+
+  <property>
+    <name>dfs.journalnode.kerberos.internal.spnego.principal</name>
+    <value>HTTP/_HOST@APD.EBAY.COM</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.https-address.apollo-phx-nn-ha.nn2</name>
+    <value>apollo-phx-nn-2.vip.ebay.com:50070</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.https-address.apollo-phx-nn-ha.nn1</name>
+    <value>apollo-phx-nn.vip.ebay.com:50070</value>
+  </property>
+
+  <property>
+    <name>dfs.journalnode.keytab.file</name>
+    <value>/etc/hadoop/hadoop.keytab</value>
+  </property>
+
+  <!-- Apollo HA Configs END -->
+
+  <!-- BEGIN Selective Encryption as in Ares - Sept 01, 2015 Tiffany -->
+  <property>
+    <name>dfs.encrypt.data.transfer</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>dfs.encrypt.data.transfer.algorithm</name>
+    <value>rc4</value>
+    <final>true</final>
+  </property>
+  <property>
+    <name>dfs.trustedchannel.resolver.class</name>
+    <value>org.apache.hadoop.hdfs.datatransfer.FlagListTrustedChannelResolver</value>
+    <final>true</final>
+  </property>
+  <property>
+    <name>dfs.datatransfer.client.encrypt</name>
+    <value>false</value>
+    <final>true</final>
+  </property>
+
+  <!-- END Selective Encryption as in Ares - Sept 01, 2015 Tiffany -->
+
+  <!-- Post Upgrade - improve performance - Oct 23, 2015 Tiffany -->
+  <property>
+    <name>dfs.client.block.write.locateFollowingBlock.retries</name>
+    <value>8</value>
+  </property>
+  <!-- END Post Upgrade - improve performance - Oct 23, 2015 Tiffany -->
+
+</configuration>
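
The HA section above is what lets a client open hdfs://apollo-phx-nn-ha without naming a concrete NameNode: the ConfiguredFailoverProxyProvider resolves nn1/nn2 behind the logical nameservice. A minimal sketch of a client that relies on these resource files, assuming a Kerberos principal and keytab supplied elsewhere (the principal, keytab path, and class name shown are placeholders, not part of this patch):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    // Sketch only: open the logical nameservice defined above and list the
    // MapReduce job-history directory that the history crawler scans.
    public class HdfsHaClientSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();   // picks up core-site.xml and hdfs-site.xml

            // Needed only on a kerberized cluster; principal and keytab are placeholders.
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab("eagle@EXAMPLE.COM", "/etc/eagle/eagle.keytab");

            FileSystem fs = FileSystem.get(new URI("hdfs://apollo-phx-nn-ha"), conf);
            for (FileStatus status : fs.listStatus(new Path("/mr-history/done"))) {
                System.out.println(status.getPath());
            }
            fs.close();
        }
    }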

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/log4j.properties b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/log4j.properties
new file mode 100644
index 0000000..71a5dac
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/log4j.properties
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, DRFA, stdout
+eagle.log.dir=./logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
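
With this log4j.properties on the classpath, every logger in the topology writes to stdout and to ${eagle.log.dir}/${eagle.log.file} (./logs/eagle.log by default, rolled daily); the two eagle.* keys at the top act as defaults and can typically be overridden with -D JVM options. A small illustrative class, not part of this patch:

    import org.apache.log4j.Logger;

    // Sketch only: obtain a logger; the properties file above supplies the appenders.
    // Run with -Deagle.log.dir=/var/log/eagle to relocate the log file.
    public class LoggingSketch {
        private static final Logger LOG = Logger.getLogger(LoggingSketch.class);

        public static void main(String[] args) {
            LOG.info("mr history topology starting");
            // With the ConversionPattern above this renders roughly as:
            // 2016-07-05 18:07:43,123 INFO [main] LoggingSketch[10]: mr history topology starting
        }
    }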