You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/05 18:07:43 UTC
[4/8] incubator-eagle git commit: EAGLE-276 eagle support for mr &
spark history job monitoring mr & spark job history monitoring
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java
new file mode 100644
index 0000000..451b921
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+/**
+ * Default {@link JobIdPartitioner}: picks a partition from the hash code of the job id.
+ */
+public class DefaultJobIdPartitioner implements JobIdPartitioner {
+    /**
+     * Maps a job id onto a non-negative partition number smaller than {@code numTotalParts}.
+     *
+     * @param numTotalParts total number of partitions, expected to be positive
+     * @param jobId job id to hash, must not be null
+     * @return the partition number for {@code jobId}
+     */
+    @Override
+    public int partition(int numTotalParts, String jobId) {
+        // Take the remainder BEFORE the absolute value: Math.abs(Integer.MIN_VALUE) is still
+        // negative, so abs-then-mod could hand back a negative partition. The remainder always
+        // has a magnitude below numTotalParts, where Math.abs is safe, and for every other hash
+        // value the result equals the former abs-then-mod result.
+        return Math.abs(jobId.hashCode() % numTotalParts);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
new file mode 100644
index 0000000..30374c4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/HistoryJobProgressBolt.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+import java.util.*;
+
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.entities.JobProcessTimeStampEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HistoryJobProgressBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(HistoryJobProgressBolt.class);
+
+ private final static int MAX_RETRY_TIMES = 3;
+ private Long m_minTimeStamp;
+ private int m_numTotalPartitions;
+ private JHFConfigManager configManager;
+ private Map<Integer, Long> m_partitionTimeStamp = new TreeMap<>();
+ public HistoryJobProgressBolt(String parentName, JHFConfigManager configManager) {
+ this.configManager = configManager;
+ m_numTotalPartitions = this.configManager.getConfig().getInt("envContextConfig.parallelismConfig." + parentName);
+ m_minTimeStamp = 0L;
+ }
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { // intentionally empty: nothing is initialized here and the collector is not kept
+
+ }
+
+ /**
+  * Records the newest timestamp reported for a partition and, once every partition has
+  * reported at least once, stores the minimum timestamp across partitions as a
+  * JobProcessTimeStampEntity.
+  *
+  * @param tuple tuple carrying the "partitionId" and "timeStamp" fields
+  */
+ @Override
+ public void execute(Tuple tuple) {
+     Integer partitionId = tuple.getIntegerByField("partitionId");
+     Long timeStamp = tuple.getLongByField("timeStamp");
+     LOG.info("partition " + partitionId + ", timeStamp " + timeStamp);
+     if (partitionId == null || timeStamp == null) {
+         // a null key or value would blow up in the TreeMap / Collections.min below
+         LOG.warn("ignore tuple with missing partitionId or timeStamp");
+         return;
+     }
+
+     // keep only the newest timestamp seen for each partition
+     Long previous = m_partitionTimeStamp.get(partitionId);
+     if (previous == null || previous < timeStamp) {
+         m_partitionTimeStamp.put(partitionId, timeStamp);
+     }
+
+     if (m_partitionTimeStamp.size() < m_numTotalPartitions) {
+         // not every partition has reported yet
+         return;
+     }
+
+     //get min timestamp
+     Long minTimeStamp = Collections.min(m_partitionTimeStamp.values());
+
+     if (m_minTimeStamp == 0L) {
+         m_minTimeStamp = minTimeStamp;
+     }
+
+     if (m_minTimeStamp > minTimeStamp) {
+         //no need to update
+         return;
+     }
+
+     m_minTimeStamp = minTimeStamp;
+     flushMinTimeStamp();
+ }
+
+ /**
+  * Writes the current minimum timestamp to the eagle service, trying the create call up to
+  * MAX_RETRY_TIMES + 1 times, and always releases the client afterwards.
+  */
+ private void flushMinTimeStamp() {
+     final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+     final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+
+     // plain map instead of double-brace initialization, which creates an anonymous
+     // HashMap subclass holding a reference to this bolt
+     Map<String, String> baseTags = new HashMap<>();
+     baseTags.put("site", jobExtractorConfig.site);
+
+     JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
+     entity.setCurrentTimeStamp(m_minTimeStamp);
+     entity.setTimestamp(m_minTimeStamp);
+     entity.setTags(baseTags);
+
+     List<JobProcessTimeStampEntity> entities = new ArrayList<>();
+     entities.add(entity);
+
+     IEagleServiceClient client = new EagleServiceClientImpl(
+         eagleServiceConfig.eagleServiceHost,
+         eagleServiceConfig.eagleServicePort,
+         eagleServiceConfig.username,
+         eagleServiceConfig.password);
+     try {
+         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+
+         for (int tried = 0; tried <= MAX_RETRY_TIMES; tried++) {
+             try {
+                 LOG.info("start flushing JobProcessTimeStampEntity entities of total number " + entities.size());
+                 client.create(entities);
+                 LOG.info("finish flushing entities of total number " + entities.size());
+                 break;
+             } catch (Exception ex) {
+                 if (tried < MAX_RETRY_TIMES) {
+                     LOG.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
+                 } else {
+                     LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
+                 }
+             }
+         }
+     } finally {
+         // release the client even when configuring or flushing fails unexpectedly
+         try {
+             client.getJerseyClient().destroy();
+         } finally {
+             try {
+                 client.close();
+             } catch (Exception e) {
+                 LOG.error("failed to close eagle service client ", e);
+             }
+         }
+     }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { // intentionally empty: this bolt declares no output fields
+
+ }
+ @Override
+ public void cleanup() { // only delegates to the superclass; the bolt holds nothing of its own to release here
+ super.cleanup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
new file mode 100644
index 0000000..a10599b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.crawler.*;
+import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Zookeeper znode structure
+ * -zkRoot
+ * - partitions
+ * - 0 (20150101)
+ * - 1 (20150101)
+ * - 2 (20150101)
+ * - ... ...
+ * - N-1 (20150102)
+ * - jobs
+ * - 20150101
+ * - job1
+ * - job2
+ * - job3
+ * - 20150102
+ * - job1
+ * - job2
+ * - job3
+ *
+ * Spout can have multiple instances, which is supported by storm parallelism primitive.
+ *
+ * Under znode partitions, N child znodes (name is 0 based integer) would be created with each znode mapped to one spout instance. All jobs will be partitioned into N
+ * partitions by applying JobPartitioner class to each job Id. The value of each partition znode is the date when the last job in this partition
+ * is successfully processed.
+ *
+ * processing steps
+ * 1) In constructor, keep the content filter, config manager and interceptor, and create the input stream callback
+ * 2) In open(), calculate jobPartitionId for current spout (the index of this task in the component's task list from TopologyContext)
+ * 3) In open(), zkState.ensureJobPartitions to rebuild znode partitions if necessary. ensureJobPartitions is only done by one spout task as internally this is using lock
+ * 4) In nextTuple(), list job files by invoking hadoop API
+ * 5) In nextTuple(), iterate each jobId and invoke JobPartition.partition(jobId) and keep those jobs belonging to current partition Id
+ * 6) process job files (job history file and job configuration xml file)
+ * 7) add job Id to current date slot say for example 20150102 after this job is successfully processed
+ * 8) clean up all slots with date less than currentProcessDate - 2 days. (2 days should be configurable)
+ *
+ * Note:
+ * if one spout instance crashes and is brought up again, the open() method is invoked again; this scenario still needs to be thought through.
+ *
+ */
+
+public class JobHistorySpout extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(JobHistorySpout.class);
+
+ private int partitionId;
+ private int numTotalPartitions;
+ private transient JobHistoryZKStateManager zkState;
+ private transient JHFCrawlerDriver driver;
+ private JobHistoryContentFilter contentFilter;
+ private JobHistorySpoutCollectorInterceptor interceptor;
+ private JHFInputStreamCallback callback;
+ private JHFConfigManager configManager;
+ private JobHistoryLCM m_jhfLCM;
+
+ public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager) { // convenience constructor: uses a new JobHistorySpoutCollectorInterceptor
+ this(filter, configManager, new JobHistorySpoutCollectorInterceptor());
+ }
+
+ /**
+  * Creates a spout with a caller-supplied interceptor. This signature exists mostly for
+  * unit tests, where a customized interceptor can be plugged in.
+  *
+  * @param filter content filter passed on to the input stream callback
+  * @param configManager configuration source for the spout and the callback
+  * @param adaptor interceptor used by the spout and passed on to the callback
+  */
+ public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager, JobHistorySpoutCollectorInterceptor adaptor) {
+     contentFilter = filter;
+     interceptor = adaptor;
+     this.configManager = configManager;
+     callback = new DefaultJHFInputStreamCallback(filter, configManager, adaptor);
+ }
+
+ private int calculatePartitionId(TopologyContext context) {
+ int thisGlobalTaskId = context.getThisTaskId();
+ String componentName = context.getComponentId(thisGlobalTaskId);
+ List<Integer> globalTaskIds = context.getComponentTasks(componentName);
+ numTotalPartitions = globalTaskIds.size();
+ int index = 0;
+ for (Integer id : globalTaskIds) {
+ if (id == thisGlobalTaskId) {
+ return index;
+ }
+ index++;
+ }
+ throw new IllegalStateException();
+ }
+
+ /**
+  * Initializes the spout task: works out which partition this task owns, builds the job id
+  * filter for that partition, makes sure the partition znodes exist and creates the crawler
+  * driver used by nextTuple().
+  *
+  * @param conf storm configuration, not read here
+  * @param context topology context used to derive the partition id
+  * @param collector collector handed to the interceptor
+  * @throws IllegalStateException if the partition id is out of range, the partitioner class
+  *         cannot be instantiated, or the crawler driver cannot be created
+  */
+ @Override
+ public void open(Map conf, TopologyContext context,
+                  final SpoutOutputCollector collector) {
+     partitionId = calculatePartitionId(context);
+     // sanity verify 0<=partitionId<=numTotalPartitions-1
+     // (>= rather than >: partitionId == numTotalPartitions is already out of range)
+     if (partitionId < 0 || partitionId >= numTotalPartitions) {
+         throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " +
+             partitionId + " and numTotalPartitions " + numTotalPartitions);
+     }
+     Class<? extends JobIdPartitioner> partitionerCls = configManager.getControlConfig().partitionerCls;
+     JobIdPartitioner partitioner;
+     try {
+         partitioner = partitionerCls.newInstance();
+     } catch (Exception e) {
+         LOG.error("failing instantiating job partitioner class " + partitionerCls, e);
+         throw new IllegalStateException(e);
+     }
+     JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId);
+     zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
+     zkState.ensureJobPartitions(numTotalPartitions);
+     interceptor.setSpoutOutputCollector(collector);
+
+     try {
+         m_jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig());
+         driver = new JHFCrawlerDriverImpl(configManager.getJobHistoryEndpointConfig(),
+             configManager.getControlConfig(),
+             callback,
+             zkState,
+             m_jhfLCM,
+             jobIdFilter,
+             partitionId);
+     } catch (Exception e) {
+         // log the cause as well, so the failure is diagnosable from the log alone
+         LOG.error("failing creating crawler driver", e);
+         throw new IllegalStateException(e);
+     }
+ }
+
+ /**
+  * Runs one crawl and hands the partition id together with the value returned by the crawl
+  * to the interceptor. A failed crawl is logged and followed by a file system refresh
+  * attempt; every call ends with a one second pause.
+  */
+ @Override
+ public void nextTuple() {
+     try {
+         Long modifiedTime = driver.crawl();
+         interceptor.collect(new ValuesArray(partitionId, modifiedTime));
+     } catch (Exception ex) {
+         LOG.error("fail crawling job history file and continue ...", ex);
+         try {
+             m_jhfLCM.freshFileSystem();
+         } catch (Exception e) {
+             LOG.error("failed to fresh file system ", e);
+         }
+     } finally {
+         try {
+             Thread.sleep(1000);
+         } catch (InterruptedException e) {
+             // do not swallow the interruption: restore the flag so the thread driving
+             // this spout can still see that it was asked to stop
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+ /**
+ * declares the output fields of this spout: "partitionId" and "timeStamp"
+ */
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("partitionId", "timeStamp"));
+ }
+
+ /**
+ * no-op: acknowledgements are ignored, nothing is recorded for the acked id here
+ */
+ @Override
+ public void ack(Object jobId) {
+ }
+
+ /**
+ * no-op: failures are ignored, nothing is replayed or recorded for the failed id here
+ */
+ @Override
+ public void fail(Object jobId) {
+ }
+
+ @Override
+ public void deactivate() { // intentionally empty: nothing is paused or released on deactivation
+ }
+
+ /**
+  * Releases the ZooKeeper state manager created in open(), if there is one, so its
+  * connection does not stay open after the spout is shut down.
+  */
+ @Override
+ public void close() {
+     if (zkState != null) {
+         zkState.close();
+         zkState = null;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java
new file mode 100644
index 0000000..b58c84f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+public interface JobIdFilter { // predicate over job ids
+ boolean accept(String jobId); // true when the given job id passes the filter, false otherwise
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java
new file mode 100644
index 0000000..07b8519
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+/**
+ * {@link JobIdFilter} that accepts only the job ids which a {@link JobIdPartitioner}
+ * assigns to one particular partition.
+ */
+public class JobIdFilterByPartition implements JobIdFilter {
+    private JobIdPartitioner partitioner;
+    private int numTotalPartitions;
+    private int partitionId;
+
+    /**
+     * @param partitioner strategy that maps a job id to a partition
+     * @param numTotalPartitions total number of partitions passed to the partitioner
+     * @param partitionId the partition whose job ids are accepted
+     */
+    public JobIdFilterByPartition(JobIdPartitioner partitioner, int numTotalPartitions, int partitionId) {
+        this.partitionId = partitionId;
+        this.numTotalPartitions = numTotalPartitions;
+        this.partitioner = partitioner;
+    }
+
+    /**
+     * @param jobId job id to test
+     * @return true when the partitioner places {@code jobId} in this filter's partition
+     */
+    @Override
+    public boolean accept(String jobId) {
+        return partitioner.partition(numTotalPartitions, jobId) == partitionId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java
new file mode 100644
index 0000000..cc7e68c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.storm;
+
+public interface JobIdPartitioner { // strategy that assigns a job id to one of numTotalParts partitions
+ int partition(int numTotalParts, String jobId); // returns the partition number chosen for jobId
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
new file mode 100644
index 0000000..308057b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.zkres;
+
+import java.util.List;
+
+public interface JobHistoryZKStateLCM { // operations on the stored crawl state: per-partition processed dates and per-date processed jobs
+ void ensureJobPartitions(int numTotalPartitions); // make sure partition state exists for numTotalPartitions partitions
+ String readProcessedDate(int partitionId); // processed date stored for the partition (JobHistoryZKStateManager returns null when none is stored)
+ List<String> readProcessedJobs(String date); // job ids recorded under the date (JobHistoryZKStateManager returns null when none are stored)
+ void updateProcessedDate(int partitionId, String date); // store the processed date for the partition
+ void addProcessedJob(String date, String jobId); // record jobId under the date
+ void truncateProcessedJob(String date); // remove everything recorded under the date
+ void truncateEverything(); // remove all stored state
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
new file mode 100644
index 0000000..24dd7be
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.zkres;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.ZKStateConfig;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.List;
+
+public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
+ public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
+ private String zkRoot;
+ private CuratorFramework _curator;
+ public static final String ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS = "lockForEnsureJobPartitions";
+ public static final String ZNODE_FORCE_START_FROM = "forceStartFrom";
+ public static final String ZNODE_PARTITIONS = "partitions";
+
+ public static final int BACKOFF_DAYS = 0;
+
+ private CuratorFramework newCurator(ZKStateConfig config) throws Exception {
+ return CuratorFrameworkFactory.newClient(
+ config.zkQuorum,
+ config.zkSessionTimeoutMs,
+ 15000,
+ new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
+ );
+ }
+
+ public JobHistoryZKStateManager(ZKStateConfig config) {
+ this.zkRoot = config.zkRoot;
+
+ try {
+ _curator = newCurator(config);
+ _curator.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void close() { // closes the Curator client and drops the reference; the manager must not be used afterwards
+ _curator.close();
+ _curator = null;
+ }
+
+ private String readForceStartFrom() {
+ String path = zkRoot + "/" + ZNODE_FORCE_START_FROM;
+ try {
+ if (_curator.checkExists().forPath(path) != null) {
+ return new String(_curator.getData().forPath(path), "UTF-8");
+ }
+ } catch (Exception ex) {
+ LOG.error("fail reading forceStartFrom znode", ex);
+ }
+ return null;
+ }
+
+ /**
+  * Deletes the forceStartFrom znode under zkRoot if it exists. Failures are logged and
+  * otherwise ignored.
+  */
+ private void deleteForceStartFrom() {
+     String path = zkRoot + "/" + ZNODE_FORCE_START_FROM;
+     try {
+         if (_curator.checkExists().forPath(path) != null) {
+             _curator.delete().forPath(path);
+         }
+     } catch (Exception ex) {
+         // this is the delete path; the message used to claim a read failure
+         LOG.error("fail deleting forceStartFrom znode", ex);
+     }
+ }
+
+ /**
+  * Formats the date that lies one day plus {@code backOffDays} days before now.
+  *
+  * @param backOffDays additional number of days to go back beyond yesterday
+  * @return the resulting date formatted as "yyyyMMdd"
+  */
+ private String getProcessedDateAfterBackoff(int backOffDays) {
+     Calendar day = Calendar.getInstance();
+     // yesterday first, then the extra back-off
+     day.add(Calendar.DATE, -1);
+     day.add(Calendar.DATE, -backOffDays);
+     return new SimpleDateFormat("yyyyMMdd").format(day.getTime());
+ }
+
+ /**
+ * under zkRoot, znode forceStartFrom is used to force job is crawled from that date
+ * IF
+ * forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD",
+ * THEN
+ * rebuild all partitions with the forceStartFrom
+ * ELSE
+ * IF
+ * partition structure is changed
+ * THEN
+ * IF
+ * there is valid mindate for existing partitions
+ * THEN
+ * rebuild job partitions from that valid mindate
+ * ELSE
+ * rebuild job partitions from (today - BACKOFF_DAYS)
+ * END
+ * ELSE
+ * do nothing
+ * END
+ * END
+ *
+ *
+ * forceStartFrom is deleted once its value is used, so next time when topology is restarted, program can run from where topology is stopped last time
+ */
+ @Override
+ public void ensureJobPartitions(int numTotalPartitions) {
+ // lock before rebuild job partitions
+ String lockForEnsureJobPartitions = zkRoot + "/" + ZNODE_LOCK_FOR_ENSURE_JOB_PARTITIONS;
+ InterProcessMutex lock = new InterProcessMutex(_curator, lockForEnsureJobPartitions);
+ String path = zkRoot + "/" + ZNODE_PARTITIONS;
+ try {
+ lock.acquire();
+ int minDate = 0;
+ String forceStartFrom = readForceStartFrom();
+ if (forceStartFrom != null) {
+ try {
+ minDate = Integer.valueOf(forceStartFrom);
+ } catch(Exception ex) {
+ LOG.error("failing converting forceStartFrom znode value to integer with value " + forceStartFrom);
+ throw new IllegalStateException();
+ }
+ } else {
+ boolean pathExists = _curator.checkExists().forPath(path) == null ? false : true;
+ boolean structureChanged = true;
+ if (pathExists) {
+ int currentCount = _curator.getChildren().forPath(path).size();
+ if (numTotalPartitions == currentCount) {
+ structureChanged = false;
+ LOG.info("znode partitions structure is unchanged");
+ } else {
+ LOG.info("znode partitions structure is changed, current partition count " + currentCount + ", future count " + numTotalPartitions);
+ }
+ }
+ if (!structureChanged)
+ return; // do nothing
+
+ if (pathExists) {
+ List<String> partitions = _curator.getChildren().forPath(path);
+ for (String partition : partitions) {
+ String date = new String(_curator.getData().forPath(path + "/" + partition), "UTF-8");
+ int tmp = Integer.valueOf(date);
+ if(tmp < minDate)
+ minDate = tmp;
+ }
+ }
+
+ if (minDate == 0) {
+ minDate = Integer.valueOf(getProcessedDateAfterBackoff(BACKOFF_DAYS));
+ }
+ }
+ rebuildJobPartitions(numTotalPartitions, String.valueOf(minDate));
+ deleteForceStartFrom();
+ } catch (Exception e) {
+ LOG.error("fail building job partitions", e);
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ lock.release();
+ } catch(Exception e) {
+ LOG.error("fail releasing lock", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void rebuildJobPartitions(int numTotalPartitions, String startingDate) throws Exception {
+ LOG.info("rebuild job partitions with numTotalPartitions " + numTotalPartitions + " with starting date " + startingDate);
+ String path = zkRoot + "/" + ZNODE_PARTITIONS;
+ // truncate all existing partitions
+ if (_curator.checkExists().forPath(path) != null) {
+ _curator.delete().deletingChildrenIfNeeded().forPath(path);
+ }
+
+ for (int i = 0; i < numTotalPartitions; i++) {
+ _curator.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
+ }
+ }
+
+ /**
+  * Reads the date stored in the znode of the given partition.
+  *
+  * @param partitionId partition whose znode is read
+  * @return the znode content decoded as UTF-8, or null when the znode does not exist
+  * @throws RuntimeException wrapping any failure while reading
+  */
+ @Override
+ public String readProcessedDate(int partitionId) {
+     final String partitionPath = zkRoot + "/partitions/" + partitionId;
+     try {
+         if (_curator.checkExists().forPath(partitionPath) == null) {
+             return null;
+         }
+         return new String(_curator.getData().forPath(partitionPath), "UTF-8");
+     } catch (Exception e) {
+         LOG.error("fail read processed date", e);
+         throw new RuntimeException(e);
+     }
+ }
+
+ @Override
+ public void updateProcessedDate(int partitionId, String date) {
+ String path = zkRoot + "/partitions/" + partitionId;
+ try {
+ if (_curator.checkExists().forPath(path) == null) {
+ _curator.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path, date.getBytes("UTF-8"));
+ } else {
+ _curator.setData().forPath(path, date.getBytes("UTF-8"));
+ }
+ } catch (Exception e) {
+ LOG.error("fail update processed date", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void addProcessedJob(String date, String jobId) {
+ String path = zkRoot + "/jobs/" + date + "/" + jobId;
+ try {
+ if (_curator.checkExists().forPath(path) == null) {
+ _curator.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path);
+ } else {
+ _curator.setData().forPath(path);
+ }
+ } catch (Exception e) {
+ LOG.error("fail adding processed jobs", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void truncateProcessedJob(String date) {
+ LOG.info("trying to truncate all data for day " + date);
+ // we need lock before we do truncate
+ String path = zkRoot + "/jobs/" + date;
+ InterProcessMutex lock = new InterProcessMutex(_curator, path);
+ try {
+ lock.acquire();
+ if (_curator.checkExists().forPath(path) != null) {
+ _curator.delete().deletingChildrenIfNeeded().forPath(path);
+ LOG.info("really truncated all data for day " + date);
+ }
+ } catch (Exception e) {
+ LOG.error("fail truncating processed jobs", e);
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ lock.release();
+ } catch (Exception e) {
+ LOG.error("fail releasing lock", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+  * Lists the children of the znode zkRoot/jobs/&lt;date&gt;.
+  *
+  * @param date date slot to list
+  * @return names of the child znodes, or null when the date znode does not exist
+  * @throws RuntimeException wrapping any failure while reading
+  */
+ @Override
+ public List<String> readProcessedJobs(String date) {
+     final String datePath = zkRoot + "/jobs/" + date;
+     try {
+         if (_curator.checkExists().forPath(datePath) == null) {
+             return null;
+         }
+         return _curator.getChildren().forPath(datePath);
+     } catch (Exception e) {
+         LOG.error("fail read processed jobs", e);
+         throw new RuntimeException(e);
+     }
+ }
+
+ @Override
+ public void truncateEverything() {
+ String path = zkRoot;
+ try {
+ if (_curator.checkExists().forPath(path) != null) {
+ _curator.delete().deletingChildrenIfNeeded().forPath(path);
+ }
+ } catch (Exception ex) {
+ LOG.error("fail truncating verything", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
new file mode 100644
index 0000000..db62cfb
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobCounter.conf
@@ -0,0 +1,185 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#### Sample configuration:
+## counter.group0.name = groupname1
+## counter.group0.counter0.names = counterName1,counterName2,...
+## counter.group0.counter0.description = counter description...
+
+counter.group0.name = org.apache.hadoop.mapreduce.FileSystemCounter
+counter.group0.description = File System Counters
+counter.group0.counter0.names = FILE_BYTES_READ
+counter.group0.counter0.description = FILE: Number of bytes read
+counter.group0.counter1.names = FILE_BYTES_WRITTEN
+counter.group0.counter1.description = FILE: Number of bytes written
+counter.group0.counter2.names = FILE_READ_OPS
+counter.group0.counter2.description = FILE: Number of read operations
+counter.group0.counter3.names = FILE_LARGE_READ_OPS
+counter.group0.counter3.description = FILE: Number of large read operations
+counter.group0.counter4.names = FILE_WRITE_OPS
+counter.group0.counter4.description = FILE: Number of write operations
+counter.group0.counter5.names = HDFS_BYTES_READ
+counter.group0.counter5.description = HDFS: Number of bytes read
+counter.group0.counter6.names = HDFS_BYTES_WRITTEN
+counter.group0.counter6.description = HDFS: Number of bytes written
+counter.group0.counter7.names = HDFS_READ_OPS
+counter.group0.counter7.description = HDFS: Number of read operations
+counter.group0.counter8.names = HDFS_LARGE_READ_OPS
+counter.group0.counter8.description = HDFS: Number of large read operations
+counter.group0.counter9.names = HDFS_WRITE_OPS
+counter.group0.counter9.description = HDFS: Number of write operations
+
+counter.group1.name = org.apache.hadoop.mapreduce.TaskCounter
+counter.group1.description = Map-Reduce Framework
+counter.group1.counter0.names = MAP_INPUT_RECORDS
+counter.group1.counter0.description = Map input records
+counter.group1.counter1.names = MAP_OUTPUT_RECORDS
+counter.group1.counter1.description = Map output records
+counter.group1.counter2.names = SPLIT_RAW_BYTES
+counter.group1.counter2.description = Input split bytes
+counter.group1.counter3.names = SPILLED_RECORDS
+counter.group1.counter3.description = Spilled Records
+counter.group1.counter4.names = CPU_MILLISECONDS
+counter.group1.counter4.description = CPU time spent (ms)
+counter.group1.counter5.names = PHYSICAL_MEMORY_BYTES
+counter.group1.counter5.description = Physical memory (bytes) snapshot
+counter.group1.counter6.names = VIRTUAL_MEMORY_BYTES
+counter.group1.counter6.description = Virtual memory (bytes) snapshot
+counter.group1.counter7.names = COMMITTED_HEAP_BYTES
+counter.group1.counter7.description = Total committed heap usage (bytes)
+counter.group1.counter8.names = REDUCE_SHUFFLE_BYTES
+counter.group1.counter8.description = Reduce shuffle bytes (bytes)
+counter.group1.counter9.names = GC_TIME_MILLIS
+counter.group1.counter9.description = GC time milliseconds
+counter.group1.counter10.names = MAP_OUTPUT_BYTES
+counter.group1.counter10.description = map output bytes
+counter.group1.counter11.names = REDUCE_INPUT_RECORDS
+counter.group1.counter11.description = reduce input records
+counter.group1.counter12.names = COMBINE_INPUT_RECORDS
+counter.group1.counter12.description = combine input records
+counter.group1.counter13.names = COMBINE_OUTPUT_RECORDS
+counter.group1.counter13.description = combine output records
+counter.group1.counter14.names = REDUCE_INPUT_GROUPS
+counter.group1.counter14.description = reduce input groups
+counter.group1.counter15.names = REDUCE_OUTPUT_RECORDS
+counter.group1.counter15.description = reduce output records
+counter.group1.counter16.names = SHUFFLED_MAPS
+counter.group1.counter16.description = shuffled maps
+counter.group1.counter17.names = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group1.counter17.description = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group1.counter18.names = MERGED_MAP_OUTPUTS
+counter.group1.counter18.description = MERGED_MAP_OUTPUTS
+counter.group1.counter19.names = FAILED_SHUFFLE
+counter.group1.counter19.description = FAILED_SHUFFLE
+
+counter.group2.name = org.apache.hadoop.mapreduce.JobCounter
+counter.group2.description = Map-Reduce Job Counter
+counter.group2.counter0.names = MB_MILLIS_MAPS
+counter.group2.counter0.description = Total megabyte-seconds taken by all map tasks
+counter.group2.counter1.names = MB_MILLIS_REDUCES
+counter.group2.counter1.description = Total megabyte-seconds taken by all reduce tasks
+counter.group2.counter2.names = VCORES_MILLIS_MAPS
+counter.group2.counter2.description = Total vcore-seconds taken by all map tasks
+counter.group2.counter3.names = VCORES_MILLIS_REDUCES
+counter.group2.counter3.description = Total vcore-seconds taken by all reduce tasks
+counter.group2.counter4.names = OTHER_LOCAL_MAPS
+counter.group2.counter4.description = Other local map tasks
+counter.group2.counter5.names = DATA_LOCAL_MAPS
+counter.group2.counter5.description = Data-local map tasks
+counter.group2.counter6.names = MILLIS_MAPS
+counter.group2.counter6.description = Total time spent by all map tasks (ms)
+counter.group2.counter7.names = MILLIS_REDUCES
+counter.group2.counter7.description = Total time spent by all reduce tasks (ms)
+counter.group2.counter8.names = TOTAL_LAUNCHED_MAPS
+counter.group2.counter8.description = Launched map tasks
+counter.group2.counter9.names = TOTAL_LAUNCHED_REDUCES
+counter.group2.counter9.description = Launched reduce tasks
+counter.group2.counter10.names = SLOTS_MILLIS_MAPS
+counter.group2.counter10.description = Total time spent by all maps in occupied slots (ms)
+counter.group2.counter11.names = SLOTS_MILLIS_REDUCES
+counter.group2.counter11.description = Total time spent by all reduces in occupied slots (ms)
+
+counter.group3.name = MapTaskAttemptCounter
+counter.group3.description = Map Task Attempt Counter Aggregation
+counter.group3.counter0.names = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group3.counter1.names = MAP_INPUT_RECORDS
+counter.group3.counter2.names = MERGED_MAP_OUTPUTS
+counter.group3.counter3.names = SPILLED_RECORDS
+counter.group3.counter4.names = MAP_OUTPUT_BYTES
+counter.group3.counter5.names = COMMITTED_HEAP_BYTES
+counter.group3.counter6.names = FAILED_SHUFFLE
+counter.group3.counter7.names = CPU_MILLISECONDS
+counter.group3.counter8.names = SPLIT_RAW_BYTES
+counter.group3.counter9.names = COMBINE_INPUT_RECORDS
+counter.group3.counter10.names = PHYSICAL_MEMORY_BYTES
+counter.group3.counter11.names = TASK_ATTEMPT_DURATION
+counter.group3.counter12.names = VIRTUAL_MEMORY_BYTES
+counter.group3.counter13.names = MAP_OUTPUT_RECORDS
+counter.group3.counter14.names = GC_TIME_MILLIS
+counter.group3.counter15.names = COMBINE_OUTPUT_RECORDS
+counter.group3.counter16.names = REDUCE_INPUT_GROUPS
+counter.group3.counter17.names = REDUCE_INPUT_RECORDS
+counter.group3.counter18.names = REDUCE_OUTPUT_RECORDS
+counter.group3.counter19.names = REDUCE_SHUFFLE_BYTES
+counter.group3.counter20.names = SHUFFLED_MAPS
+
+counter.group4.name = ReduceTaskAttemptCounter
+counter.group4.description = Reduce Task Attempt Counter Aggregation
+counter.group4.counter0.names = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group4.counter1.names = MAP_INPUT_RECORDS
+counter.group4.counter2.names = MERGED_MAP_OUTPUTS
+counter.group4.counter3.names = SPILLED_RECORDS
+counter.group4.counter4.names = MAP_OUTPUT_BYTES
+counter.group4.counter5.names = COMMITTED_HEAP_BYTES
+counter.group4.counter6.names = FAILED_SHUFFLE
+counter.group4.counter7.names = CPU_MILLISECONDS
+counter.group4.counter8.names = SPLIT_RAW_BYTES
+counter.group4.counter9.names = COMBINE_INPUT_RECORDS
+counter.group4.counter10.names = PHYSICAL_MEMORY_BYTES
+counter.group4.counter11.names = TASK_ATTEMPT_DURATION
+counter.group4.counter12.names = VIRTUAL_MEMORY_BYTES
+counter.group4.counter13.names = MAP_OUTPUT_RECORDS
+counter.group4.counter14.names = GC_TIME_MILLIS
+counter.group4.counter15.names = COMBINE_OUTPUT_RECORDS
+counter.group4.counter16.names = REDUCE_INPUT_GROUPS
+counter.group4.counter17.names = REDUCE_INPUT_RECORDS
+counter.group4.counter18.names = REDUCE_OUTPUT_RECORDS
+counter.group4.counter19.names = REDUCE_SHUFFLE_BYTES
+counter.group4.counter20.names = SHUFFLED_MAPS
+
+counter.group5.name = MapTaskAttemptFileSystemCounter
+counter.group5.description = Map Task Attempt File System Counter Aggregation
+counter.group5.counter0.names = FILE_READ_OPS
+counter.group5.counter1.names = FILE_WRITE_OPS
+counter.group5.counter2.names = FILE_BYTES_READ
+counter.group5.counter3.names = FILE_LARGE_READ_OPS
+counter.group5.counter4.names = HDFS_BYTES_READ
+counter.group5.counter5.names = FILE_BYTES_WRITTEN
+counter.group5.counter6.names = HDFS_LARGE_READ_OPS
+counter.group5.counter7.names = HDFS_BYTES_WRITTEN
+counter.group5.counter8.names = HDFS_READ_OPS
+
+counter.group6.name = ReduceTaskAttemptFileSystemCounter
+counter.group6.description = Reduce Task Attempt File System Counter Aggregation
+counter.group6.counter0.names = FILE_READ_OPS
+counter.group6.counter1.names = FILE_WRITE_OPS
+counter.group6.counter2.names = FILE_BYTES_READ
+counter.group6.counter3.names = FILE_LARGE_READ_OPS
+counter.group6.counter4.names = HDFS_BYTES_READ
+counter.group6.counter5.names = FILE_BYTES_WRITTEN
+counter.group6.counter6.names = HDFS_LARGE_READ_OPS
+counter.group6.counter7.names = HDFS_BYTES_WRITTEN
+counter.group6.counter8.names = HDFS_READ_OPS
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..21686a6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DistributedFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
+org.apache.hadoop.hdfs.web.WebHdfsFileSystem
+org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/MRErrorCategory.config
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/MRErrorCategory.config b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/MRErrorCategory.config
new file mode 100644
index 0000000..ee8c0c5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/MRErrorCategory.config
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ATTEMPT_TIMEOUT = false|AttemptID:\S+ Timed out after \d+ secs
+INTERFACE_EXPECTED = false|but interface was expected
+FILE_NOT_EXIST = false|File does not exist
+HIVE_BAD_SCHEMA_EXCEPTION = false|Caused by: org.apache.hadoop.hive.serde2.avro.BadSchemaException
+CONNECTION_RESET_BY_PEER = false|Connection reset by peer
+CONTAINER_KILLED_BY_APPMASTER = false|Container killed by the ApplicationMaster
+CONTAINER_CLEANUP_FAILURE = false|cleanup failed for container container_
+USER_NOT_FOUND = false|User \S+ not found
+TASK_TREE_BEYOND_MEMORY_LIMIT = false|^TaskTree \S+ is running beyond memory-limits
+EBAY_APPMON_LOG_GET_TRAVERSER = false|^Error: com.ebay.appmon.log.traverser.LogTraverser.getTraverser
+MAP_OUTPUT_LOST = false|Map output lost
+BEYOND_PHYSICAL_MEMORY_LIMITS = false|running beyond physical memory limits
+GC_OVERHEAD_LIMIT_EXCEEDED = false|GC overhead limit exceeded
+NO_SPACE_LEFT = false|No space left
+MKDIR_FAILURE = false|mkdir of file:\S+ failed
+KILLED_CLEAN_BY_USER = false|Task has been KILLED by the user
+KILLED_UNCLEAN_BY_USER = false|Task has been KILLED_UNCLEAN by the user
+FAILED_TO_REPORT_STATUS = false|failed to report status for \d+ seconds. Killing
+EXCEPTION_FROM_CONTAINER_LAUNCH = false|^Exception from container-launch
+LOST_TASK_TRACKER = false|^Lost task tracker
+TOO_MANY_FETCH_FAILURES = false|^Too many fetch-failures$
+JAVA_HEAP_SPACE = false|^Error: Java heap space$
+JAVA_EXCEPTION = true|^(?:error: Error: |Error: )?(\S+Exception|\S+Error)
+JAVA_THROWABLE = false|^(?:error: Error: |Error: )?java.lang.Throwable
+NO_DETAIL = false|^$
+UNKNOWN = false|.*
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
new file mode 100644
index 0000000..8cb1aa3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ "envContextConfig" : {
+ "env" : "local",
+ "topologyName" : "mr_history",
+ "stormConfigFile" : "storm.yaml",
+ "parallelismConfig" : {
+ "mrHistoryJobExecutor" : 6
+ },
+ "tasks" : {
+ "mrHistoryJobExecutor" : 6
+ },
+ "workers" : 3
+ },
+
+ "jobExtractorConfig" : {
+ "site" : "sandbox",
+ "mrVersion": "MRVer2",
+ "readTimeOutSeconds" : 10
+ },
+
+ "dataSourceConfig" : {
+ "zkQuorum" : "sandbox.hortonworks.com:2181",
+ "zkPort" : "2181",
+ "zkRoot" : "/test_mrjobhistory",
+ "zkSessionTimeoutMs" : 15000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 20000,
+ "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
+ "principal":"", # leave empty if not needed
+ "keytab":"",
+ "basePath" : "/mr-history/done",
+ "pathContainsJobTrackerName" : false,
+ "jobTrackerName" : "",
+ "zeroBasedMonth" : false,
+ "dryRun" : false,
+ "partitionerCls" : "org.apache.eagle.jpm.mr.history.storm.DefaultJobIdPartitioner",
+ "timeZone" : "UTC"
+ },
+
+ "eagleProps" : {
+ "mailHost" : "abc.com",
+ "mailDebug" : "true",
+ "eagleService": {
+ "host": "sandbox.hortonworks.com",
+ "port": 9099,
+ "username": "admin",
+ "password": "secret"
+ }
+ },
+
+ "MRConfigureKeys" : [
+ "mapreduce.map.output.compress",
+ "mapreduce.map.output.compress.codec",
+ "mapreduce.output.fileoutputformat.compress",
+ "mapreduce.output.fileoutputformat.compress.type",
+ "mapreduce.output.fileoutputformat.compress.codec",
+ "mapred.output.format.class",
+ "eagle.job.runid",
+ "eagle.job.runidfieldname",
+ "eagle.job.name",
+ "eagle.job.normalizedfieldname",
+ "eagle.alert.email",
+ "eagle.job.alertemailaddress",
+ "dataplatform.etl.info",
+ "mapreduce.map.memory.mb",
+ "mapreduce.reduce.memory.mb",
+ "mapreduce.map.java.opts",
+ "mapreduce.reduce.java.opts"
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/core-site.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/core-site.xml
new file mode 100644
index 0000000..11e8486
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/core-site.xml
@@ -0,0 +1,497 @@
+<?xml version="1.0"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with ~
+ this work for additional information regarding copyright ownership. ~ The
+ ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
+ "License"); you may not use this file except in compliance with ~ the License.
+ You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ ~ Unless required by applicable law or agreed to in writing, software ~
+ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
+ License for the specific language governing permissions and ~ limitations
+ under the License. -->
+
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
+
+<!-- i/o properties -->
+
+ <property>
+ <name>io.file.buffer.size</name>
+ <value>131072</value>
+ <description>The size of buffer for use in sequence files.
+ The size of this buffer should probably be a multiple of hardware
+ page size (4096 on Intel x86), and it determines how much data is
+ buffered during read and write operations.</description>
+ </property>
+
+<property>
+ <description>If users connect through a SOCKS proxy, we don't
+ want their SocketFactory settings interfering with the socket
+ factory associated with the actual daemons.</description>
+ <name>hadoop.rpc.socket.factory.class.default</name>
+ <value>org.apache.hadoop.net.StandardSocketFactory</value>
+</property>
+
+<property>
+ <name>hadoop.tmp.dir</name>
+ <value>/tmp/hadoop/hadoop-${user.name}</value>
+ <description>A base for other temporary directories.</description>
+</property>
+
+<property>
+ <name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
+ <value></value>
+</property>
+
+<property>
+ <name>hadoop.rpc.socket.factory.class.JobSubmissionProtocol</name>
+ <value></value>
+</property>
+
+ <property>
+ <name>io.serializations</name>
+ <value>org.apache.hadoop.io.serializer.WritableSerialization</value>
+ </property>
+
+ <property>
+ <name>io.compression.codecs</name>
+ <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value>
+ </property>
+
+ <!-- LZO: see http://www.facebook.com/notes/cloudera/hadoop-at-twitter-part-1-splittable-lzo-compression/178581952002 -->
+ <property>
+ <name>io.compression.codec.lzo.class</name>
+ <value>com.hadoop.compression.lzo.LzoCodec</value>
+ </property>
+
+
+<!-- file system properties -->
+
+ <property>
+ <name>fs.defaultFS</name>
+ <!-- cluster variant -->
+ <value>hdfs://apollo-phx-nn-ha</value>
+ <description>The name of the default file system. Either the
+ literal string "local" or a host:port for NDFS.</description>
+ <final>true</final>
+ </property>
+
+ <property>
+ <description>Topology script</description>
+ <name>net.topology.script.file.name</name>
+ <value>/apache/hadoop/etc/hadoop/topology</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>fs.trash.interval</name>
+ <value>480</value>
+ <description>Number of minutes between trash checkpoints.
+ If zero, the trash feature is disabled.
+ </description>
+ </property>
+
+ <!-- mobius-proxyagent impersonation configurations -->
+<property>
+ <name>hadoop.proxyuser.mobius-proxyagent.groups</name>
+ <value>hdmi-mm,hdmi-set,hdmi-research,hdmi-technology,hdmi-hadoopeng,hdmi-cs,hdmi-milo,hdmi-appdev,hdmi-siteanalytics,hdmi-prod,hdmi-others,hdmi-sdc,hdmi-finance,hdmi-est,hdmi-cci,hdmi-mptna,hdmi-xcom,hdmi-stu,hdmi-mobile</value>
+ <description>Allow user mobius-proxyagent to impersonate any members of the groups </description>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.mobius-proxyagent.hosts</name>
+ <value>10.114.118.13,10.115.201.53</value>
+ <description>The mobius-proxyagent can connect from hosts to impersonate a user</description>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.bridge_adm.groups</name>
+ <value>hdmi-mm,hdmi-set,hdmi-research,hdmi-technology,hdmi-hadoopeng,hdmi-cs,hdmi-milo,hdmi-appdev,hdmi-siteanalytics,hdmi-prod,hdmi-others,hdmi-sdc,hdmi-finance,hdmi-est,hdmi-cci,hdmi-mptna,hdmi-xcom,hdmi-stu,hdmi-mobile</value>
+ <description>Allow user bridge_adm (Teradata-Hadoop bridge) to impersonate any members of the groups </description>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.bridge_adm.hosts</name>
+ <value>10.103.47.11,10.103.47.12,10.103.47.13,10.103.47.14,10.103.47.15,10.103.47.16,10.103.47.17,10.103.47.18,10.103.47.19,10.103.47.20,10.103.47.21,10.103.47.22,10.103.48.11,10.103.48.12,10.103.48.13,10.103.48.14,10.103.48.15,10.103.48.16,10.103.48.17,10.103.48.18,10.103.48.19,10.103.48.20,10.103.48.21,10.103.48.22,10.103.88.11,10.103.88.12,10.103.88.13,10.103.88.14,10.103.88.15,10.103.88.16,10.103.88.17,10.103.88.18,10.103.88.19,10.103.88.20,10.103.88.21,10.103.88.22,10.103.88.23,10.103.88.24,10.103.88.25,10.103.88.26,10.103.88.27,10.103.88.28,10.103.88.29,10.103.88.30,10.103.88.31,10.103.88.32,10.103.88.33,10.103.88.34,10.103.89.11,10.103.89.12,10.103.89.13,10.103.89.14,10.103.89.15,10.103.89.16,10.103.89.17,10.103.89.18,10.103.89.19,10.103.89.20,10.103.89.21,10.103.89.22,10.103.89.23,10.103.89.24,10.103.89.25,10.103.89.26,10.103.89.27,10.103.89.28,10.103.89.29,10.103.89.30,10.103.89.31,10.103.89.32,10.103.89.33,10.103.89.34,10.115.37.50,10.115.37.51,10.115.37.52,10.115.37.53,10.115.38.50,10.115.38.51,10.115.38.52,10.115.38.53,10.115.208.11,10.115.208.12,10.115.208.13,10.115.208.14,10.115.208.15,10.115.208.16,10.115.208.17,10.115.208.18,10.115.208.19,10.115.208.20,10.115.208.21,10.115.208.22,10.115.208.23,10.115.208.24,10.115.208.25,10.115.208.26,10.103.158.101,10.103.158.102,10.103.158.103,10.103.158.104,10.103.158.105,10.103.158.106,10.103.158.107,10.103.158.108,10.103.158.109,10.103.158.110,10.103.158.111,10.103.158.112,10.103.158.113,10.103.158.114,10.103.158.115,10.103.158.116</value>
+ <description>The bridge_adm user (Teradata-Hadoop bridge) can connect from hosts to impersonate a user</description>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.hadoop.hosts</name>
+ <value>*</value>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.hadoop.groups</name>
+ <value>*</value>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.sg_adm.groups</name>
+ <value>hdmi-etl</value>
+ <description>Allow user sg_adm (HDMIT-4462) to impersonate any members of the groups </description>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.sg_adm.hosts</name>
+ <value>*</value>
+ <description>The sg_adm user (HDMIT-4462) can connect from hosts to impersonate a user</description>
+</property>
+
+ <property>
+ <name>fs.inmemory.size.mb</name>
+ <value>256</value>
+ </property>
+
+ <!-- ipc properties: copied from kryptonite configuration -->
+ <property>
+ <name>ipc.client.idlethreshold</name>
+ <value>8000</value>
+ <description>Defines the threshold number of connections after which
+ connections will be inspected for idleness.
+ </description>
+ </property>
+
+ <property>
+ <name>ipc.client.connection.maxidletime</name>
+ <value>30000</value>
+ <description>The maximum time after which a client will bring down the
+ connection to the server.
+ </description>
+ </property>
+
+ <property>
+ <name>ipc.client.connect.max.retries</name>
+ <value>50</value>
+ <description>Defines the maximum number of retries for IPC connections.</description>
+ </property>
+
+ <!-- Web Interface Configuration -->
+ <property>
+ <name>webinterface.private.actions</name>
+ <value>false</value>
+ <description> If set to true, the web interfaces of JT and NN may contain
+ actions, such as kill job, delete file, etc., that should
+ not be exposed to public. Enable this option if the interfaces
+ are only reachable by those who have the right authorization.
+ </description>
+ </property>
+
+<property>
+ <name>hadoop.proxyuser.hive.groups</name>
+ <value>*</value>
+ <description>
+ Proxy group for Hadoop.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.hive.hosts</name>
+ <value>*</value>
+ <description>
+ Proxy host for Hadoop.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.oozie.groups</name>
+ <value>*</value>
+ <description>
+ Proxy group for Hadoop.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.proxyuser.oozie.hosts</name>
+ <value>phxaishdc9en0007-be.phx.ebay.com</value>
+ <description>
+ Proxy host for Hadoop.
+ </description>
+</property>
+
+<!-- BEGIN security configuration -->
+ <property>
+ <name>hadoop.security.authentication</name>
+ <value>kerberos</value>
+ <!-- A value of "simple" would disable security. -->
+ </property>
+
+ <property>
+ <name>hadoop.security.authorization</name>
+ <value>true</value>
+ </property>
+
+ <!-- Setting to ShellBasedUnixGroupsMapping to override the default of
+ JniBasedUnixGroupsMappingWithFallback. See HWX case 00006991 -->
+ <property>
+ <name>hadoop.security.group.mapping</name>
+ <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
+ </property>
+
+ <property>
+ <name>hadoop.http.filter.initializers</name>
+ <value>org.apache.hadoop.security.AuthenticationFilterInitializer</value>
+ </property>
+
+<!-- BEGIN hadoop.http.authentication properties -->
+ <property>
+ <name>hadoop.http.authentication.type</name>
+ <value>org.apache.hadoop.security.authentication.server.CompositeAuthenticationHandler</value>
+ </property>
+
+ <property>
+ <name>hadoop.http.authentication.token.validity</name>
+ <value>36000</value>
+ <!-- in seconds -->
+ </property>
+
+ <property>
+ <name>hadoop.http.authentication.signature.secret.file</name>
+ <value>/etc/hadoop/http_auth_secret</value>
+ </property>
+
+ <property>
+ <name>hadoop.http.authentication.cookie.domain</name>
+ <value>ebay.com</value>
+ </property>
+
+ <property>
+ <name>hadoop.http.authentication.pingFederate.config.file</name>
+ <value>/etc/hadoop/pingfederate-agent-config.txt</value>
+ </property>
+
+ <property>
+ <name>hadoop.http.authentication.pingFederate.url</name>
+ <value>https://sso.corp.ebay.com/sp/startSSO.ping?PartnerIdpId=eBayHadoop</value>
+ </property>
+
+ <property>
+ <name>hadoop.http.authentication.pingFederate.anonymous.allowed</name>
+ <value>true</value>
+ </property>
+
+<!-- BEGIN properties enabled per HDP-2.1.3 upgrade -->
+
+ <property>
+ <name>hadoop.http.authentication.composite.handlers</name>
+ <value>org.apache.hadoop.security.authentication.server.PingFederateAuthenticationHandler,kerberos,anonymous</value>
+ </property>
+
+ <property>
+ <name>hadoop.http.authentication.composite.default-non-browser-handler-type</name>
+ <value>kerberos</value>
+ </property>
+
+ <property>
+ <name>hadoop.http.authentication.kerberos.keytab</name>
+ <value>/etc/hadoop/hadoop.keytab</value>
+ </property>
+
+ <property>
+ <name>hadoop.http.authentication.kerberos.principal</name>
+ <value>*</value>
+ </property>
+
+<!-- END properties enabled per HDP-2.1.3 upgrade -->
+
+<!-- END hadoop.http.authentication properties -->
+
+
+ <property>
+ <name>hadoop.security.auth_to_local</name>
+ <value>
+ RULE:[1:$1]
+ RULE:[2:$1]
+ DEFAULT
+ </value>
+ </property>
+
+ <property>
+ <name>kerberos.multiplerealm.supported</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>kerberos.multiplerealm.realms</name>
+ <value>CORP.EBAY.COM</value>
+ </property>
+
+<!--SSL SUPPORT -->
+
+<property>
+ <name>hadoop.ssl.keystores.factory.class</name>
+ <value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
+ <description>
+ The keystores factory to use for retrieving certificates.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.ssl.require.client.cert</name>
+ <value>false</value>
+ <description>Whether client certificates are required</description>
+</property>
+
+<property>
+ <name>hadoop.ssl.hostname.verifier</name>
+ <value>ALLOW_ALL</value>
+ <description>
+ The hostname verifier to provide for HttpsURLConnections.
+ Valid values are: DEFAULT, STRICT, STRICT_I6, DEFAULT_AND_LOCALHOST and
+ ALLOW_ALL
+ </description>
+</property>
+
+<property>
+ <name>hadoop.ssl.server.conf</name>
+ <value>ssl-server.xml</value>
+ <description>
+ Resource file from which ssl server keystore information will be extracted.
+ This file is looked up in the classpath, typically it should be in Hadoop
+ conf/ directory.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.ssl.client.conf</name>
+ <value>ssl-client.xml</value>
+ <description>
+ Resource file from which ssl client keystore information will be extracted.
+ This file is looked up in the classpath, typically it should be in Hadoop
+ conf/ directory.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.ssl.enabled</name>
+ <value>false</value>
+ <description>
+ Whether to use SSL for the HTTP endpoints. If set to true, the
+ NameNode, DataNode, ResourceManager, NodeManager, HistoryServer and
+ MapReduceAppMaster web UIs will be served over HTTPS instead of HTTP.
+ </description>
+</property>
+
+<!-- User Group Resolution -->
+
+<property>
+ <name>hadoop.security.groups.cache.secs</name>
+ <value>3600</value>
+</property>
+
+<!-- END security configuration -->
+
+
+
+<!-- BEGIN properties enabled per HDP-2.1.3 upgrade -->
+
+<!-- BEGIN Quality of Service -->
+
+ <property>
+ <name>ipc.8020.callqueue.impl</name>
+ <value>com.ebay.hadoop.ipc.FairCallQueue</value>
+ </property>
+
+ <property>
+ <name>ipc.8020.identity-provider.impl</name>
+ <value>com.ebay.hadoop.ipc.EbayUserIdentityProvider</value>
+ </property>
+
+ <property>
+ <name>ipc.8020.faircallqueue.rpc-scheduler</name>
+ <value>com.ebay.hadoop.ipc.DecayRpcScheduler</value>
+ </property>
+
+ <property>
+ <name>ipc.8020.faircallqueue.priority-levels</name>
+ <value>10</value>
+ </property>
+
+ <property>
+ <name>ipc.8020.faircallqueue.decay-scheduler.thresholds</name>
+ <!-- <value>1,2,7,10,20,30,40,50,60</value> -->
+ <value>1,2,3,5,8,13,20,35,50</value>
+ </property>
+
+ <property>
+ <name>ipc.8020.faircallqueue.decay-scheduler.period-ms</name>
+ <value>1000</value>
+ </property>
+
+ <property>
+ <name>ipc.8020.faircallqueue.multiplexer.weights</name>
+ <!-- <value>10,5,3,2,1,1,1,1,1,1</value> -->
+ <value>80,30,25,20,17,12,6,3,2,1</value>
+ </property>
+
+<!-- END Quality of Service -->
+
+
+
+<!-- BEGIN Selective Encryption -->
+<!-- disabled per HADP-6065 - miguenther - 26 August 2014
+ <property>
+ <name>hadoop.rpc.protection</name>
+ <value>authentication,privacy</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>hadoop.security.saslproperties.resolver.class</name>
+ <value>org.apache.hadoop.security.WhitelistBasedResolver</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>hadoop.security.sasl.variablewhitelist.enable</name>
+ <value>true</value>
+ <final>true</final>
+ </property>
+-->
+<!-- END Selective Encryption -->
+
+
+<!-- END properties enabled per HDP-2.1.3 upgrade -->
+
+<property>
+ <name>ha.zookeeper.quorum</name>
+ <value>apollo-phx-zk-1.vip.ebay.com:2181,apollo-phx-zk-2.vip.ebay.com:2181,apollo-phx-zk-3.vip.ebay.com:2181,apollo-phx-zk-4.vip.ebay.com:2181,apollo-phx-zk-5.vip.ebay.com:2181</value>
+</property>
+
+<!-- NEW QOP Proposed configs below - Same as Ares Tiffany Sept 01, 2015 -->
+<property>
+ <name>hadoop.rpc.protection</name>
+ <value>authentication,privacy</value>
+</property>
+
+ <property>
+ <name>hadoop.security.saslproperties.resolver.class</name>
+ <value>org.apache.hadoop.security.WhitelistBasedResolver</value>
+ </property>
+
+ <property>
+ <name>hadoop.security.sasl.fixedwhitelist.file</name>
+ <value>/etc/hadoop/fixedwhitelist</value>
+ </property>
+
+ <property>
+ <name>hadoop.security.sasl.variablewhitelist.enable</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hadoop.security.sasl.variablewhitelist.file</name>
+ <value>/etc/hadoop/whitelist</value>
+ </property>
+
+ <property>
+ <name>hadoop.security.sasl.variablewhitelist.cache.secs</name>
+ <value>3600</value>
+ </property>
+<!-- END NEW QOP Proposed configs below - Same as Ares Tiffany Sept 01, 2015 -->
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/hdfs-site.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/hdfs-site.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/hdfs-site.xml
new file mode 100644
index 0000000..52ba754
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/hdfs-site.xml
@@ -0,0 +1,449 @@
+<?xml version="1.0"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with ~
+ this work for additional information regarding copyright ownership. ~ The
+ ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
+ "License"); you may not use this file except in compliance with ~ the License.
+ You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ ~ Unless required by applicable law or agreed to in writing, software ~
+ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
+ License for the specific language governing permissions and ~ limitations
+ under the License. -->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <!-- The directories for NN, DN and SNN configs -->
+
+ <property>
+ <name>dfs.namenode.name.dir</name>
+ <value>/hadoop/nn1/1</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>dfs.datanode.data.dir</name>
+ <value>/hadoop/1/data,/hadoop/2/data,/hadoop/3/data,/hadoop/4/data,/hadoop/5/data,/hadoop/6/data,/hadoop/7/data,/hadoop/8/data,/hadoop/9/data,/hadoop/10/data,/hadoop/11/data,/hadoop/12/data</value>
+ </property>
+
+ <property>
+ <name>dfs.blockreport.initialDelay</name>
+ <value>900</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.decommission.interval</name>
+ <value>150</value>
+ </property>
+
+ <!-- The Nodes include and exclude -->
+
+ <property>
+ <name>dfs.hosts</name>
+ <!-- The files containing hosts allowed to connect to namenode -->
+ <value>/apache/hadoop/etc/hadoop/hosts</value>
+ </property>
+
+ <property>
+ <name>dfs.hosts.exclude</name>
+ <!-- The files containing hosts that are not permitted to connect to namenode -->
+ <value>/apache/hadoop/etc/hadoop/hdfs-exclude</value>
+ </property>
+
+
+ <property>
+ <name>dfs.datanode.failed.volumes.tolerated</name>
+ <value>3</value>
+ </property>
+
+ <property>
+ <name>dfs.datanode.balance.bandwidthPerSec</name>
+ <value>10485760</value>
+ </property>
+
+ <property>
+ <!-- Amount of space which HDFS will refuse to use in bytes -->
+ <name>dfs.datanode.du.reserved</name>
+ <value>107374182400</value> <!-- 100 GB-->
+ </property>
+
+ <!-- RMERCHIA AISOPS159160 2012-09-25 -->
+
+ <property>
+ <name>dfs.heartbeat.interval</name>
+ <value>6</value>
+ <description>How frequently a DataNode sends a heartbeat.</description>
+ </property>
+
+ <!-- RMERCHIA AISOPS159160 2012-09-25 change to 6 hours on 2012-10-02 -->
+
+ <property>
+ <name>dfs.blockreport.intervalMsec</name>
+ <value>21600000</value>
+ <description>How frequently a DataNode sends a block report.</description>
+ </property>
+
+ <property>
+ <name>dfs.namenode.safemode.threshold-pct</name>
+ <value>1.0f</value>
+ <!-- 1.0 requires every block to be reported; 0.999999 would allow 10 blocks unreported out of 10,000,000 -->
+ <description>
+ Specifies the percentage of blocks that should satisfy
+ the minimal replication requirement defined by dfs.replication.min.
+ Values less than or equal to 0 mean not to start in safe mode.
+ Values greater than 1 will make safe mode permanent.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.namenode.safemode.extension</name>
+ <value>120000</value>
+ <!-- 2 minutes -->
+ <description> Determines extension of safe mode in milliseconds after the threshold level is reached. </description>
+ </property>
+
+ <property>
+ <name>dfs.permissions.enabled</name>
+ <value>true</value>
+ <description>
+ If "true", enable permission checking in HDFS.
+ If "false", permission checking is turned off,
+ but all other behavior is unchanged.
+ Switching from one parameter value to the other does not change the mode,
+ owner or group of files or directories.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.replication</name>
+ <value>3</value>
+ </property>
+
+ <property>
+ <name>dfs.blocksize</name>
+ <!-- 256mb (default 64m or 67108864) -->
+ <value>268435456</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.handler.count</name>
+ <value>128</value>
+ </property>
+
+ <property>
+ <name>dfs.datanode.handler.count</name>
+ <value>50</value>
+ </property>
+
+ <!-- updated from 4k to 16k as part of HADP-6065 - miguenther - 26 august 2014 -->
+ <property>
+ <name>dfs.datanode.max.transfer.threads</name>
+ <value>16384</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.replication.max-streams</name>
+ <value>40</value>
+ </property>
+
+ <property>
+ <name>dfs.webhdfs.enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>dfs.block.local-path-access.user</name>
+ <value>hadoop</value>
+ <description>the user who is allowed to perform short circuit reads.</description>
+ </property>
+
+ <property>
+ <name>dfs.block.access.token.enable</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.name.dir.restore</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>dfs.ls.limit</name>
+ <value>4096</value>
+ </property>
+
+ <!-- NameNode security config -->
+ <property>
+ <name>dfs.web.authentication.kerberos.keytab</name>
+ <value>/etc/hadoop/hadoop.keytab</value>
+ </property>
+ <property>
+ <name>dfs.namenode.kerberos.internal.spnego.principal</name>
+ <value>*</value>
+ </property>
+ <property>
+ <name>dfs.namenode.keytab.file</name>
+ <value>/etc/hadoop/hadoop.keytab</value>
+ </property>
+ <property>
+ <name>dfs.namenode.kerberos.principal</name>
+ <value>hadoop/_HOST@APD.EBAY.COM</value>
+ <!-- _HOST will be replaced by the domain name present in fs.default.name. It is better to use the actual host name -->
+ </property>
+ <property>
+ <name>dfs.web.authentication.kerberos.principal</name>
+ <value>HTTP/_HOST@APD.EBAY.COM,HTTP/apollo-hdfs.corp.ebay.com@CORP.EBAY.COM</value>
+ </property>
+
+ <!-- DataNode security config -->
+ <property>
+ <name>dfs.datanode.data.dir.perm</name>
+ <value>700</value>
+ </property>
+ <property>
+ <name>dfs.datanode.address</name>
+ <value>0.0.0.0:1004</value>
+ </property>
+ <property>
+ <name>dfs.datanode.http.address</name>
+ <value>0.0.0.0:1006</value>
+ </property>
+ <property>
+ <name>dfs.datanode.keytab.file</name>
+ <value>/etc/hadoop/hadoop.keytab</value>
+ </property>
+ <property>
+ <name>dfs.datanode.kerberos.principal</name>
+ <value>hadoop/_HOST@APD.EBAY.COM</value>
+ <!-- _HOST will be replaced by the first domain name mapped to the ip -->
+ </property>
+
+ <property>
+ <name>dfs.cluster.administrators</name>
+ <value> hdmi-hadoopeng</value>
+ </property>
+
+ <!-- HTTPS SUPPORT -->
+
+ <property>
+ <name>dfs.https.need.client.auth</name>
+ <value>false</value>
+ <description>Whether SSL client certificate authentication is required
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.https.server.keystore.resource</name>
+ <value>ssl-server.xml</value>
+ <description>Resource file from which ssl server keystore
+ information will be extracted
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.https.client.keystore.resource</name>
+ <value>ssl-client.xml</value>
+ <description>Resource file from which ssl client keystore
+ information will be extracted
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.datanode.https.address</name>
+ <value>0.0.0.0:50075</value>
+ </property>
+
+ <property>
+ <name>dfs.datanode.http.address</name>
+ <value>0.0.0.0:1006</value>
+ </property>
+
+
+
+ <property>
+ <name>dfs.domain.socket.path</name>
+ <value>/var/run/hadoop-hdfs/dn</value>
+ </property>
+
+ <property>
+ <name>dfs.client.read.shortcircuit</name>
+ <value>true</value>
+ </property>
+
+
+ <property>
+ <name>dfs.namenode.service.handler.count</name>
+ <value>55</value>
+ </property>
+
+
+
+
+ <!-- BEGIN properties enabled per HDP-2.1.3 upgrade -->
+
+ <property>
+ <name>dfs.namenode.acls.enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>dfs.http.policy</name>
+ <value>HTTP_AND_HTTPS</value>
+ </property>
+
+ <property>
+ <name>dfs.web.authentication.filter</name>
+ <value>org.apache.hadoop.hdfs.web.TokenAuthFilter,authentication</value>
+ </property>
+
+ <!-- END properties enabled per HDP-2.1.3 upgrade -->
+
+
+ <!-- added as part of HADP-6065 - miguenther 26 August 2014 -->
+ <property>
+ <name>ipc.server.read.threadpool.size</name>
+ <value>3</value>
+ </property>
+
+
+ <!-- Apollo PHX HA Configs -->
+ <property>
+ <name>dfs.nameservices</name>
+ <value>apollo-phx-nn-ha</value>
+ <description>Logical name for this new nameservice</description>
+ </property>
+
+ <property>
+ <name>dfs.ha.namenodes.apollo-phx-nn-ha</name>
+ <value>nn1,nn2</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.rpc-address.apollo-phx-nn-ha.nn1</name>
+ <value>apollo-phx-nn.vip.ebay.com:8020</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.rpc-address.apollo-phx-nn-ha.nn2</name>
+ <value>apollo-phx-nn-2.vip.ebay.com:8020</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.servicerpc-address.apollo-phx-nn-ha.nn1</name>
+ <value>apollo-phx-nn.vip.ebay.com:8030</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.servicerpc-address.apollo-phx-nn-ha.nn2</name>
+ <value>apollo-phx-nn-2.vip.ebay.com:8030</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.http-address.apollo-phx-nn-ha.nn1</name>
+ <value>apollo-phx-nn.vip.ebay.com:50080</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.http-address.apollo-phx-nn-ha.nn2</name>
+ <value>apollo-phx-nn-2.vip.ebay.com:50080</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.shared.edits.dir</name>
+ <value>qjournal://phxaishdc9en0010-be.phx.ebay.com:8485;phxaishdc9en0011-be.phx.ebay.com:8485;phxaishdc9en0012-be.phx.ebay.com:8485;phxaishdc9en0013-be.phx.ebay.com:8485;phxaishdc9en0014-be.phx.ebay.com:8485/apollo-phx-nn-ha</value>
+ </property>
+
+ <property>
+ <name>dfs.client.failover.proxy.provider.apollo-phx-nn-ha</name>
+ <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
+ </property>
+
+ <property>
+ <name>dfs.ha.fencing.methods</name>
+ <value>sshfence
+ shell(/bin/true)
+ </value>
+ </property>
+
+ <property>
+ <name>dfs.ha.fencing.ssh.private-key-files</name>
+ <value>/home/hadoop/.ssh/id_rsa</value>
+ </property>
+
+ <property>
+ <name>dfs.ha.fencing.ssh.connect-timeout</name>
+ <value>30000</value>
+ </property>
+
+ <property>
+ <name>dfs.ha.automatic-failover.enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>dfs.journalnode.edits.dir</name>
+ <value>/hadoop/qjm/apollo</value>
+ </property>
+
+ <property>
+ <name>dfs.journalnode.kerberos.principal</name>
+ <value>hadoop/_HOST@APD.EBAY.COM</value>
+ </property>
+
+ <property>
+ <name>dfs.journalnode.kerberos.internal.spnego.principal</name>
+ <value>HTTP/_HOST@APD.EBAY.COM</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.https-address.apollo-phx-nn-ha.nn2</name>
+ <value>apollo-phx-nn-2.vip.ebay.com:50070</value>
+ </property>
+
+ <property>
+ <name>dfs.namenode.https-address.apollo-phx-nn-ha.nn1</name>
+ <value>apollo-phx-nn.vip.ebay.com:50070</value>
+ </property>
+
+ <property>
+ <name>dfs.journalnode.keytab.file</name>
+ <value>/etc/hadoop/hadoop.keytab</value>
+ </property>
+
+ <!-- Apollo HA Configs END -->
+
+ <!-- BEGIN Selective Encryption as in Ares - Sept 01, 2015 Tiffany -->
+ <property>
+ <name>dfs.encrypt.data.transfer</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>dfs.encrypt.data.transfer.algorithm</name>
+ <value>rc4</value>
+ <final>true</final>
+ </property>
+ <property>
+ <name>dfs.trustedchannel.resolver.class</name>
+ <value>org.apache.hadoop.hdfs.datatransfer.FlagListTrustedChannelResolver</value>
+ <final>true</final>
+ </property>
+ <property>
+ <name>dfs.datatransfer.client.encrypt</name>
+ <value>false</value>
+ <final>true</final>
+ </property>
+
+ <!-- END Selective Encryption as in Ares - Sept 01, 2015 Tiffany -->
+
+ <!-- Post Upgrade - improve performance - Oct 23, 2015 Tiffany -->
+ <property>
+ <name>dfs.client.block.write.locateFollowingBlock.retries</name>
+ <value>8</value>
+ </property>
+ <!-- END Post Upgrade - improve performance - Oct 23, 2015 Tiffany -->
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/log4j.properties b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/log4j.properties
new file mode 100644
index 0000000..71a5dac
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/log4j.properties
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, DRFA, stdout
+eagle.log.dir=./logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+# 30-day backup (disabled: DailyRollingFileAppender does not support MaxBackupIndex)
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel [Thread] LoggerName[LineNumber]: LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file