You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/08/24 07:49:11 UTC
[1/3] incubator-eagle git commit: [EAGLE-461] Convert MR history app
with new app framework
Repository: incubator-eagle
Updated Branches:
refs/heads/develop b52405f12 -> 0bde482be
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 3daae37..ca4a94f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -18,19 +18,20 @@
package org.apache.eagle.jpm.mr.history.storm;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.history.crawler.*;
-import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity;
import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity;
import org.apache.eagle.jpm.util.JobIdFilter;
import org.apache.eagle.jpm.util.JobIdFilterByPartition;
import org.apache.eagle.jpm.util.JobIdPartitioner;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,28 +44,30 @@ import java.util.Map;
/**
* Zookeeper znode structure
* -zkRoot
- * - partitions
- * - 0 (20150101)
- * - 1 (20150101)
- * - 2 (20150101)
- * - ... ...
- * - N-1 (20150102)
- * - jobs
- * - 20150101
- * - job1
- * - job2
- * - job3
- * - 20150102
- * - job1
- * - job2
- * - job3
- *
+ * - partitions
+ * - 0 (20150101)
+ * - 1 (20150101)
+ * - 2 (20150101)
+ * - ... ...
+ * - N-1 (20150102)
+ * - jobs
+ * - 20150101
+ * - job1
+ * - job2
+ * - job3
+ * - 20150102
+ * - job1
+ * - job2
+ * - job3
+ * <p>
* Spout can have multiple instances, which is supported by storm parallelism primitive.
- *
+ * </p>
+ * <p>
* Under znode partitions, N child znodes (name is 0 based integer) would be created with each znode mapped to one spout instance. All jobs will be partitioned into N
* partitions by applying JobPartitioner class to each job Id. The value of each partition znode is the date when the last job in this partition
* is successfully processed.
- *
+ * </p>
+ * <p>
* processing steps
* 1) In constructor,
* 2) In open(), calculate jobPartitionId for current spout (which should be exactly same to spout taskId within TopologyContext)
@@ -74,10 +77,9 @@ import java.util.Map;
* 7) process job files (job history file and job configuration xml file)
* 8) add job Id to current date slot say for example 20150102 after this job is successfully processed
* 9) clean up all slots with date less than currentProcessDate - 2 days. (2 days should be configurable)
- *
+ * </p>
* Note:
* if one spout instance crashes and is brought up again, open() method would be invoked again, we need think of this scenario.
- *
*/
public class JobHistorySpout extends BaseRichSpout {
@@ -90,20 +92,18 @@ public class JobHistorySpout extends BaseRichSpout {
private JobHistoryContentFilter contentFilter;
private JobHistorySpoutCollectorInterceptor interceptor;
private JHFInputStreamCallback callback;
- private JHFConfigManager configManager;
- private JobHistoryLCM m_jhfLCM;
- private final static int MAX_RETRY_TIMES = 3;
+ private MRHistoryJobConfig configManager;
+ private JobHistoryLCM jhfLCM;
+ private static final int MAX_RETRY_TIMES = 3;
- public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager) {
+ public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager) {
this(filter, configManager, new JobHistorySpoutCollectorInterceptor());
}
/**
- * mostly this constructor signature is for unit test purpose as you can put customized interceptor here
- * @param filter
- * @param adaptor
+ * Mainly intended for unit tests: lets the caller supply a customized interceptor.
*/
- public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager, JobHistorySpoutCollectorInterceptor adaptor) {
+ public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, JobHistorySpoutCollectorInterceptor adaptor) {
this.contentFilter = filter;
this.configManager = configManager;
this.interceptor = adaptor;
@@ -131,15 +131,15 @@ public class JobHistorySpout extends BaseRichSpout {
partitionId = calculatePartitionId(context);
// sanity verify 0<=partitionId<=numTotalPartitions-1
if (partitionId < 0 || partitionId > numTotalPartitions) {
- throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " +
- partitionId + " and numTotalPartitions " + numTotalPartitions);
+ throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId "
+ + partitionId + " and numTotalPartitions " + numTotalPartitions);
}
Class<? extends JobIdPartitioner> partitionerCls = configManager.getControlConfig().partitionerCls;
JobIdPartitioner partitioner;
try {
partitioner = partitionerCls.newInstance();
} catch (Exception e) {
- LOG.error("failing instantiating job partitioner class " + partitionerCls,e);
+ LOG.error("failing instantiating job partitioner class " + partitionerCls, e);
throw new IllegalStateException(e);
}
JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId);
@@ -148,14 +148,14 @@ public class JobHistorySpout extends BaseRichSpout {
interceptor.setSpoutOutputCollector(collector);
try {
- m_jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig());
+ jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig());
driver = new JHFCrawlerDriverImpl(configManager.getJobHistoryEndpointConfig(),
- configManager.getControlConfig(),
- callback,
- zkState,
- m_jhfLCM,
- jobIdFilter,
- partitionId);
+ configManager.getControlConfig(),
+ callback,
+ zkState,
+ jhfLCM,
+ jobIdFilter,
+ partitionId);
} catch (Exception e) {
LOG.error("failing creating crawler driver");
throw new IllegalStateException(e);
@@ -171,7 +171,7 @@ public class JobHistorySpout extends BaseRichSpout {
} catch (Exception ex) {
LOG.error("fail crawling job history file and continue ...", ex);
try {
- m_jhfLCM.freshFileSystem();
+ jhfLCM.freshFileSystem();
} catch (Exception e) {
LOG.error("failed to fresh file system ", e);
}
@@ -179,27 +179,27 @@ public class JobHistorySpout extends BaseRichSpout {
try {
Thread.sleep(1000);
} catch (Exception e) {
-
+ // ignored
}
}
}
/**
- * empty because framework will take care of output fields declaration
+ * Empty because the framework takes care of output field declaration.
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
/**
- * add to processedJob
+ * Meant to add the job to processedJob; the body is currently empty (no-op).
*/
@Override
public void ack(Object jobId) {
}
/**
- * job is not fully processed
+ * Invoked when a job is not fully processed.
*/
@Override
public void fail(Object jobId) {
@@ -227,26 +227,28 @@ public class JobHistorySpout extends BaseRichSpout {
}
}
- if (minTimeStamp == 0l) {
+ if (minTimeStamp == 0L) {
return;
}
LOG.info("update process time stamp {}", minTimeStamp);
- final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
- final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
- Map<String, String> baseTags = new HashMap<String, String>() { {
- put("site", jobExtractorConfig.site);
- } };
+ final MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+ final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+ Map<String, String> baseTags = new HashMap<String, String>() {
+ {
+ put("site", jobExtractorConfig.site);
+ }
+ };
JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
entity.setCurrentTimeStamp(minTimeStamp);
entity.setTimestamp(minTimeStamp);
entity.setTags(baseTags);
IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.eagleServiceHost,
- eagleServiceConfig.eagleServicePort,
- eagleServiceConfig.username,
- eagleServiceConfig.password);
+ eagleServiceConfig.eagleServiceHost,
+ eagleServiceConfig.eagleServicePort,
+ eagleServiceConfig.username,
+ eagleServiceConfig.password);
client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
@@ -267,7 +269,7 @@ public class JobHistorySpout extends BaseRichSpout {
LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
}
}
- tried ++;
+ tried++;
}
client.getJerseyClient().destroy();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
index 933b347..cbde88c 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
@@ -22,12 +22,20 @@ import java.util.List;
public interface JobHistoryZKStateLCM {
void ensureJobPartitions(int numTotalPartitions);
+
String readProcessedDate(int partitionId);
+
List<String> readProcessedJobs(String date);
+
void updateProcessedDate(int partitionId, String date);
+
void addProcessedJob(String date, String jobId);
+
void truncateProcessedJob(String date);
+
void truncateEverything();
+
long readProcessedTimeStamp(int partitionId);
+
void updateProcessedTimeStamp(int partitionId, long timeStamp);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
index 33d3cb2..feb896e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -18,11 +18,12 @@
package org.apache.eagle.jpm.mr.history.zkres;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.ZKStateConfig;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.ZKStateConfig;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,10 +46,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
private CuratorFramework newCurator(ZKStateConfig config) throws Exception {
return CuratorFrameworkFactory.newClient(
- config.zkQuorum,
- config.zkSessionTimeoutMs,
- 15000,
- new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
+ config.zkQuorum,
+ config.zkSessionTimeoutMs,
+ 15000,
+ new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
);
}
@@ -86,7 +87,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
if (_curator.checkExists().forPath(path) != null) {
_curator.delete().forPath(path);
}
- } catch(Exception ex) {
+ } catch (Exception ex) {
LOG.error("fail reading forceStartFrom znode", ex);
}
}
@@ -102,27 +103,28 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
/**
* under zkRoot, znode forceStartFrom is used to force job is crawled from that date
* IF
- * forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD",
+ * forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD",
* THEN
- * rebuild all partitions with the forceStartFrom
+ * rebuild all partitions with the forceStartFrom
* ELSE
- * IF
- * partition structure is changed
- * THEN
- * IF
- * there is valid mindate for existing partitions
- * THEN
- * rebuild job partitions from that valid mindate
- * ELSE
- * rebuild job partitions from (today - BACKOFF_DAYS)
- * END
- * ELSE
- * do nothing
- * END
+ * IF
+ * partition structure is changed
+ * THEN
+ * IF
+ * there is valid mindate for existing partitions
+ * THEN
+ * rebuild job partitions from that valid mindate
+ * ELSE
+ * rebuild job partitions from (today - BACKOFF_DAYS)
+ * END
+ * ELSE
+ * do nothing
* END
- *
- *
+ * END
+ * <p>
* forceStartFrom is deleted once its value is used, so next time when topology is restarted, program can run from where topology is stopped last time
+ * </p>
+ * .
*/
@Override
public void ensureJobPartitions(int numTotalPartitions) {
@@ -137,7 +139,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
if (forceStartFrom != null) {
try {
minDate = Integer.valueOf(forceStartFrom);
- } catch(Exception ex) {
+ } catch (Exception ex) {
LOG.error("failing converting forceStartFrom znode value to integer with value " + forceStartFrom);
throw new IllegalStateException();
}
@@ -153,16 +155,18 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
LOG.info("znode partitions structure is changed, current partition count " + currentCount + ", future count " + numTotalPartitions);
}
}
- if (!structureChanged)
+ if (!structureChanged) {
return; // do nothing
+ }
if (pathExists) {
List<String> partitions = _curator.getChildren().forPath(path);
for (String partition : partitions) {
String date = new String(_curator.getData().forPath(path + "/" + partition), "UTF-8");
int tmp = Integer.valueOf(date);
- if(tmp < minDate)
+ if (tmp < minDate) {
minDate = tmp;
+ }
}
}
@@ -178,7 +182,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
} finally {
try {
lock.release();
- } catch(Exception e) {
+ } catch (Exception e) {
LOG.error("fail releasing lock", e);
throw new RuntimeException(e);
}
@@ -195,9 +199,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
for (int i = 0; i < numTotalPartitions; i++) {
_curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
}
}
@@ -222,9 +226,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
try {
if (_curator.checkExists().forPath(path) == null) {
_curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, date.getBytes("UTF-8"));
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path, date.getBytes("UTF-8"));
} else {
_curator.setData().forPath(path, date.getBytes("UTF-8"));
}
@@ -240,9 +244,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
try {
if (_curator.checkExists().forPath(path) == null) {
_curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path);
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path);
} else {
_curator.setData().forPath(path);
}
@@ -311,10 +315,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
try {
if (_curator.checkExists().forPath(path) == null) {
_curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path);
- return 0l;
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path);
+ return 0L;
} else {
return Long.parseLong(new String(_curator.getData().forPath(path), "UTF-8"));
}
@@ -330,9 +334,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
try {
if (_curator.checkExists().forPath(path) == null) {
_curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path);
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path);
}
_curator.setData().forPath(path, (timeStamp + "").getBytes("UTF-8"));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
new file mode 100644
index 0000000..5e69a16
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<application>
+ <type>MR_HISTORY_JOB_APP</type>
+ <name>Map Reduce History Job Monitoring</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.jpm.mr.history.MRHistoryJobApplication</appClass>
+ <viewPath>/apps/jpm</viewPath>
+ <configuration>
+ <!-- org.apache.eagle.jpm.mr.history.MRHistoryJobConfig -->
+ <property>
+ <name>jobExtractorConfig.site</name>
+ <displayName>Site ID</displayName>
+ <value>sandbox</value>
+ </property>
+ <property>
+ <name>jobExtractorConfig.mrVersion</name>
+ <value>MRVer2</value>
+ </property>
+ <property>
+ <name>jobExtractorConfig.readTimeOutSeconds</name>
+ <displayName>Read Timeout (seconds)</displayName>
+ <value>10</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkQuorum</name>
+ <value>sandbox.hortonworks.com:2181</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkPort</name>
+ <value>2181</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkSessionTimeoutMs</name>
+ <value>15000</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkRetryTimes</name>
+ <value>3</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkRetryInterval</name>
+ <value>20000</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkRoot</name>
+ <value>/test_mrjobhistory</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.basePath</name>
+ <value>/mr-history/done</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.jobTrackerName</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.nnEndpoint</name>
+ <value>hdfs://sandbox.hortonworks.com:8020</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.pathContainsJobTrackerName</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.principal</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.keytab</name>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.dryRun</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.partitionerCls</name>
+ <value>org.apache.eagle.jpm.util.DefaultJobIdPartitioner</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zeroBasedMonth</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>MRConfigureKeys.jobConfigKey</name>
+ <value>mapreduce.map.output.compress,
+ mapreduce.map.output.compress.codec,
+ mapreduce.output.fileoutputformat.compress,
+ mapreduce.output.fileoutputformat.compress.type,
+ mapreduce.output.fileoutputformat.compress.codec,
+ mapred.output.format.class,
+ dataplatform.etl.info,
+ mapreduce.map.memory.mb,
+ mapreduce.reduce.memory.mb,
+ mapreduce.map.java.opts,
+ mapreduce.reduce.java.opts</value>
+ </property>
+ <property>
+ <name>MRConfigureKeys.jobNameKey</name>
+ <value>eagle.job.name</value>
+ </property>
+ <property>
+ <name>envContextConfig.parallelismConfig.mrHistoryJobExecutor</name>
+ <value>6</value>
+ </property>
+ <property>
+ <name>envContextConfig.tasks.mrHistoryJobExecutor</name>
+ <value>6</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.host</name>
+ <description>eagleProps.eagleService.host</description>
+ <value>sandbox.hortonworks.com</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.port</name>
+ <description>eagleProps.eagleService.port</description>
+ <value>9099</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.username</name>
+ <description>eagleProps.eagleService.username</description>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.password</name>
+ <description>eagleProps.eagleService.password</description>
+ <value>secret</value>
+ </property>
+ </configuration>
+ <docs>
+ <install>
+ </install>
+ <uninstall>
+ </uninstall>
+ </docs>
+</application>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..56a30bd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 23a51fc..13e411f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -15,9 +15,6 @@
{
"envContextConfig" : {
- "env" : "local",
- "topologyName" : "mr_history",
- "stormConfigFile" : "storm.yaml",
"parallelismConfig" : {
"mrHistoryJobExecutor" : 6
},
@@ -62,21 +59,10 @@
"password": "secret"
}
},
-
+ "appId":"mr_history",
+ "mode":"LOCAL",
"MRConfigureKeys" : {
"jobNameKey" : "eagle.job.name",
- "jobConfigKey" : [
- "mapreduce.map.output.compress",
- "mapreduce.map.output.compress.codec",
- "mapreduce.output.fileoutputformat.compress",
- "mapreduce.output.fileoutputformat.compress.type",
- "mapreduce.output.fileoutputformat.compress.codec",
- "mapred.output.format.class",
- "dataplatform.etl.info",
- "mapreduce.map.memory.mb",
- "mapreduce.reduce.memory.mb",
- "mapreduce.map.java.opts",
- "mapreduce.reduce.java.opts"
- ]
+ "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class,dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts"
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
new file mode 100644
index 0000000..0a3a3a1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.AppJUnitRunner;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(AppJUnitRunner.class)
+public class MRHistoryJobApplicationProviderTest { // Smoke test: starts MRHistoryJobApplicationProvider through the app test harness.
+ @Inject private ApplicationSimulator simulator; // Guice @Inject; presumably populated by AppJUnitRunner — TODO confirm
+
+ @Test
+ public void testRunAsManagedApplicationWithSimulator(){ // runs the provider's application via ApplicationSimulator
+ simulator.start(MRHistoryJobApplicationProvider.class); // no assertions: the test passes as long as start() does not throw
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
new file mode 100644
index 0000000..318a641
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import com.typesafe.config.ConfigFactory;
+import org.junit.Test;
+
+public class MRHistoryJobApplicationTest { // Smoke test: runs MRHistoryJobApplication directly with the default Typesafe config.
+ @Test
+ public void testRun(){ // no assertions: passes if run() returns without throwing
+ new MRHistoryJobApplication().run(ConfigFactory.load()); // ConfigFactory.load() resolves application.conf from the classpath; NOTE(review): with "mode":"LOCAL" this presumably starts the real topology and may need live ZK/HDFS/Eagle service or never return — confirm before relying on it in CI
+ }
+}
[3/3] incubator-eagle git commit: [EAGLE-461] Convert MR history app
with new app framework
Posted by ha...@apache.org.
[EAGLE-461] Convert MR history app with new app framework
https://issues.apache.org/jira/browse/EAGLE-461
Author: Hao Chen <ha...@apache.org>
Closes #380 from haoch/EAGLE-461.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0bde482b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0bde482b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0bde482b
Branch: refs/heads/develop
Commit: 0bde482be40f208ba944d32ece23533714c87133
Parents: b52405f
Author: Hao Chen <ha...@apache.org>
Authored: Wed Aug 24 15:48:58 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Aug 24 15:48:58 2016 +0800
----------------------------------------------------------------------
eagle-jpm/eagle-jpm-mr-history/pom.xml | 5 +
.../jpm/mr/history/MRHistoryJobApplication.java | 75 +++
.../MRHistoryJobApplicationProvider.java | 26 +
.../jpm/mr/history/MRHistoryJobConfig.java | 208 +++++++
.../eagle/jpm/mr/history/MRHistoryJobMain.java | 70 +--
.../jpm/mr/history/common/JHFConfigManager.java | 182 ------
.../crawler/DefaultJHFInputStreamCallback.java | 8 +-
.../history/crawler/JHFCrawlerDriverImpl.java | 6 +-
.../mr/history/crawler/JobHistoryDAOImpl.java | 2 +-
.../HistoryJobEntityCreationListener.java | 6 +-
.../HistoryJobEntityLifecycleListener.java | 5 +-
.../jpm/mr/history/parser/ImportException.java | 2 -
.../mr/history/parser/JHFEventReaderBase.java | 302 +++++-----
.../mr/history/parser/JHFMRVer1EventReader.java | 30 +-
.../jpm/mr/history/parser/JHFMRVer1Parser.java | 319 ++++++-----
.../parser/JHFMRVer1PerLineListener.java | 14 +-
.../mr/history/parser/JHFMRVer2EventReader.java | 553 +++++++++++++------
.../jpm/mr/history/parser/JHFMRVer2Parser.java | 55 +-
.../jpm/mr/history/parser/JHFParserBase.java | 3 +-
.../jpm/mr/history/parser/JHFParserFactory.java | 44 +-
.../parser/JHFWriteNotCompletedException.java | 6 +-
...JobConfigurationCreationServiceListener.java | 24 +-
.../JobEntityCreationEagleServiceListener.java | 45 +-
.../parser/JobEntityCreationPublisher.java | 9 +-
.../parser/JobEntityLifecycleAggregator.java | 37 +-
.../mr/history/parser/MRErrorClassifier.java | 17 +-
.../jpm/mr/history/parser/RecordTypes.java | 5 +-
.../parser/TaskAttemptCounterListener.java | 54 +-
.../mr/history/parser/TaskFailureListener.java | 142 ++---
.../jpm/mr/history/storm/JobHistorySpout.java | 122 ++--
.../mr/history/zkres/JobHistoryZKStateLCM.java | 8 +
.../history/zkres/JobHistoryZKStateManager.java | 90 +--
....history.MRHistoryJobApplicationProvider.xml | 154 ++++++
...org.apache.eagle.app.spi.ApplicationProvider | 16 +
.../src/main/resources/application.conf | 20 +-
.../MRHistoryJobApplicationProviderTest.java | 33 ++
.../mr/history/MRHistoryJobApplicationTest.java | 27 +
37 files changed, 1633 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index bfd4cf2..1ffda6a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -104,6 +104,11 @@
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
new file mode 100644
index 0000000..08607a1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
+import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
+import org.apache.eagle.jpm.util.Constants;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import com.typesafe.config.Config;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class MRHistoryJobApplication extends StormApplication {
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ //1. trigger init conf
+ MRHistoryJobConfig appConfig = MRHistoryJobConfig.getInstance(config);
+ com.typesafe.config.Config jhfAppConf = appConfig.getConfig();
+
+ //2. init JobHistoryContentFilter
+ final JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
+ String[] confKeyPatternsSplit = jhfAppConf.getString("MRConfigureKeys.jobConfigKey").split(",");
+ List<String> confKeyPatterns = new ArrayList<>(confKeyPatternsSplit.length);
+ for (String confKeyPattern : confKeyPatternsSplit) {
+ confKeyPatterns.add(confKeyPattern.trim());
+ }
+ confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB);
+ confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB);
+ confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB);
+ confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB);
+
+ String jobNameKey = jhfAppConf.getString("MRConfigureKeys.jobNameKey");
+ builder.setJobNameKey(jobNameKey);
+
+ for (String key : confKeyPatterns) {
+ builder.includeJobKeyPatterns(Pattern.compile(key));
+ }
+ JobHistoryContentFilter filter = builder.build();
+ //3. init topology
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ String spoutName = "mrHistoryJobExecutor";
+ int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName);
+ int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName);
+ if (parallelism > tasks) {
+ parallelism = tasks;
+ }
+ topologyBuilder.setSpout(
+ spoutName,
+ new JobHistorySpout(filter, appConfig),
+ parallelism
+ ).setNumTasks(tasks);
+ return topologyBuilder.createTopology();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
new file mode 100644
index 0000000..9aa1c61
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+public class MRHistoryJobApplicationProvider extends AbstractApplicationProvider<MRHistoryJobApplication> {
+ @Override
+ public MRHistoryJobApplication getApplication() {
+ return new MRHistoryJobApplication();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
new file mode 100644
index 0000000..ae86904
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history;
+
+import org.apache.eagle.common.config.ConfigOptionParser;
+import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
+import org.apache.eagle.jpm.util.JobIdPartitioner;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class MRHistoryJobConfig implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MRHistoryJobConfig.class);
+
+ private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf";
+
+ public String getEnv() {
+ return env;
+ }
+
+ private String env;
+
+ public ZKStateConfig getZkStateConfig() {
+ return zkStateConfig;
+ }
+
+ private ZKStateConfig zkStateConfig;
+
+ public JobHistoryEndpointConfig getJobHistoryEndpointConfig() {
+ return jobHistoryEndpointConfig;
+ }
+
+ private JobHistoryEndpointConfig jobHistoryEndpointConfig;
+
+ public ControlConfig getControlConfig() {
+ return controlConfig;
+ }
+
+ private ControlConfig controlConfig;
+
+ public JobExtractorConfig getJobExtractorConfig() {
+ return jobExtractorConfig;
+ }
+
+ private JobExtractorConfig jobExtractorConfig;
+
+ public EagleServiceConfig getEagleServiceConfig() {
+ return eagleServiceConfig;
+ }
+
+ private EagleServiceConfig eagleServiceConfig;
+
+ public Config getConfig() {
+ return config;
+ }
+
+ private Config config;
+
+ public static class ZKStateConfig implements Serializable {
+ public String zkQuorum;
+ public String zkRoot;
+ public int zkSessionTimeoutMs;
+ public int zkRetryTimes;
+ public int zkRetryInterval;
+ public String zkPort;
+ }
+
+ public static class JobHistoryEndpointConfig implements Serializable {
+ public String nnEndpoint;
+ public String basePath;
+ public boolean pathContainsJobTrackerName;
+ public String jobTrackerName;
+ public String principal;
+ public String keyTab;
+ }
+
+ public static class ControlConfig implements Serializable {
+ public boolean dryRun;
+ public Class<? extends JobIdPartitioner> partitionerCls;
+ public boolean zeroBasedMonth;
+ public String timeZone;
+ }
+
+ public static class JobExtractorConfig implements Serializable {
+ public String site;
+ public String mrVersion;
+ public int readTimeoutSeconds;
+ }
+
+ public static class EagleServiceConfig implements Serializable {
+ public String eagleServiceHost;
+ public int eagleServicePort;
+ public String username;
+ public String password;
+ }
+
+ private static MRHistoryJobConfig manager = new MRHistoryJobConfig();
+
+ /**
+ * As this is singleton object and constructed while this class is being initialized,
+ * so any exception within this constructor will be wrapped with java.lang.ExceptionInInitializerError.
+ * And this is unrecoverable and hard to troubleshooting.
+ */
+ private MRHistoryJobConfig() {
+ this.zkStateConfig = new ZKStateConfig();
+ this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig();
+ this.controlConfig = new ControlConfig();
+ this.jobExtractorConfig = new JobExtractorConfig();
+ this.eagleServiceConfig = new EagleServiceConfig();
+ }
+
+ public static MRHistoryJobConfig getInstance(String[] args) {
+ manager.init(args);
+ return manager;
+ }
+
+ public static MRHistoryJobConfig getInstance(Config config) {
+ manager.init(config);
+ return manager;
+ }
+
+ /**
+ * read configuration file and load hbase config etc.
+ */
+ private void init(String[] args) {
+ // TODO: Probably we can remove the properties file path check in future
+ try {
+ LOG.info("Loading from configuration file");
+ init(new ConfigOptionParser().load(args));
+ } catch (Exception e) {
+ LOG.error("failed to load config");
+ }
+ }
+
+ /**
+ * read configuration file and load hbase config etc.
+ */
+ private void init(Config config) {
+ this.config = config;
+ this.env = config.getString("envContextConfig.env");
+ //parse eagle job extractor
+ this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+ this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion");
+ this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds");
+ //parse eagle zk
+ this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+ this.zkStateConfig.zkPort = config.getString("dataSourceConfig.zkPort");
+ this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+ this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+ this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+ this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+
+ //parse job history endpoint
+ this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath");
+ this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName");
+ this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+ this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
+ this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal");
+ this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
+
+ //parse control config
+ this.controlConfig.dryRun = config.getBoolean("dataSourceConfig.dryRun");
+ try {
+ this.controlConfig.partitionerCls = (Class<? extends JobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls"));
+ assert this.controlConfig.partitionerCls != null;
+ } catch (Exception e) {
+ LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.util.DefaultJobIdPartitioner", e);
+ this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
+ } finally {
+ LOG.info("Loaded partitioner class: {}", this.controlConfig.partitionerCls);
+ }
+ this.controlConfig.zeroBasedMonth = config.getBoolean("dataSourceConfig.zeroBasedMonth");
+ this.controlConfig.timeZone = config.getString("dataSourceConfig.timeZone");
+
+ // parse eagle service endpoint
+ this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+ String port = config.getString("eagleProps.eagleService.port");
+ this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
+ this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+ this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+
+ LOG.info("Successfully initialized MRHistoryJobConfig");
+ LOG.info("env: " + this.env);
+ LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum);
+ LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort);
+ LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
+ LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
index 9f030a7..bef72cc 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -18,74 +18,8 @@
package org.apache.eagle.jpm.mr.history;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
-import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
-import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
-import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
-import org.apache.eagle.jpm.util.Constants;
-
-import java.util.List;
-import java.util.regex.Pattern;
-
public class MRHistoryJobMain {
public static void main(String []args) {
- try {
- //1. trigger init conf
- JHFConfigManager jhfConfigManager = JHFConfigManager.getInstance(args);
- com.typesafe.config.Config jhfAppConf = jhfConfigManager.getConfig();
-
- //2. init JobHistoryContentFilter
- JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
- List<String> confKeyPatterns = jhfAppConf.getStringList("MRConfigureKeys.jobConfigKey");
- confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB);
- confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB);
- confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB);
- confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB);
-
- String jobNameKey = jhfAppConf.getString("MRConfigureKeys.jobNameKey");
- builder.setJobNameKey(jobNameKey);
-
- for (String key : confKeyPatterns) {
- builder.includeJobKeyPatterns(Pattern.compile(key));
- }
- JobHistoryContentFilter filter = builder.build();
-
- //3. init topology
- TopologyBuilder topologyBuilder = new TopologyBuilder();
- String topologyName = "mrHistoryJobTopology";
- if (jhfAppConf.hasPath("envContextConfig.topologyName")) {
- topologyName = jhfAppConf.getString("envContextConfig.topologyName");
- }
- String spoutName = "mrHistoryJobExecutor";
- int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName);
- int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName);
- if (parallelism > tasks) {
- parallelism = tasks;
- }
- topologyBuilder.setSpout(
- spoutName,
- new JobHistorySpout(filter, jhfConfigManager),
- parallelism
- ).setNumTasks(tasks);
-
- Config config = new backtype.storm.Config();
- config.setNumWorkers(jhfAppConf.getInt("envContextConfig.workers"));
- config.put(Config.TOPOLOGY_DEBUG, true);
- if (!jhfConfigManager.getEnv().equals("local")) {
- //cluster mode
- //parse conf here
- StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
- } else {
- //local mode
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
+ new MRHistoryJobApplication().run(args);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
deleted file mode 100644
index c99891b..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.common;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.ConfigOptionParser;
-import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
-import org.apache.eagle.jpm.util.JobIdPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-
-public class JHFConfigManager implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(JHFConfigManager.class);
-
- private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf";
-
- public String getEnv() {
- return env;
- }
- private String env;
-
- public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
- private ZKStateConfig zkStateConfig;
-
- public JobHistoryEndpointConfig getJobHistoryEndpointConfig() { return jobHistoryEndpointConfig; }
- private JobHistoryEndpointConfig jobHistoryEndpointConfig;
-
- public ControlConfig getControlConfig() { return controlConfig; }
- private ControlConfig controlConfig;
-
- public JobExtractorConfig getJobExtractorConfig() { return jobExtractorConfig; }
- private JobExtractorConfig jobExtractorConfig;
-
- public EagleServiceConfig getEagleServiceConfig() {
- return eagleServiceConfig;
- }
- private EagleServiceConfig eagleServiceConfig;
-
- public Config getConfig() {
- return config;
- }
- private Config config;
-
- public static class ZKStateConfig implements Serializable {
- public String zkQuorum;
- public String zkRoot;
- public int zkSessionTimeoutMs;
- public int zkRetryTimes;
- public int zkRetryInterval;
- public String zkPort;
- }
-
- public static class JobHistoryEndpointConfig implements Serializable {
- public String nnEndpoint;
- public String basePath;
- public boolean pathContainsJobTrackerName;
- public String jobTrackerName;
- public String principal;
- public String keyTab;
- }
-
- public static class ControlConfig implements Serializable {
- public boolean dryRun;
- public Class<? extends JobIdPartitioner> partitionerCls;
- public boolean zeroBasedMonth;
- public String timeZone;
- }
-
- public static class JobExtractorConfig implements Serializable {
- public String site;
- public String mrVersion;
- public int readTimeoutSeconds;
- }
-
- public static class EagleServiceConfig implements Serializable {
- public String eagleServiceHost;
- public int eagleServicePort;
- public String username;
- public String password;
- }
-
- private static JHFConfigManager manager = new JHFConfigManager();
-
- /**
- * As this is singleton object and constructed while this class is being initialized,
- * so any exception within this constructor will be wrapped with java.lang.ExceptionInInitializerError.
- * And this is unrecoverable and hard to troubleshooting.
- */
- private JHFConfigManager() {
- this.zkStateConfig = new ZKStateConfig();
- this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig();
- this.controlConfig = new ControlConfig();
- this.jobExtractorConfig = new JobExtractorConfig();
- this.eagleServiceConfig = new EagleServiceConfig();
- }
-
- public static JHFConfigManager getInstance(String []args) {
- manager.init(args);
- return manager;
- }
-
- /**
- * read configuration file and load hbase config etc
- */
- private void init(String[] args) {
- // TODO: Probably we can remove the properties file path check in future
- try {
- LOG.info("Loading from configuration file");
- this.config = new ConfigOptionParser().load(args);
- } catch (Exception e) {
- LOG.error("failed to load config");
- }
-
- this.env = config.getString("envContextConfig.env");
-
- //parse eagle job extractor
- this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
- this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion");
- this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds");
- //parse eagle zk
- this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
- this.zkStateConfig.zkPort = config.getString("dataSourceConfig.zkPort");
- this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
- this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
- this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
- this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
-
- //parse job history endpoint
- this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath");
- this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName");
- this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
- this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
- this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal");
- this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
-
- //parse control config
- this.controlConfig.dryRun = config.getBoolean("dataSourceConfig.dryRun");
- try {
- this.controlConfig.partitionerCls = (Class<? extends JobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls"));
- assert this.controlConfig.partitionerCls != null;
- } catch (Exception e) {
- LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.util.DefaultJobIdPartitioner", e);
- this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
- } finally {
- LOG.info("Loaded partitioner class: {}",this.controlConfig.partitionerCls);
- }
- this.controlConfig.zeroBasedMonth = config.getBoolean("dataSourceConfig.zeroBasedMonth");
- this.controlConfig.timeZone = config.getString("dataSourceConfig.timeZone");
-
- // parse eagle service endpoint
- this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
- String port = config.getString("eagleProps.eagleService.port");
- this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
- this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
- this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
-
- LOG.info("Successfully initialized JHFConfigManager");
- LOG.info("env: " + this.env);
- LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum);
- LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort);
- LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
- LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index 6f85149..aeb35fd 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.history.crawler;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.history.parser.JHFParserBase;
import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory;
import org.slf4j.Logger;
@@ -33,16 +33,16 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
private JobHistoryContentFilter m_filter;
- private JHFConfigManager m_configManager;
+ private MRHistoryJobConfig m_configManager;
- public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, JHFConfigManager configManager, EagleOutputCollector eagleCollector) {
+ public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, EagleOutputCollector eagleCollector) {
this.m_filter = filter;
this.m_configManager = configManager;
}
@Override
public void onInputStream(InputStream jobFileInputStream, org.apache.hadoop.conf.Configuration conf) throws Exception {
- final JHFConfigManager.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig();
+ final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig();
@SuppressWarnings("serial")
Map<String, String> baseTags = new HashMap<String, String>() { {
put("site", jobExtractorConfig.site);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index d3e1816..52bd8ea 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -19,7 +19,7 @@
package org.apache.eagle.jpm.mr.history.crawler;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.util.JobIdFilter;
import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM;
import org.slf4j.Logger;
@@ -61,8 +61,8 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
private int m_partitionId;
private TimeZone m_timeZone;
- public JHFCrawlerDriverImpl(JHFConfigManager.JobHistoryEndpointConfig jobHistoryConfig,
- JHFConfigManager.ControlConfig controlConfig, JHFInputStreamCallback reader,
+ public JHFCrawlerDriverImpl(MRHistoryJobConfig.JobHistoryEndpointConfig jobHistoryConfig,
+ MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader,
JobHistoryZKStateLCM zkStateLCM,
JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception {
this.m_zeroBasedMonth = controlConfig.zeroBasedMonth;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
index 3b303fd..cfd5994 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
@@ -33,7 +33,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.JobHistoryEndpointConfig;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
private static final Logger LOG = LoggerFactory.getLogger(JobHistoryDAOImpl.class);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
index bdaedd4..892c2ea 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
@@ -22,16 +22,18 @@ package org.apache.eagle.jpm.mr.history.parser;
import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
/**
- * generalizing this listener would decouple entity creation and entity handling, also will help unit testing
- * @author yonzhang
+ * generalizing this listener would decouple entity creation and entity handling, also will help unit testing.
*
+ * @author yonzhang
*/
public interface HistoryJobEntityCreationListener {
/**
* job entity created event
+ *
* @param entity
*/
void jobEntityCreated(JobBaseAPIEntity entity) throws Exception;
+
/**
* for streaming processing, flush would help commit the last several entities
*/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
index ae6b5c9..a803c6d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
@@ -22,13 +22,12 @@ import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
public interface HistoryJobEntityLifecycleListener extends HistoryJobEntityCreationListener {
/**
- * job entity created event
- * @param entity
+ * job entity created event.
*/
void jobEntityCreated(JobBaseAPIEntity entity) throws Exception;
/**
- * Job finished
+ * Job finished.
*/
void jobFinish();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
index d454c31..652eaf8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
@@ -18,8 +18,6 @@
package org.apache.eagle.jpm.mr.history.parser;
-/**
- */
public class ImportException extends RuntimeException {
private static final long serialVersionUID = -706778307046285820L;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 9992690..82e305a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,14 +18,13 @@
package org.apache.eagle.jpm.mr.history.parser;
-import org.apache.eagle.jpm.mr.historyentity.JobConfig;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.eagle.jpm.mr.historyentity.*;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.JobNameNormalization;
import org.apache.eagle.jpm.util.MRJobTagName;
import org.apache.eagle.jpm.util.jobcounter.JobCounters;
-import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.slf4j.Logger;
@@ -75,10 +74,18 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
private long sumReduceTaskDuration;
public Constants.JobType fetchJobType(Configuration config) {
- if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { return Constants.JobType.CASCADING; }
- if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) { return Constants.JobType.HIVE; }
- if (config.get(Constants.JobConfiguration.PIG_JOB) != null) { return Constants.JobType.PIG; }
- if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {return Constants.JobType.SCOOBI; }
+ if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) {
+ return Constants.JobType.CASCADING;
+ }
+ if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) {
+ return Constants.JobType.HIVE;
+ }
+ if (config.get(Constants.JobConfiguration.PIG_JOB) != null) {
+ return Constants.JobType.PIG;
+ }
+ if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {
+ return Constants.JobType.SCOOBI;
+ }
return Constants.JobType.NOTAVALIABLE;
}
@@ -86,6 +93,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
* baseTags stores the basic tag name values which might be used for persisting various entities
* baseTags includes: cluster, datacenter and jobName
* baseTags are used for all job/task related entities
+ *
* @param baseTags
*/
public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
@@ -120,7 +128,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
this.sumReduceTaskDuration = 0l;
}
- public void register(HistoryJobEntityLifecycleListener lifecycleListener){
+ public void register(HistoryJobEntityLifecycleListener lifecycleListener) {
this.jobEntityLifecycleListeners.add(lifecycleListener);
}
@@ -132,7 +140,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
}
try {
flush();
- } catch(Exception ex) {
+ } catch (Exception ex) {
throw new IOException(ex);
}
}
@@ -146,8 +154,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
}
/**
- * @param id
- */
+ * @param id
+ */
private void setJobID(String id) {
this.m_jobId = id;
}
@@ -157,128 +165,128 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
}
protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception {
- String id = values.get(Keys.JOBID);
-
- if (m_jobId == null) {
- setJobID(id);
- } else if (!m_jobId.equals(id)) {
- String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'";
- LOG.error(msg);
- throw new ImportException(msg);
- }
-
- if (values.get(Keys.SUBMIT_TIME) != null) { // job submitted
- m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME)));
- m_user = values.get(Keys.USER);
- m_queueName = values.get(Keys.JOB_QUEUE);
- m_jobName = values.get(Keys.JOBNAME);
-
- // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate.
- String jobDefId = null;
- if(configuration != null ) {
- jobDefId = configuration.get(m_filter.getJobNameKey());
- }
-
- if(jobDefId == null) {
- m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName);
- } else {
- LOG.debug("Got JobDefId from job configuration for " + id + ": " + jobDefId);
- m_jobDefId = jobDefId;
- }
-
- LOG.info("JobDefId of " + id + ": " + m_jobDefId);
-
- m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
- m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
- m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
- m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
- m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
- m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
- entityCreated(m_jobSubmitEventEntity);
- } else if(values.get(Keys.LAUNCH_TIME) != null) { // job launched
- m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME)));
- m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp();
- m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
- m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
- m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
- m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
- m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
- m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
- m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS));
- m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES));
- entityCreated(m_jobLaunchEventEntity);
- } else if(values.get(Keys.FINISH_TIME) != null) { // job finished
- m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME)));
- m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
- m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
- m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
- m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
- m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
- m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
- entityCreated(m_jobFinishEventEntity);
-
- // populate jobExecutionEntity entity
- m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
- m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
- m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
- m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
- m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName);
- m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
-
- m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
- m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp());
- m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp());
- m_jobExecutionEntity.setDurationTime(m_jobExecutionEntity.getEndTime() - m_jobExecutionEntity.getStartTime());
- m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
- m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp());
- if (values.get(Keys.FAILED_MAPS) != null) {
- // for Artemis
- m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS)));
- }
- if (values.get(Keys.FAILED_REDUCES) != null) {
- // for Artemis
- m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES)));
- }
- m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS)));
- m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES)));
- m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps);
- m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces);
- if (values.get(Keys.COUNTERS) != null || totalCounters != null) {
- JobCounters jobCounters = parseCounters(totalCounters);
- m_jobExecutionEntity.setJobCounters(jobCounters);
- if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) {
- Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER);
- if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
- m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue());
- }
-
- if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
- m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue());
- }
-
- if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
- m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue());
- }
- }
-
- if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) {
- m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
- m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
- }
- }
- m_jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / m_numTotalMaps);
- if (m_numTotalReduces == 0) {
- m_jobExecutionEntity.setMaxReduceTaskDuration(0);
- m_jobExecutionEntity.setAvgReduceTaskDuration(0);
- } else {
- m_jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / m_numTotalReduces);
- }
- entityCreated(m_jobExecutionEntity);
- }
+ String id = values.get(Keys.JOBID);
+
+ if (m_jobId == null) {
+ setJobID(id);
+ } else if (!m_jobId.equals(id)) {
+ String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'";
+ LOG.error(msg);
+ throw new ImportException(msg);
+ }
+
+ if (values.get(Keys.SUBMIT_TIME) != null) { // job submitted
+ m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME)));
+ m_user = values.get(Keys.USER);
+ m_queueName = values.get(Keys.JOB_QUEUE);
+ m_jobName = values.get(Keys.JOBNAME);
+
+ // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate.
+ String jobDefId = null;
+ if (configuration != null) {
+ jobDefId = configuration.get(m_filter.getJobNameKey());
+ }
+
+ if (jobDefId == null) {
+ m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName);
+ } else {
+ LOG.debug("Got JobDefId from job configuration for " + id + ": " + jobDefId);
+ m_jobDefId = jobDefId;
+ }
+
+ LOG.info("JobDefId of " + id + ": " + m_jobDefId);
+
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+ m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
+ entityCreated(m_jobSubmitEventEntity);
+ } else if (values.get(Keys.LAUNCH_TIME) != null) { // job launched
+ m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME)));
+ m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp();
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
+ m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS));
+ m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES));
+ entityCreated(m_jobLaunchEventEntity);
+ } else if (values.get(Keys.FINISH_TIME) != null) { // job finished
+ m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME)));
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
+ entityCreated(m_jobFinishEventEntity);
+
+ // populate jobExecutionEntity entity
+ m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName);
+ m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
+
+ m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
+ m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp());
+ m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp());
+ m_jobExecutionEntity.setDurationTime(m_jobExecutionEntity.getEndTime() - m_jobExecutionEntity.getStartTime());
+ m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
+ m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp());
+ if (values.get(Keys.FAILED_MAPS) != null) {
+ // for Artemis
+ m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS)));
+ }
+ if (values.get(Keys.FAILED_REDUCES) != null) {
+ // for Artemis
+ m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES)));
+ }
+ m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS)));
+ m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES)));
+ m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps);
+ m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces);
+ if (values.get(Keys.COUNTERS) != null || totalCounters != null) {
+ JobCounters jobCounters = parseCounters(totalCounters);
+ m_jobExecutionEntity.setJobCounters(jobCounters);
+ if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) {
+ Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER);
+ if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
+ m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue());
+ }
+
+ if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
+ m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue());
+ }
+
+ if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
+ m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue());
+ }
+ }
+
+ if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) {
+ m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
+ m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
+ }
+ }
+ m_jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / m_numTotalMaps);
+ if (m_numTotalReduces == 0) {
+ m_jobExecutionEntity.setMaxReduceTaskDuration(0);
+ m_jobExecutionEntity.setAvgReduceTaskDuration(0);
+ } else {
+ m_jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / m_numTotalReduces);
+ }
+ entityCreated(m_jobExecutionEntity);
+ }
}
private void entityCreated(JobBaseAPIEntity entity) throws Exception {
- for (HistoryJobEntityLifecycleListener lifecycleListener: this.jobEntityLifecycleListeners) {
+ for (HistoryJobEntityLifecycleListener lifecycleListener : this.jobEntityLifecycleListeners) {
lifecycleListener.jobEntityCreated(entity);
}
@@ -295,11 +303,12 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
protected abstract JobCounters parseCounters(Object value) throws IOException;
/**
- * for one task ID, it has several sequential task events, i.e.
- * task_start -> task_attempt_start -> task_attempt_finish -> task_attempt_start -> task_attempt_finish -> ... -> task_end
- * @param values
- * @throws IOException
- */
+ * for one task ID, it has several sequential task events, i.e.
+ * task_start -> task_attempt_start -> task_attempt_finish -> task_attempt_start -> task_attempt_finish -> ... -> task_end
+ *
+ * @param values
+ * @throws IOException
+ */
@SuppressWarnings("serial")
protected void handleTask(RecordTypes recType, EventType eventType, final Map<Keys, String> values, Object counters) throws Exception {
String taskAttemptID = values.get(Keys.TASK_ATTEMPT_ID);
@@ -308,7 +317,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
final String taskType = values.get(Keys.TASK_TYPE);
final String taskID = values.get(Keys.TASKID);
- Map<String, String> taskBaseTags = new HashMap<String, String>(){{
+ Map<String, String> taskBaseTags = new HashMap<String, String>() {{
put(MRJobTagName.TASK_TYPE.toString(), taskType);
put(MRJobTagName.USER.toString(), m_user);
//put(MRJobTagName.JOB_NAME.toString(), _jobName);
@@ -402,11 +411,12 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
Map<String, String> prop = new TreeMap<>();
if (m_filter.acceptJobConfFile()) {
- Iterator<Map.Entry<String, String> > iter = configuration.iterator();
+ Iterator<Map.Entry<String, String>> iter = configuration.iterator();
while (iter.hasNext()) {
String key = iter.next().getKey();
- if (included(key) && !excluded(key))
+ if (included(key) && !excluded(key)) {
prop.put(key, configuration.get(key));
+ }
}
}
@@ -442,15 +452,17 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
break;
}
}
- if (!matched)
+ if (!matched) {
return false;
+ }
}
return true;
}
private boolean included(String key) {
- if (m_filter.getJobConfKeyInclusionPatterns() == null)
+ if (m_filter.getJobConfKeyInclusionPatterns() == null) {
return true;
+ }
for (Pattern p : m_filter.getJobConfKeyInclusionPatterns()) {
Matcher m = p.matcher(key);
if (m.matches()) {
@@ -462,13 +474,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
}
private boolean excluded(String key) {
- if (m_filter.getJobConfKeyExclusionPatterns() == null)
+ if (m_filter.getJobConfKeyExclusionPatterns() == null) {
return false;
+ }
for (Pattern p : m_filter.getJobConfKeyExclusionPatterns()) {
Matcher m = p.matcher(key);
- if (m.matches())
+ if (m.matches()) {
return true;
+ }
}
return false;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
index 654f63f..6932dad 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -19,9 +19,9 @@
package org.apache.eagle.jpm.mr.history.parser;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.util.jobcounter.JobCounters;
-import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
@@ -37,8 +37,8 @@ import java.util.Map;
/**
* Listener holds all informations related to one whole job history file, so it's stateful and does not support multithreading.
- * @author yonzhang
*
+ * @author yonzhang
*/
public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer1PerLineListener {
private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1EventReader.class);
@@ -47,6 +47,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
* baseTags stores the basic tag name values which might be used for persisting various entities
* baseTags includes: cluster, datacenter and jobName
* baseTags are used for all job/task related entities
+ *
* @param baseTags
*/
public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
@@ -55,7 +56,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
@Override
public void handle(RecordTypes recType, Map<Keys, String> values)
- throws Exception {
+ throws Exception {
switch (recType) {
case Job:
handleJob(null, values, values.get(Keys.COUNTERS));
@@ -76,11 +77,12 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
;
}
}
-
+
private void ensureRackHostnameAfterAttemptFinish(Map<Keys, String> values) {
// only care about attempt finish
- if (values.get(Keys.FINISH_TIME) == null)
+ if (values.get(Keys.FINISH_TIME) == null) {
return;
+ }
String hostname = null;
String rack = null;
// we get rack/hostname based on task's status
@@ -92,24 +94,24 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
hostname = tmp[tmp.length - 1];
rack = tmp[tmp.length - 2];
m_host2RackMapping.put(hostname, rack);
- } else if(values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) {
+ } else if (values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) {
hostname = values.get(Keys.HOSTNAME);
// make every effort to get RACK information
hostname = (hostname == null) ? "" : hostname;
rack = m_host2RackMapping.get(hostname);
}
-
+
values.put(Keys.HOSTNAME, hostname);
values.put(Keys.RACK, rack);
}
-
+
@Override
protected JobCounters parseCounters(Object value) throws IOException {
JobCounters jc = new JobCounters();
Map<String, Map<String, Long>> groups = new HashMap<String, Map<String, Long>>();
- Counters counters = new Counters();
+ Counters counters = new Counters();
try {
- CountersStrings.parseEscapedCompactString((String)value, counters);
+ CountersStrings.parseEscapedCompactString((String) value, counters);
} catch (Exception ex) {
logger.error("can not parse job history", ex);
throw new IOException(ex);
@@ -118,7 +120,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
while (it.hasNext()) {
CounterGroup cg = it.next();
- // hardcoded to exclude business level counters
+ // hardcoded to exclude business level counters
if (!cg.getName().equals("org.apache.hadoop.mapreduce.FileSystemCounter")
&& !cg.getName().equals("org.apache.hadoop.mapreduce.TaskCounter")
&& !cg.getName().equals("org.apache.hadoop.mapreduce.JobCounter")
@@ -128,7 +130,9 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
&& !cg.getName().equals("org.apache.hadoop.mapred.Task$Counter") // for artemis
&& !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter") // for artemis
&& !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter")
- ) continue;
+ ) {
+ continue;
+ }
groups.put(cg.getName(), new HashMap<String, Long>());
Map<String, Long> counterValues = groups.get(cg.getName());
@@ -143,7 +147,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
jc.setCounters(groups);
return jc;
}
-
+
public JobExecutionAPIEntity jobExecution() {
return m_jobExecutionEntity;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
index bb08ef0..ab59a41 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
@@ -44,16 +44,18 @@ public class JHFMRVer1Parser implements JHFParserBase {
static final String MAX_COUNTER_COUNT = "10000";
private JHFMRVer1EventReader m_reader;
- public JHFMRVer1Parser(JHFMRVer1EventReader reader){
+
+ public JHFMRVer1Parser(JHFMRVer1EventReader reader) {
this.m_reader = reader;
}
/**
- * Parses history file and invokes Listener.handle() for
- * each line of history. It can be used for looking through history
- * files for specific items without having to keep whole history in memory.
- * @throws IOException
- */
+ * Parses history file and invokes Listener.handle() for
+ * each line of history. It can be used for looking through history
+ * files for specific items without having to keep whole history in memory.
+ *
+ * @throws IOException
+ */
@Override
public void parse(InputStream in) throws Exception, ParseException {
// set enough counter number as user may build more counters
@@ -68,7 +70,7 @@ public class JHFMRVer1Parser implements JHFParserBase {
// Check if the file is empty
if (line == null) {
- return;
+ return;
}
// Get the information required for further processing
@@ -80,17 +82,17 @@ public class JHFMRVer1Parser implements JHFParserBase {
do {
buf.append(line);
if (!line.trim().endsWith(lineDelim) || line.trim().endsWith(escapedLineDelim)) {
- buf.append("\n");
- continue;
+ buf.append("\n");
+ continue;
}
parseLine(buf.toString(), m_reader, isEscaped);
buf = new StringBuffer();
- } while ((line = reader.readLine())!= null);
+ } while ((line = reader.readLine()) != null);
// flush to tell listener that we have finished parsing
logger.info("finish parsing job history file and close");
m_reader.close();
- } catch(Exception ex) {
+ } catch (Exception ex) {
logger.error("can not parse correctly ", ex);
throw ex;
} finally {
@@ -104,17 +106,17 @@ public class JHFMRVer1Parser implements JHFParserBase {
// extract the record type
int idx = line.indexOf(' ');
String recType = line.substring(0, idx);
- String data = line.substring(idx+1, line.length());
+ String data = line.substring(idx + 1, line.length());
Matcher matcher = pattern.matcher(data);
- Map<Keys,String> parseBuffer = new HashMap<Keys, String>();
+ Map<Keys, String> parseBuffer = new HashMap<Keys, String>();
- while(matcher.find()) {
+ while (matcher.find()) {
String tuple = matcher.group(0);
- String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
- String value = parts[1].substring(1, parts[1].length() -1);
+ String[] parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
+ String value = parts[1].substring(1, parts[1].length() - 1);
if (isEscaped) {
- value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR, charsToEscape);
+ value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR, charsToEscape);
}
parseBuffer.put(Keys.valueOf(parts[0]), value);
}
@@ -131,141 +133,154 @@ public class JHFMRVer1Parser implements JHFParserBase {
}
parseBuffer.clear();
- }
-
- /**
- * Manages job-history's meta information such as version etc.
- * Helps in logging version information to the job-history and recover
- * version information from the history.
- */
- static class MetaInfoManager implements JHFMRVer1PerLineListener {
- private long version = 0L;
- private KeyValuePair pairs = new KeyValuePair();
-
- public void close() {
- }
- // Extract the version of the history that was used to write the history
- public MetaInfoManager(String line) throws Exception, ParseException {
- if (null != line) {
- // Parse the line
- parseLine(line, this, false);
- }
- }
-
- // Get the line delimiter
- char getLineDelim() {
- if (version == 0) {
- return '"';
- } else {
- return LINE_DELIMITER_CHAR;
- }
- }
-
- // Checks if the values are escaped or not
- boolean isValueEscaped() {
- // Note that the values are not escaped in version 0
- return version != 0;
- }
-
- public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException {
+ }
+
+ /**
+ * Manages job-history's meta information such as version etc.
+ * Helps in logging version information to the job-history and recovering
+ * version information from the history.
+ */
+ static class MetaInfoManager implements JHFMRVer1PerLineListener {
+ private long version = 0L;
+ private KeyValuePair pairs = new KeyValuePair();
+
+ public void close() {
+ }
+
+ // Extract the version of the history that was used to write the history
+ public MetaInfoManager(String line) throws Exception, ParseException {
+ if (null != line) {
+ // Parse the line
+ parseLine(line, this, false);
+ }
+ }
+
+ // Get the line delimiter
+ char getLineDelim() {
+ if (version == 0) {
+ return '"';
+ } else {
+ return LINE_DELIMITER_CHAR;
+ }
+ }
+
+ // Checks if the values are escaped or not
+ boolean isValueEscaped() {
+ // Note that the values are not escaped in version 0
+ return version != 0;
+ }
+
+ public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException {
// Check if the record is of type META
- if (RecordTypes.Meta == recType) {
- pairs.handle(values);
- version = pairs.getLong(Keys.VERSION); // defaults to 0
- }
- }
- }
-
- /**
- * Base class contais utility stuff to manage types key value pairs with enums.
- */
- static class KeyValuePair {
- private Map<Keys, String> values = new HashMap<Keys, String>();
-
- /**
- * Get 'String' value for given key. Most of the places use Strings as
- * values so the default get' method returns 'String'. This method never returns
- * null to ease on GUIs. if no value is found it returns empty string ""
- * @param k
- * @return if null it returns empty string - ""
- */
- public String get(Keys k) {
- String s = values.get(k);
- return s == null ? "" : s;
- }
- /**
- * Convert value from history to int and return.
- * if no value is found it returns 0.
- * @param k key
- */
- public int getInt(Keys k) {
- String s = values.get(k);
- if (null != s){
- return Integer.parseInt(s);
- }
- return 0;
- }
- /**
- * Convert value from history to int and return.
- * if no value is found it returns 0.
- * @param k
- */
- public long getLong(Keys k) {
- String s = values.get(k);
- if (null != s){
- return Long.parseLong(s);
- }
- return 0;
- }
- /**
- * Set value for the key.
- * @param k
- * @param s
- */
- public void set(Keys k, String s) {
- values.put(k, s);
- }
- /**
- * Adds all values in the Map argument to its own values.
- * @param m
- */
- public void set(Map<Keys, String> m) {
- values.putAll(m);
- }
- /**
- * Reads values back from the history, input is same Map as passed to Listener by parseHistory().
- * @param values
- */
- public synchronized void handle(Map<Keys, String> values) {
- set(values);
- }
- /**
- * Returns Map containing all key-values.
- */
- public Map<Keys, String> getValues() {
- return values;
- }
- }
-
- /**
- * Job history files contain key="value" pairs, where keys belong to this enum.
- * It acts as a global namespace for all keys.
- */
- public static enum Keys {
- JOBTRACKERID,
- START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
- LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
- FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
- ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
- SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
- TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
- VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK,
-
- UBERISED,SPLIT_LOCATIONS,FAILED_DUE_TO_ATTEMPT,MAP_FINISH_TIME,PORT,RACK_NAME,
-
- //For Artemis
- WORKFLOW_ID,WORKFLOW_NAME,WORKFLOW_NODE_NAME,WORKFLOW_ADJACENCIES,WORKFLOW_TAGS,
- SHUFFLE_PORT,LOCALITY,AVATAAR,FAIL_REASON
- }
+ if (RecordTypes.Meta == recType) {
+ pairs.handle(values);
+ version = pairs.getLong(Keys.VERSION); // defaults to 0
+ }
+ }
+ }
+
+ /**
+ * Base class that contains utility methods to manage typed key-value pairs with enums.
+ */
+ static class KeyValuePair {
+ private Map<Keys, String> values = new HashMap<Keys, String>();
+
+ /**
+ * Get 'String' value for given key. Most of the places use Strings as
+ * values so the default 'get' method returns 'String'. This method never returns
+ * null, which simplifies GUI code. If no value is found it returns empty string ""
+ *
+ * @param k
+ * @return if null it returns empty string - ""
+ */
+ public String get(Keys k) {
+ String s = values.get(k);
+ return s == null ? "" : s;
+ }
+
+ /**
+ * Convert value from history to int and return it.
+ * If no value is found it returns 0.
+ *
+ * @param k key
+ */
+ public int getInt(Keys k) {
+ String s = values.get(k);
+ if (null != s) {
+ return Integer.parseInt(s);
+ }
+ return 0;
+ }
+
+ /**
+ * Convert value from history to long and return it.
+ * If no value is found it returns 0.
+ *
+ * @param k
+ */
+ public long getLong(Keys k) {
+ String s = values.get(k);
+ if (null != s) {
+ return Long.parseLong(s);
+ }
+ return 0;
+ }
+
+ /**
+ * Set value for the key.
+ *
+ * @param k
+ * @param s
+ */
+ public void set(Keys k, String s) {
+ values.put(k, s);
+ }
+
+ /**
+ * Adds all values in the Map argument to its own values.
+ *
+ * @param m
+ */
+ public void set(Map<Keys, String> m) {
+ values.putAll(m);
+ }
+
+ /**
+ * Reads values back from the history, input is same Map as passed to Listener by parseHistory().
+ *
+ * @param values
+ */
+ public synchronized void handle(Map<Keys, String> values) {
+ set(values);
+ }
+
+ /**
+ * Returns Map containing all key-values.
+ */
+ public Map<Keys, String> getValues() {
+ return values;
+ }
+ }
+
+ /**
+ * Job history files contain key="value" pairs, where keys belong to this enum.
+ * It acts as a global namespace for all keys.
+ */
+ public static enum Keys {
+ JOBTRACKERID,
+ START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
+ LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
+ FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
+ ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
+ SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
+ TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
+ VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK,
+
+ UBERISED, SPLIT_LOCATIONS, FAILED_DUE_TO_ATTEMPT, MAP_FINISH_TIME, PORT, RACK_NAME,
+
+ //For Artemis
+ WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES, WORKFLOW_TAGS,
+ SHUFFLE_PORT, LOCALITY, AVATAAR, FAIL_REASON
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
index 1c096fc..5d48d5d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
@@ -24,15 +24,15 @@ import java.io.IOException;
import java.util.Map;
/**
- * Callback interface for reading back log events from JobHistory. This interface
- * should be implemented and passed to JobHistory.parseHistory()
- *
+ * Callback interface for reading back log events from JobHistory. This interface
+ * should be implemented and passed to JobHistory.parseHistory()
*/
-public interface JHFMRVer1PerLineListener{
+public interface JHFMRVer1PerLineListener {
/**
- * Callback method for history parser.
- * @param recType type of record, which is the first entry in the line.
- * @param values a map of key-value pairs as thry appear in history.
+ * Callback method for history parser.
+ *
+ * @param recType type of record, which is the first entry in the line.
+ * @param values a map of key-value pairs as they appear in history.
* @throws IOException
*/
void handle(RecordTypes recType, Map<Keys, String> values) throws Exception;
[2/3] incubator-eagle git commit: [EAGLE-461] Convert MR history app
with new app framework
Posted by ha...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 2d960b0..4ca0449 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -19,8 +19,8 @@
package org.apache.eagle.jpm.mr.history.parser;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
-import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -41,6 +41,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
/**
* Create a new Event Reader
+ *
* @throws IOException
*/
public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
@@ -50,86 +51,115 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
@SuppressWarnings("deprecation")
public void handleEvent(Event wrapper) throws Exception {
switch (wrapper.type) {
- case JOB_SUBMITTED:
- handleJobSubmitted(wrapper); break;
- case JOB_INITED:
- handleJobInited(wrapper); break;
- case JOB_FINISHED:
- handleJobFinished(wrapper); break;
- case JOB_PRIORITY_CHANGED:
- handleJobPriorityChanged(); break;
- case JOB_STATUS_CHANGED:
- handleJobStatusChanged(); break;
- case JOB_FAILED:
- case JOB_KILLED:
- case JOB_ERROR:
- handleJobUnsuccessfulCompletion(wrapper); break;
- case JOB_INFO_CHANGED:
- handleJobInfoChanged(); break;
- case JOB_QUEUE_CHANGED:
- handleJobQueueChanged();break;
- case TASK_STARTED:
- handleTaskStarted(wrapper); break;
- case TASK_FINISHED:
- handleTaskFinished(wrapper); break;
- case TASK_FAILED:
- handleTaskFailed(wrapper); break;
- case TASK_UPDATED:
- handleTaskUpdated(); break;
-
- // map task
- case MAP_ATTEMPT_STARTED:
- handleMapAttemptStarted(wrapper); break;
- case MAP_ATTEMPT_FINISHED:
- handleMapAttemptFinished(wrapper); break;
- case MAP_ATTEMPT_FAILED:
- handleMapAttemptFailed(wrapper); break;
- case MAP_ATTEMPT_KILLED:
- handleMapAttemptKilled(wrapper); break;
-
- // reduce task
- case REDUCE_ATTEMPT_STARTED:
- handleReduceAttemptStarted(wrapper); break;
- case REDUCE_ATTEMPT_FINISHED:
- handleReduceAttemptFinished(wrapper); break;
- case REDUCE_ATTEMPT_FAILED:
- handleReduceAttemptFailed(wrapper); break;
- case REDUCE_ATTEMPT_KILLED:
- handleReduceAttemptKilled(wrapper); break;
-
- // set up task
- case SETUP_ATTEMPT_STARTED:
- break;
- case SETUP_ATTEMPT_FINISHED:
- handleSetupAttemptFinished(); break;
- case SETUP_ATTEMPT_FAILED:
- handleSetupAttemptFailed(); break;
- case SETUP_ATTEMPT_KILLED:
- handleSetupAttemptKilled(); break;
-
- // clean up task
- case CLEANUP_ATTEMPT_STARTED:
- break;
- case CLEANUP_ATTEMPT_FINISHED:
- handleCleanupAttemptFinished(); break;
- case CLEANUP_ATTEMPT_FAILED:
- handleCleanupAttemptFailed(); break;
- case CLEANUP_ATTEMPT_KILLED:
- handleCleanupAttemptKilled(); break;
-
- case AM_STARTED:
- handleAMStarted(); break;
- default:
- logger.warn("unexpected event type: " + wrapper.type);
+ case JOB_SUBMITTED:
+ handleJobSubmitted(wrapper);
+ break;
+ case JOB_INITED:
+ handleJobInited(wrapper);
+ break;
+ case JOB_FINISHED:
+ handleJobFinished(wrapper);
+ break;
+ case JOB_PRIORITY_CHANGED:
+ handleJobPriorityChanged();
+ break;
+ case JOB_STATUS_CHANGED:
+ handleJobStatusChanged();
+ break;
+ case JOB_FAILED:
+ case JOB_KILLED:
+ case JOB_ERROR:
+ handleJobUnsuccessfulCompletion(wrapper);
+ break;
+ case JOB_INFO_CHANGED:
+ handleJobInfoChanged();
+ break;
+ case JOB_QUEUE_CHANGED:
+ handleJobQueueChanged();
+ break;
+ case TASK_STARTED:
+ handleTaskStarted(wrapper);
+ break;
+ case TASK_FINISHED:
+ handleTaskFinished(wrapper);
+ break;
+ case TASK_FAILED:
+ handleTaskFailed(wrapper);
+ break;
+ case TASK_UPDATED:
+ handleTaskUpdated();
+ break;
+
+ // map task
+ case MAP_ATTEMPT_STARTED:
+ handleMapAttemptStarted(wrapper);
+ break;
+ case MAP_ATTEMPT_FINISHED:
+ handleMapAttemptFinished(wrapper);
+ break;
+ case MAP_ATTEMPT_FAILED:
+ handleMapAttemptFailed(wrapper);
+ break;
+ case MAP_ATTEMPT_KILLED:
+ handleMapAttemptKilled(wrapper);
+ break;
+
+ // reduce task
+ case REDUCE_ATTEMPT_STARTED:
+ handleReduceAttemptStarted(wrapper);
+ break;
+ case REDUCE_ATTEMPT_FINISHED:
+ handleReduceAttemptFinished(wrapper);
+ break;
+ case REDUCE_ATTEMPT_FAILED:
+ handleReduceAttemptFailed(wrapper);
+ break;
+ case REDUCE_ATTEMPT_KILLED:
+ handleReduceAttemptKilled(wrapper);
+ break;
+
+ // set up task
+ case SETUP_ATTEMPT_STARTED:
+ break;
+ case SETUP_ATTEMPT_FINISHED:
+ handleSetupAttemptFinished();
+ break;
+ case SETUP_ATTEMPT_FAILED:
+ handleSetupAttemptFailed();
+ break;
+ case SETUP_ATTEMPT_KILLED:
+ handleSetupAttemptKilled();
+ break;
+
+ // clean up task
+ case CLEANUP_ATTEMPT_STARTED:
+ break;
+ case CLEANUP_ATTEMPT_FINISHED:
+ handleCleanupAttemptFinished();
+ break;
+ case CLEANUP_ATTEMPT_FAILED:
+ handleCleanupAttemptFailed();
+ break;
+ case CLEANUP_ATTEMPT_KILLED:
+ handleCleanupAttemptKilled();
+ break;
+
+ case AM_STARTED:
+ handleAMStarted();
+ break;
+ default:
+ logger.warn("unexpected event type: " + wrapper.type);
}
}
private void handleJobPriorityChanged() throws Exception {
return;
}
+
private void handleJobStatusChanged() throws Exception {
return;
}
+
private void handleJobInfoChanged() throws Exception {
return;
}
@@ -139,150 +169,285 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
}
private void handleJobSubmitted(Event wrapper) throws Exception {
- JobSubmitted js = ((JobSubmitted)wrapper.getEvent());
+ JobSubmitted js = ((JobSubmitted) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
- if (js.getJobName() != null) values.put(Keys.JOBNAME, js.getJobName().toString());
- if (js.getUserName() != null) values.put(Keys.USER, js.getUserName().toString());
- if (js.getSubmitTime() != null) values.put(Keys.SUBMIT_TIME, js.getSubmitTime().toString());
- if (js.getJobQueueName() != null) values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
+ if (js.getJobid() != null) {
+ values.put(Keys.JOBID, js.getJobid().toString());
+ }
+ if (js.getJobName() != null) {
+ values.put(Keys.JOBNAME, js.getJobName().toString());
+ }
+ if (js.getUserName() != null) {
+ values.put(Keys.USER, js.getUserName().toString());
+ }
+ if (js.getSubmitTime() != null) {
+ values.put(Keys.SUBMIT_TIME, js.getSubmitTime().toString());
+ }
+ if (js.getJobQueueName() != null) {
+ values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
+ }
handleJob(wrapper.getType(), values, null);
}
private void handleJobInited(Event wrapper) throws Exception {
- JobInited js = ((JobInited)wrapper.getEvent());
+ JobInited js = ((JobInited) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
- if (js.getLaunchTime() != null) values.put(Keys.LAUNCH_TIME, js.getLaunchTime().toString());
- if (js.getTotalMaps() != null) values.put(Keys.TOTAL_MAPS, js.getTotalMaps().toString());
- if (js.getTotalReduces() != null) values.put(Keys.TOTAL_REDUCES, js.getTotalReduces().toString());
- if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
- if (js.getUberized() != null) values.put(Keys.UBERISED, js.getUberized().toString());
+ if (js.getJobid() != null) {
+ values.put(Keys.JOBID, js.getJobid().toString());
+ }
+ if (js.getLaunchTime() != null) {
+ values.put(Keys.LAUNCH_TIME, js.getLaunchTime().toString());
+ }
+ if (js.getTotalMaps() != null) {
+ values.put(Keys.TOTAL_MAPS, js.getTotalMaps().toString());
+ }
+ if (js.getTotalReduces() != null) {
+ values.put(Keys.TOTAL_REDUCES, js.getTotalReduces().toString());
+ }
+ if (js.getJobStatus() != null) {
+ values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+ }
+ if (js.getUberized() != null) {
+ values.put(Keys.UBERISED, js.getUberized().toString());
+ }
handleJob(wrapper.getType(), values, null);
}
private void handleJobFinished(Event wrapper) throws Exception {
- JobFinished js = ((JobFinished)wrapper.getEvent());
+ JobFinished js = ((JobFinished) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
- if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
- if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
- if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
- if (js.getFailedMaps() != null) values.put(Keys.FAILED_MAPS, js.getFailedMaps().toString());
- if (js.getFailedReduces() != null) values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
+ if (js.getJobid() != null) {
+ values.put(Keys.JOBID, js.getJobid().toString());
+ }
+ if (js.getFinishTime() != null) {
+ values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+ }
+ if (js.getFinishedMaps() != null) {
+ values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
+ }
+ if (js.getFinishedReduces() != null) {
+ values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
+ }
+ if (js.getFailedMaps() != null) {
+ values.put(Keys.FAILED_MAPS, js.getFailedMaps().toString());
+ }
+ if (js.getFailedReduces() != null) {
+ values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
+ }
values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCESS.name());
handleJob(wrapper.getType(), values, js.getTotalCounters());
}
private void handleJobUnsuccessfulCompletion(Event wrapper) throws Exception {
- JobUnsuccessfulCompletion js = ((JobUnsuccessfulCompletion)wrapper.getEvent());
+ JobUnsuccessfulCompletion js = ((JobUnsuccessfulCompletion) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
- if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
- if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
- if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
- if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+ if (js.getJobid() != null) {
+ values.put(Keys.JOBID, js.getJobid().toString());
+ }
+ if (js.getFinishTime() != null) {
+ values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+ }
+ if (js.getFinishedMaps() != null) {
+ values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
+ }
+ if (js.getFinishedReduces() != null) {
+ values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
+ }
+ if (js.getJobStatus() != null) {
+ values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+ }
handleJob(wrapper.getType(), values, null);
}
private void handleTaskStarted(Event wrapper) throws Exception {
- TaskStarted js = ((TaskStarted)wrapper.getEvent());
+ TaskStarted js = ((TaskStarted) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
- if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
- if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString());
- if (js.getSplitLocations() != null) values.put(Keys.SPLIT_LOCATIONS, js.getSplitLocations().toString());
+ if (js.getTaskid() != null) {
+ values.put(Keys.TASKID, js.getTaskid().toString());
+ }
+ if (js.getTaskType() != null) {
+ values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+ }
+ if (js.getStartTime() != null) {
+ values.put(Keys.START_TIME, js.getStartTime().toString());
+ }
+ if (js.getSplitLocations() != null) {
+ values.put(Keys.SPLIT_LOCATIONS, js.getSplitLocations().toString());
+ }
handleTask(RecordTypes.Task, wrapper.getType(), values, null);
}
private void handleTaskFinished(Event wrapper) throws Exception {
- TaskFinished js = ((TaskFinished)wrapper.getEvent());
+ TaskFinished js = ((TaskFinished) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
- if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
- if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
- if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+ if (js.getTaskid() != null) {
+ values.put(Keys.TASKID, js.getTaskid().toString());
+ }
+ if (js.getTaskType() != null) {
+ values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+ }
+ if (js.getFinishTime() != null) {
+ values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+ }
+ if (js.getStatus() != null) {
+ values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+ }
handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
}
private void handleTaskFailed(Event wrapper) throws Exception {
- TaskFailed js = ((TaskFailed)wrapper.getEvent());
+ TaskFailed js = ((TaskFailed) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
- if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
- if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
- if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
- if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString());
- if (js.getFailedDueToAttempt() != null) values.put(Keys.FAILED_DUE_TO_ATTEMPT, js.getFailedDueToAttempt().toString());
+ if (js.getTaskid() != null) {
+ values.put(Keys.TASKID, js.getTaskid().toString());
+ }
+ if (js.getTaskType() != null) {
+ values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+ }
+ if (js.getFinishTime() != null) {
+ values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+ }
+ if (js.getStatus() != null) {
+ values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+ }
+ if (js.getError() != null) {
+ values.put(Keys.ERROR, js.getError().toString());
+ }
+ if (js.getFailedDueToAttempt() != null) {
+ values.put(Keys.FAILED_DUE_TO_ATTEMPT, js.getFailedDueToAttempt().toString());
+ }
handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
}
private String normalizeTaskStatus(String taskStatus) {
- if (taskStatus.equals("SUCCEEDED"))
+ if (taskStatus.equals("SUCCEEDED")) {
return EagleTaskStatus.SUCCESS.name();
+ }
return taskStatus;
}
- private void handleTaskUpdated(){
+ private void handleTaskUpdated() {
return;
}
private void handleMapAttemptStarted(Event wrapper) throws Exception {
handleTaskAttemptStarted(wrapper, RecordTypes.MapAttempt);
}
+
private void handleReduceAttemptStarted(Event wrapper) throws Exception {
handleTaskAttemptStarted(wrapper, RecordTypes.ReduceAttempt);
}
private void handleTaskAttemptStarted(Event wrapper, RecordTypes recordType) throws Exception {
- TaskAttemptStarted js = ((TaskAttemptStarted)wrapper.getEvent());
+ TaskAttemptStarted js = ((TaskAttemptStarted) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
- if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
- if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
- if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString());
- if (js.getTrackerName() != null) values.put(Keys.TRACKER_NAME, js.getTrackerName().toString());
- if (js.getHttpPort() != null) values.put(Keys.HTTP_PORT, js.getHttpPort().toString());
- if (js.getShufflePort() != null) values.put(Keys.SHUFFLE_PORT, js.getShufflePort().toString());
- if (js.getLocality() != null) values.put(Keys.LOCALITY, js.getLocality().toString());
- if (js.getAvataar() != null) values.put(Keys.AVATAAR, js.getAvataar().toString());
- handleTask(recordType,wrapper.getType(), values, null);
+ if (js.getTaskid() != null) {
+ values.put(Keys.TASKID, js.getTaskid().toString());
+ }
+ if (js.getTaskType() != null) {
+ values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+ }
+ if (js.getAttemptId() != null) {
+ values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+ }
+ if (js.getStartTime() != null) {
+ values.put(Keys.START_TIME, js.getStartTime().toString());
+ }
+ if (js.getTrackerName() != null) {
+ values.put(Keys.TRACKER_NAME, js.getTrackerName().toString());
+ }
+ if (js.getHttpPort() != null) {
+ values.put(Keys.HTTP_PORT, js.getHttpPort().toString());
+ }
+ if (js.getShufflePort() != null) {
+ values.put(Keys.SHUFFLE_PORT, js.getShufflePort().toString());
+ }
+ if (js.getLocality() != null) {
+ values.put(Keys.LOCALITY, js.getLocality().toString());
+ }
+ if (js.getAvataar() != null) {
+ values.put(Keys.AVATAAR, js.getAvataar().toString());
+ }
+ handleTask(recordType, wrapper.getType(), values, null);
}
private void handleMapAttemptFinished(Event wrapper) throws Exception {
- MapAttemptFinished js = ((MapAttemptFinished)wrapper.getEvent());
+ MapAttemptFinished js = ((MapAttemptFinished) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
- if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
- if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
- if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
- if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
- if (js.getMapFinishTime() != null) values.put(Keys.MAP_FINISH_TIME, js.getMapFinishTime().toString());
- if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
- if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
- if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
- if (js.getState() != null) values.put(Keys.STATE_STRING, js.getState().toString());
+ if (js.getTaskid() != null) {
+ values.put(Keys.TASKID, js.getTaskid().toString());
+ }
+ if (js.getTaskType() != null) {
+ values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+ }
+ if (js.getTaskStatus() != null) {
+ values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+ }
+ if (js.getAttemptId() != null) {
+ values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+ }
+ if (js.getFinishTime() != null) {
+ values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+ }
+ if (js.getMapFinishTime() != null) {
+ values.put(Keys.MAP_FINISH_TIME, js.getMapFinishTime().toString());
+ }
+ if (js.getHostname() != null) {
+ values.put(Keys.HOSTNAME, js.getHostname().toString());
+ }
+ if (js.getPort() != null) {
+ values.put(Keys.PORT, js.getPort().toString());
+ }
+ if (js.getRackname() != null) {
+ values.put(Keys.RACK_NAME, js.getRackname().toString());
+ }
+ if (js.getState() != null) {
+ values.put(Keys.STATE_STRING, js.getState().toString());
+ }
ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
- handleTask(RecordTypes.MapAttempt,wrapper.getType(), values, js.getCounters());
+ handleTask(RecordTypes.MapAttempt, wrapper.getType(), values, js.getCounters());
}
+
private void handleReduceAttemptFinished(Event wrapper) throws Exception {
- ReduceAttemptFinished js = ((ReduceAttemptFinished)wrapper.getEvent());
+ ReduceAttemptFinished js = ((ReduceAttemptFinished) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
- if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
- if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
- if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
- if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
- if (js.getShuffleFinishTime() != null) values.put(Keys.SHUFFLE_FINISHED, js.getShuffleFinishTime().toString());
- if (js.getSortFinishTime() != null) values.put(Keys.SORT_FINISHED, js.getSortFinishTime().toString());
- if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
- if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
- if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
- if (js.getState() != null) values.put(Keys.STATE_STRING, js.getState().toString());
+ if (js.getTaskid() != null) {
+ values.put(Keys.TASKID, js.getTaskid().toString());
+ }
+ if (js.getTaskType() != null) {
+ values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+ }
+ if (js.getTaskStatus() != null) {
+ values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+ }
+ if (js.getAttemptId() != null) {
+ values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+ }
+ if (js.getFinishTime() != null) {
+ values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+ }
+ if (js.getShuffleFinishTime() != null) {
+ values.put(Keys.SHUFFLE_FINISHED, js.getShuffleFinishTime().toString());
+ }
+ if (js.getSortFinishTime() != null) {
+ values.put(Keys.SORT_FINISHED, js.getSortFinishTime().toString());
+ }
+ if (js.getHostname() != null) {
+ values.put(Keys.HOSTNAME, js.getHostname().toString());
+ }
+ if (js.getPort() != null) {
+ values.put(Keys.PORT, js.getPort().toString());
+ }
+ if (js.getRackname() != null) {
+ values.put(Keys.RACK_NAME, js.getRackname().toString());
+ }
+ if (js.getState() != null) {
+ values.put(Keys.STATE_STRING, js.getState().toString());
+ }
ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
- handleTask(RecordTypes.ReduceAttempt,wrapper.getType(), values, js.getCounters());
+ handleTask(RecordTypes.ReduceAttempt, wrapper.getType(), values, js.getCounters());
}
private void ensureRackAfterAttemptFinish(String rackname, Map<Keys, String> values) {
@@ -296,61 +461,89 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
private void handleMapAttemptFailed(Event wrapper) throws Exception {
handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt);
}
+
private void handleReduceAttemptFailed(Event wrapper) throws Exception {
handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt);
}
- private void handleMapAttemptKilled(Event wrapper)throws Exception {
+
+ private void handleMapAttemptKilled(Event wrapper) throws Exception {
handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt);
}
- private void handleReduceAttemptKilled(Event wrapper)throws Exception {
+
+ private void handleReduceAttemptKilled(Event wrapper) throws Exception {
handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt);
}
-
+
private void handleTaskAttemptFailed(Event wrapper, RecordTypes recordType) throws Exception {
- TaskAttemptUnsuccessfulCompletion js = ((TaskAttemptUnsuccessfulCompletion)wrapper.getEvent());
+ TaskAttemptUnsuccessfulCompletion js = ((TaskAttemptUnsuccessfulCompletion) wrapper.getEvent());
Map<Keys, String> values = new HashMap<>();
- if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
- if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
- if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
- if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
- if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
- if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
- if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
- if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString());
- if (js.getStatus() != null) values.put(Keys.TASK_STATUS, js.getStatus().toString());
+ if (js.getTaskid() != null) {
+ values.put(Keys.TASKID, js.getTaskid().toString());
+ }
+ if (js.getTaskType() != null) {
+ values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+ }
+ if (js.getAttemptId() != null) {
+ values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+ }
+ if (js.getFinishTime() != null) {
+ values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+ }
+ if (js.getHostname() != null) {
+ values.put(Keys.HOSTNAME, js.getHostname().toString());
+ }
+ if (js.getPort() != null) {
+ values.put(Keys.PORT, js.getPort().toString());
+ }
+ if (js.getRackname() != null) {
+ values.put(Keys.RACK_NAME, js.getRackname().toString());
+ }
+ if (js.getError() != null) {
+ values.put(Keys.ERROR, js.getError().toString());
+ }
+ if (js.getStatus() != null) {
+ values.put(Keys.TASK_STATUS, js.getStatus().toString());
+ }
ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
- handleTask(recordType,wrapper.getType(), values, js.getCounters());
- }
- private void handleSetupAttemptFinished(){
+ handleTask(recordType, wrapper.getType(), values, js.getCounters());
+ }
+
+ private void handleSetupAttemptFinished() {
return;
}
- private void handleSetupAttemptFailed(){
+
+ private void handleSetupAttemptFailed() {
return;
}
- private void handleSetupAttemptKilled(){
+
+ private void handleSetupAttemptKilled() {
return;
}
- private void handleCleanupAttemptFinished(){
+
+ private void handleCleanupAttemptFinished() {
return;
}
- private void handleCleanupAttemptFailed(){
+
+ private void handleCleanupAttemptFailed() {
return;
}
- private void handleCleanupAttemptKilled(){
+
+ private void handleCleanupAttemptKilled() {
return;
}
- private void handleAMStarted(){
+
+ private void handleAMStarted() {
return;
}
- protected JobCounters parseCounters(Object value) throws IOException{
+ protected JobCounters parseCounters(Object value) throws IOException {
JobCounters jc = new JobCounters();
Map<String, Map<String, Long>> groups = new HashMap<>();
- JhCounters counters = (JhCounters)value;
+ JhCounters counters = (JhCounters) value;
List<JhCounterGroup> list = counters.getGroups();
for (JhCounterGroup cg : list) {
String cgName = cg.getName().toString();
- if(!cgName.equals("org.apache.hadoop.mapreduce.FileSystemCounter")
+ if (!cgName.equals("org.apache.hadoop.mapreduce.FileSystemCounter")
&& !cgName.equals("org.apache.hadoop.mapreduce.TaskCounter")
&& !cgName.equals("org.apache.hadoop.mapreduce.JobCounter")
&& !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter")
@@ -359,7 +552,9 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
&& !cgName.equals("org.apache.hadoop.mapred.Task$Counter") // for artemis
&& !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter") // for artemis
&& !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter") // for artemis
- ) continue;
+ ) {
+ continue;
+ }
groups.put(cgName, new HashMap<String, Long>());
Map<String, Long> counterValues = groups.get(cgName);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
index c289e4d..2ccbf8d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
@@ -19,8 +19,9 @@
package org.apache.eagle.jpm.mr.history.parser;
import org.apache.avro.Schema;
-import org.apache.avro.io.*;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.hadoop.mapreduce.jobhistory.Event;
import org.slf4j.Logger;
@@ -33,35 +34,35 @@ import java.io.InputStream;
public class JHFMRVer2Parser implements JHFParserBase {
private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2Parser.class);
private JHFMRVer2EventReader _reader;
-
- public JHFMRVer2Parser(JHFMRVer2EventReader reader){
+
+ public JHFMRVer2Parser(JHFMRVer2EventReader reader) {
this._reader = reader;
}
- @SuppressWarnings({ "rawtypes", "deprecation" })
+ @SuppressWarnings( {"rawtypes", "deprecation"})
@Override
public void parse(InputStream is) throws Exception {
int eventCtr = 0;
try {
- long start = System.currentTimeMillis();
- DataInputStream in = new DataInputStream(is);
- String version = in.readLine();
- if (!"Avro-Json".equals(version)) {
- throw new IOException("Incompatible event log version: " + version);
- }
-
- Schema schema = Schema.parse(in.readLine());
- SpecificDatumReader datumReader = new SpecificDatumReader(schema);
- JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, in);
+ long start = System.currentTimeMillis();
+ DataInputStream in = new DataInputStream(is);
+ String version = in.readLine();
+ if (!"Avro-Json".equals(version)) {
+ throw new IOException("Incompatible event log version: " + version);
+ }
+
+ Schema schema = Schema.parse(in.readLine());
+ SpecificDatumReader datumReader = new SpecificDatumReader(schema);
+ JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, in);
- Event wrapper;
- while ((wrapper = getNextEvent(datumReader, decoder)) != null) {
- ++eventCtr;
- _reader.handleEvent(wrapper);
- }
- _reader.parseConfiguration();
- // don't need put to finally as it's a kind of flushing data
- _reader.close();
+ Event wrapper;
+ while ((wrapper = getNextEvent(datumReader, decoder)) != null) {
+ ++eventCtr;
+ _reader.handleEvent(wrapper);
+ }
+ _reader.parseConfiguration();
+ // don't need put to finally as it's a kind of flushing data
+ _reader.close();
logger.info("reader used " + (System.currentTimeMillis() - start) + "ms");
} catch (Exception ioe) {
logger.error("Caught exception parsing history file after " + eventCtr + " events", ioe);
@@ -71,13 +72,13 @@ public class JHFMRVer2Parser implements JHFParserBase {
is.close();
}
}
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ }
+
+ @SuppressWarnings( {"rawtypes", "unchecked"})
public Event getNextEvent(DatumReader datumReader, JsonDecoder decoder) throws Exception {
Event wrapper;
try {
- wrapper = (Event)datumReader.read(null, decoder);
+ wrapper = (Event) datumReader.read(null, decoder);
} catch (java.io.EOFException e) { // at EOF
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
index e4f437f..a2479ab 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
@@ -21,13 +21,12 @@ package org.apache.eagle.jpm.mr.history.parser;
import java.io.InputStream;
/**
- *
* @author yonzhang
- *
*/
public interface JHFParserBase {
/**
* this method will ensure to close the inputstream
+ *
* @param is
* @throws Exception
*/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index dd640c2..718612d 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.history.parser;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
@@ -30,7 +30,7 @@ public class JHFParserFactory {
private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class);
- public static JHFParserBase getParser(JHFConfigManager configManager, Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+ public static JHFParserBase getParser(MRHistoryJobConfig configManager, Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
String format = configManager.getJobExtractorConfig().mrVersion;
JHFParserBase parser;
JHFFormat f;
@@ -40,31 +40,31 @@ public class JHFParserFactory {
} else {
f = JHFFormat.valueOf(format);
}
- } catch(IllegalArgumentException ex) {
+ } catch (IllegalArgumentException ex) {
f = JHFFormat.MRVer1; // fall back to version 1 unless it's specified as version 2
}
-
+
switch (f) {
- case MRVer2:
- JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
- reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
- reader2.addListener(new TaskFailureListener(configManager));
- reader2.addListener(new TaskAttemptCounterListener(configManager));
- reader2.addListener(new JobConfigurationCreationServiceListener(configManager));
+ case MRVer2:
+ JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
+ reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
+ reader2.addListener(new TaskFailureListener(configManager));
+ reader2.addListener(new TaskAttemptCounterListener(configManager));
+ reader2.addListener(new JobConfigurationCreationServiceListener(configManager));
- reader2.register(new JobEntityLifecycleAggregator());
- parser = new JHFMRVer2Parser(reader2);
- break;
- case MRVer1:
- default:
- JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
- reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
- reader1.addListener(new TaskFailureListener(configManager));
- reader1.addListener(new TaskAttemptCounterListener(configManager));
+ reader2.register(new JobEntityLifecycleAggregator());
+ parser = new JHFMRVer2Parser(reader2);
+ break;
+ case MRVer1:
+ default:
+ JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
+ reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
+ reader1.addListener(new TaskFailureListener(configManager));
+ reader1.addListener(new TaskAttemptCounterListener(configManager));
- reader1.register(new JobEntityLifecycleAggregator());
- parser = new JHFMRVer1Parser(reader1);
- break;
+ reader1.register(new JobEntityLifecycleAggregator());
+ parser = new JHFMRVer1Parser(reader1);
+ break;
}
return parser;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
index 4529f8b..d5ed97a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
@@ -21,16 +21,16 @@ package org.apache.eagle.jpm.mr.history.parser;
/**
* used to warn that one job history file has not yet completed writing to hdfs
* This happens when feeder catches up and the history file has not been written into hdfs completely
- * @author yonzhang
*
+ * @author yonzhang
*/
public class JHFWriteNotCompletedException extends Exception {
/**
- *
+ *
*/
private static final long serialVersionUID = -3060175780718218490L;
- public JHFWriteNotCompletedException(String msg){
+ public JHFWriteNotCompletedException(String msg) {
super(msg);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 0334c9b..4163f7b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.history.parser;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
import org.apache.eagle.jpm.mr.historyentity.JobConfigurationAPIEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
@@ -32,10 +32,10 @@ import java.util.List;
public class JobConfigurationCreationServiceListener implements HistoryJobEntityLifecycleListener {
private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class);
private static final int MAX_RETRY_TIMES = 3;
- private JHFConfigManager configManager;
+ private MRHistoryJobConfig configManager;
private JobConfigurationAPIEntity m_jobConfigurationEntity;
- public JobConfigurationCreationServiceListener(JHFConfigManager configManager) {
+ public JobConfigurationCreationServiceListener(MRHistoryJobConfig configManager) {
this.configManager = configManager;
}
@@ -43,7 +43,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
if (entity != null) {
if (entity instanceof JobConfigurationAPIEntity) {
- this.m_jobConfigurationEntity = (JobConfigurationAPIEntity)entity;
+ this.m_jobConfigurationEntity = (JobConfigurationAPIEntity) entity;
}
}
}
@@ -55,13 +55,13 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
@Override
public void flush() throws Exception {
- JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
- JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+ MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+ MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.eagleServiceHost,
- eagleServiceConfig.eagleServicePort,
- eagleServiceConfig.username,
- eagleServiceConfig.password);
+ eagleServiceConfig.eagleServiceHost,
+ eagleServiceConfig.eagleServicePort,
+ eagleServiceConfig.username,
+ eagleServiceConfig.password);
client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
List<JobConfigurationAPIEntity> list = new ArrayList<>();
@@ -76,7 +76,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
break;
} catch (Exception ex) {
if (tried < MAX_RETRY_TIMES) {
- logger.error("Got exception to flush, retry as " + (tried + 1) +" times",ex);
+ logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
} else {
logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
}
@@ -86,7 +86,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
client.getJerseyClient().destroy();
client.close();
}
- tried ++;
+ tried++;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index e1a3c69..abddf3b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.history.parser;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.historyentity.*;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
@@ -34,23 +34,24 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
private static final int BATCH_SIZE = 1000;
private int batchSize;
private List<JobBaseAPIEntity> list = new ArrayList<>();
- private JHFConfigManager configManager;
+ private MRHistoryJobConfig configManager;
List<JobExecutionAPIEntity> jobs = new ArrayList<>();
List<JobEventAPIEntity> jobEvents = new ArrayList<>();
List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
-
- public JobEntityCreationEagleServiceListener(JHFConfigManager configManager){
+
+ public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) {
this(configManager, BATCH_SIZE);
}
-
- public JobEntityCreationEagleServiceListener(JHFConfigManager configManager, int batchSize) {
+
+ public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager, int batchSize) {
this.configManager = configManager;
- if (batchSize <= 0)
+ if (batchSize <= 0) {
throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
+ }
this.batchSize = batchSize;
}
-
+
@Override
public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
list.add(entity);
@@ -65,26 +66,26 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
*/
@Override
public void flush() throws Exception {
- JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
- JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+ MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+ MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.eagleServiceHost,
- eagleServiceConfig.eagleServicePort,
- eagleServiceConfig.username,
- eagleServiceConfig.password);
+ eagleServiceConfig.eagleServiceHost,
+ eagleServiceConfig.eagleServicePort,
+ eagleServiceConfig.username,
+ eagleServiceConfig.password);
client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
logger.info("start flushing entities of total number " + list.size());
for (int i = 0; i < list.size(); i++) {
JobBaseAPIEntity entity = list.get(i);
if (entity instanceof JobExecutionAPIEntity) {
- jobs.add((JobExecutionAPIEntity)entity);
- } else if(entity instanceof JobEventAPIEntity) {
- jobEvents.add((JobEventAPIEntity)entity);
- } else if(entity instanceof TaskExecutionAPIEntity) {
- taskExecs.add((TaskExecutionAPIEntity)entity);
- } else if(entity instanceof TaskAttemptExecutionAPIEntity) {
- taskAttemptExecs.add((TaskAttemptExecutionAPIEntity)entity);
+ jobs.add((JobExecutionAPIEntity) entity);
+ } else if (entity instanceof JobEventAPIEntity) {
+ jobEvents.add((JobEventAPIEntity) entity);
+ } else if (entity instanceof TaskExecutionAPIEntity) {
+ taskExecs.add((TaskExecutionAPIEntity) entity);
+ } else if (entity instanceof TaskAttemptExecutionAPIEntity) {
+ taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
}
}
GenericServiceAPIResponseEntity result;
@@ -117,7 +118,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
client.getJerseyClient().destroy();
client.close();
}
-
+
private void checkResult(GenericServiceAPIResponseEntity result) throws Exception {
if (!result.isSuccess()) {
logger.error(result.getException());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
index 6e0371c..8b85b26 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
@@ -24,21 +24,22 @@ import java.util.Vector;
/**
* not thread safe
- * @author yonzhang
*
+ * @author yonzhang
*/
public class JobEntityCreationPublisher {
private Vector<HistoryJobEntityCreationListener> listeners = new Vector<>(2);
- public void addListener(HistoryJobEntityCreationListener l){
+
+ public void addListener(HistoryJobEntityCreationListener l) {
listeners.add(l);
}
-
+
public void notifiyListeners(JobBaseAPIEntity entity) throws Exception {
for (HistoryJobEntityCreationListener l : listeners) {
l.jobEntityCreated(entity);
}
}
-
+
public void flush() throws Exception {
for (HistoryJobEntityCreationListener l : listeners) {
l.flush();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
index d97e2a3..594d8e2 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -53,10 +53,10 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
@Override
public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
- if (entity != null ) {
+ if (entity != null) {
if (entity instanceof TaskAttemptExecutionAPIEntity) {
taskAttemptEntityCreated((TaskAttemptExecutionAPIEntity) entity);
- } else if(entity instanceof JobExecutionAPIEntity) {
+ } else if (entity instanceof JobExecutionAPIEntity) {
this.m_jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
}
}
@@ -74,24 +74,28 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
JobCounters jobCounters = m_jobExecutionAPIEntity.getJobCounters();
if (jobCounters == null) {
- LOG.warn("no job counter found for "+this.m_jobExecutionAPIEntity);
+ LOG.warn("no job counter found for " + this.m_jobExecutionAPIEntity);
jobCounters = new JobCounters();
}
- Map<String,Map<String,Long>> counters = jobCounters.getCounters();
+ Map<String, Map<String, Long>> counters = jobCounters.getCounters();
- Map<String,Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
- if (mapTaskAttemptCounter == null) mapTaskAttemptCounter = new HashMap<>();
+ Map<String, Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
+ if (mapTaskAttemptCounter == null) {
+ mapTaskAttemptCounter = new HashMap<>();
+ }
mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
- counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter);
+ counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER, mapTaskAttemptCounter);
- Map<String,Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
- if (reduceTaskAttemptCounter == null) reduceTaskAttemptCounter = new HashMap<>();
+ Map<String, Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
+ if (reduceTaskAttemptCounter == null) {
+ reduceTaskAttemptCounter = new HashMap<>();
+ }
reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
- counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter);
+ counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER, reduceTaskAttemptCounter);
counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
- counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result());
+ counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_reduceFileSystemTaskCounterAgg.result());
jobCounters.setCounters(counters);
@@ -122,9 +126,9 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
ObjectMapper objectMapper = new ObjectMapper();
try {
- LOG.warn("Unknown task type of task attempt execution entity: "+objectMapper.writeValueAsString(entity));
+ LOG.warn("Unknown task type of task attempt execution entity: " + objectMapper.writeValueAsString(entity));
} catch (Exception e) {
- LOG.error(e.getMessage(),e);
+ LOG.error(e.getMessage(), e);
}
}
@@ -137,13 +141,14 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
interface JobCounterAggregateFunction {
void accumulate(Map<String, Long> mapCounters);
- Map<String,Long> result();
+
+ Map<String, Long> result();
}
static class JobCounterSumFunction implements JobCounterAggregateFunction {
final Map<String, Long> result;
- public JobCounterSumFunction(){
+ public JobCounterSumFunction() {
result = new HashMap<>();
}
@@ -153,7 +158,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
@Override
public void accumulate(Map<String, Long> counters) {
if (counters != null) {
- for (Map.Entry<String,Long> taskEntry: counters.entrySet()) {
+ for (Map.Entry<String, Long> taskEntry : counters.entrySet()) {
String counterName = taskEntry.getKey();
long counterValue = taskEntry.getValue();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
index a2f4c3a..e65ec17 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
@@ -28,13 +28,13 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
public final class MRErrorClassifier {
-
+
private final List<ErrorCategory> categories = new ArrayList<ErrorCategory>();
public MRErrorClassifier(InputStream configStream) throws IOException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(configStream));
String line;
- while((line = reader.readLine()) != null) {
+ while ((line = reader.readLine()) != null) {
line = line.trim();
if (line.isEmpty() || line.startsWith("#")) {
continue;
@@ -58,7 +58,7 @@ public final class MRErrorClassifier {
categories.add(category);
}
}
-
+
public List<ErrorCategory> getErrorCategories() {
return categories;
}
@@ -72,31 +72,36 @@ public final class MRErrorClassifier {
}
return "UNKNOWN";
}
-
+
public static class ErrorCategory {
private String name;
private Pattern pattern;
private boolean needTransform;
-
+
public String getName() {
return name;
}
+
public void setName(String name) {
this.name = name;
}
+
public Pattern getPattern() {
return pattern;
}
+
public void setPattern(Pattern pattern) {
this.pattern = pattern;
}
+
public boolean isNeedTransform() {
return needTransform;
}
+
public void setNeedTransform(boolean needTransform) {
this.needTransform = needTransform;
}
-
+
public String classify(String error) {
Matcher matcher = pattern.matcher(error);
if (matcher.find()) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
index d2ac944..be23051 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
@@ -17,9 +17,10 @@
*/
package org.apache.eagle.jpm.mr.history.parser;
+
/**
- * Record types are identifiers for each line of log in history files.
- * A record type appears as the first token in a single line of log.
+ * Record types are identifiers for each line of log in history files.
+ * A record type appears as the first token in a single line of log.
*/
public enum RecordTypes {
Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index d819baf..efc43c5 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.history.parser;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
import org.apache.eagle.jpm.mr.historyentity.TaskAttemptCounterAPIEntity;
import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
@@ -34,46 +34,48 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
private static final Logger logger = LoggerFactory.getLogger(TaskAttemptCounterListener.class);
private static final int BATCH_SIZE = 1000;
private Map<CounterKey, CounterValue> counters = new HashMap<>();
- private JHFConfigManager configManager;
+ private MRHistoryJobConfig configManager;
- public TaskAttemptCounterListener(JHFConfigManager configManager) {
+ public TaskAttemptCounterListener(MRHistoryJobConfig configManager) {
this.configManager = configManager;
}
private static class CounterKey {
Map<String, String> tags = new HashMap<>();
long timestamp;
-
+
@Override
public boolean equals(Object thatKey) {
- if (!(thatKey instanceof CounterKey))
+ if (!(thatKey instanceof CounterKey)) {
return false;
- CounterKey that = (CounterKey)thatKey;
- if (that.tags.equals(this.tags) && that.timestamp == this.timestamp)
+ }
+ CounterKey that = (CounterKey) thatKey;
+ if (that.tags.equals(this.tags) && that.timestamp == this.timestamp) {
return true;
+ }
return false;
}
-
+
@Override
- public int hashCode(){
+ public int hashCode() {
return tags.hashCode() ^ Long.valueOf(timestamp).hashCode();
}
}
-
+
private static class CounterValue {
int totalCount;
int failedCount;
int killedCount;
}
-
+
@Override
public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
if (!(entity instanceof TaskAttemptExecutionAPIEntity)) {
return;
}
-
- TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity;
-
+
+ TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity) entity;
+
Map<String, String> tags = new HashMap<>();
tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
@@ -85,21 +87,21 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
CounterKey key = new CounterKey();
key.tags = tags;
key.timestamp = roundToMinute(e.getEndTime());
-
+
CounterValue value = counters.get(key);
if (value == null) {
value = new CounterValue();
counters.put(key, value);
}
-
+
if (e.getTaskStatus().equals(EagleTaskStatus.FAILED.name())) {
value.failedCount++;
- } else if(e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+ } else if (e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
value.killedCount++;
}
value.totalCount++;
}
-
+
private long roundToMinute(long timestamp) {
GregorianCalendar cal = new GregorianCalendar();
cal.setTimeInMillis(timestamp);
@@ -107,16 +109,16 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
cal.set(Calendar.MILLISECOND, 0);
return cal.getTimeInMillis();
}
-
+
@Override
public void flush() throws Exception {
- JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
- JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+ MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+ MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.eagleServiceHost,
- eagleServiceConfig.eagleServicePort,
- eagleServiceConfig.username,
- eagleServiceConfig.password);
+ eagleServiceConfig.eagleServiceHost,
+ eagleServiceConfig.eagleServicePort,
+ eagleServiceConfig.username,
+ eagleServiceConfig.password);
client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
List<TaskAttemptCounterAPIEntity> list = new ArrayList<>();
@@ -132,7 +134,7 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
entity.setFailedCount(value.failedCount);
entity.setKilledCount(value.killedCount);
list.add(entity);
-
+
if (list.size() >= BATCH_SIZE) {
logger.info("start flushing TaskAttemptCounter " + list.size());
client.create(list);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 6e42fe2..e0c3c6b 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.mr.history.parser;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
import org.apache.eagle.jpm.mr.historyentity.TaskFailureCountAPIEntity;
@@ -44,94 +44,98 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
private final List<TaskFailureCountAPIEntity> failureTasks = new ArrayList<TaskFailureCountAPIEntity>();
private final MRErrorClassifier classifier;
- private JHFConfigManager configManager;
+ private MRHistoryJobConfig configManager;
- public TaskFailureListener(JHFConfigManager configManager) {
+ public TaskFailureListener(MRHistoryJobConfig configManager) {
this.configManager = configManager;
- InputStream is = null;
- try {
- is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
+ InputStream is = null;
+ try {
+ is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
URL url = TaskFailureListener.class.getClassLoader().getResource(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
if (url != null) {
logger.info("Feeder is going to load configuration file: " + url.toString());
}
- classifier = new MRErrorClassifier(is);
- } catch (IOException ex) {
- throw new RuntimeException("Can't find MRErrorCategory.config file to configure MRErrorCategory");
- } finally {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- }
- }
- }
+ classifier = new MRErrorClassifier(is);
+ } catch (IOException ex) {
+ throw new RuntimeException("Can't find MRErrorCategory.config file to configure MRErrorCategory");
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ }
+ }
+ }
}
@Override
public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
- if (!(entity instanceof TaskAttemptExecutionAPIEntity))
- return;
+ if (!(entity instanceof TaskAttemptExecutionAPIEntity)) {
+ return;
+ }
- TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity;
- // only store those killed or failed tasks
- if (!e.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) && !e.getTaskStatus().equals(EagleTaskStatus.KILLED.name()))
- return;
+ TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity) entity;
+ // only store those killed or failed tasks
+ if (!e.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) && !e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+ return;
+ }
- TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity();
- Map<String, String> tags = new HashMap<>();
- failureTask.setTags(tags);
- tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
- tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
- tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
- tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
- tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
- tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
- tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
+ TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity();
+ Map<String, String> tags = new HashMap<>();
+ failureTask.setTags(tags);
+ tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
+ tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+ tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
+ tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
+ tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
+ tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
+ tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
- //TODO need optimize, match and then capture the data
- final String errCategory = classifier.classifyError(e.getError());
- tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
+ //TODO need optimize, match and then capture the data
+ final String errCategory = classifier.classifyError(e.getError());
+ tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
- failureTask.setError(e.getError());
- failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future
- failureTask.setTimestamp(e.getTimestamp());
- failureTask.setTaskStatus(e.getTaskStatus());
- failureTasks.add(failureTask);
+ failureTask.setError(e.getError());
+ failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future
+ failureTask.setTimestamp(e.getTimestamp());
+ failureTask.setTaskStatus(e.getTaskStatus());
+ failureTasks.add(failureTask);
- if (failureTasks.size() >= BATCH_SIZE) flush();
+ if (failureTasks.size() >= BATCH_SIZE) {
+ flush();
+ }
}
-
+
@Override
public void flush() throws Exception {
- JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
- JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
- IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.eagleServiceHost,
- eagleServiceConfig.eagleServicePort,
- eagleServiceConfig.username,
- eagleServiceConfig.password);
+ MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+ MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+ IEagleServiceClient client = new EagleServiceClientImpl(
+ eagleServiceConfig.eagleServiceHost,
+ eagleServiceConfig.eagleServicePort,
+ eagleServiceConfig.username,
+ eagleServiceConfig.password);
- client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+ client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
- int tried = 0;
- while (tried <= MAX_RETRY_TIMES) {
- try {
- logger.info("start flushing entities of total number " + failureTasks.size());
- client.create(failureTasks);
- logger.info("finish flushing entities of total number " + failureTasks.size());
- failureTasks.clear();
- break;
- } catch (Exception ex) {
- if (tried < MAX_RETRY_TIMES) {
- logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
- } else {
- logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
- throw ex;
- }
- }
- tried ++;
- }
+ int tried = 0;
+ while (tried <= MAX_RETRY_TIMES) {
+ try {
+ logger.info("start flushing entities of total number " + failureTasks.size());
+ client.create(failureTasks);
+ logger.info("finish flushing entities of total number " + failureTasks.size());
+ failureTasks.clear();
+ break;
+ } catch (Exception ex) {
+ if (tried < MAX_RETRY_TIMES) {
+ logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
+ } else {
+ logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
+ throw ex;
+ }
+ }
+ tried++;
+ }
client.getJerseyClient().destroy();
client.close();
}